This post is the sixth in our series on the motivations, architecture and performance gains of Apache Tez for data processing in Hadoop.
Tez follows the traditional Hadoop model of dividing a job into individual tasks, all of which are run as processes via YARN on the user's behalf, for isolation among other reasons. This model comes with inherent costs: the overhead of launching a new JVM for every task, re-localizing resources for each container, round trips to the ResourceManager for every allocation, and the loss of JIT compilation warm-up between tasks.
Besides addressing these concerns, re-using containers provides additional opportunities for optimization, since data can be shared between tasks running in the same container.
Each vertex in Tez specifies parameters that are used when launching containers: the requested resources (memory, CPU, etc.), YARN LocalResources, the environment, and the command-line options for tasks belonging to that vertex. When a container is first launched, it is launched for a specific task and uses the parameters specified for that task (or vertex); this then becomes the container's signature. An already-running container is considered compatible with another task when the running container's signature is a superset of what the task requires.
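As an illustration, the launch parameters for a vertex might be set up as below. This is a minimal sketch against the Tez DAG API (method names vary slightly across Tez versions); the processor class, parallelism, and resource values are hypothetical.

```java
import java.util.Collections;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.Vertex;

public class VertexSignatureExample {
  public static Vertex buildVertex() {
    // The resources, environment, and JVM options set here become part of the
    // signature of any container launched for this vertex's tasks.
    return Vertex.create(
            "MyVertex",
            ProcessorDescriptor.create("com.example.MyProcessor"), // hypothetical
            10,                                // number of tasks
            Resource.newInstance(1024, 1))     // 1 GB, 1 vcore per task
        .setTaskEnvironment(Collections.singletonMap("MY_ENV_VAR", "value"))
        .setTaskLaunchCmdOpts("-Xmx820m");     // command-line options for tasks
  }
}
```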
Initially, when no containers are available, the Tez AM will request containers from the RM with location information specified, and rely on YARN’s scheduler for locality-aware assignments. However, for containers which are being considered for re-use, the scheduling smarts offered by YARN are no longer available.
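For these initial allocations, the request to the RM carries the locality hints, along the lines of the YARN client API sketch below (the host and rack names are made up for the example).

```java
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;

// Sketch: how an AM such as Tez's can ask YARN for a container near the data.
public class LocalityRequest {
  public static AMRMClient.ContainerRequest forSplit() {
    Resource capability = Resource.newInstance(1024, 1); // memory MB, vcores
    String[] nodes = {"node17.example.com"};  // hosts holding the input split
    String[] racks = {"/rack4"};              // fall back to the same rack
    return new AMRMClient.ContainerRequest(
        capability, nodes, racks, Priority.newInstance(1));
  }
}
```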
The Tez scheduler weighs several factors when making task assignments: task-locality requirements, container compatibility as described above, the total available resources on the cluster, and the priority of pending task requests.
When a task completes and its container becomes available for re-use, a task may not be assigned to it immediately, since there may be no pending task whose data is local to the container's node. The Tez scheduler first attempts to find a task whose data would be local for the container. If no such task exists, the scheduler holds on to the container for a specific time before assigning any pending task to it. The expectation is that more tasks will complete in the interim, giving additional opportunities to schedule tasks on nodes close to their data. Going forward, non-local containers may be used in a speculative manner.
The priority of pending tasks (across different vertices), container compatibility, and cluster resources are all considered to ensure that tasks deemed to be of higher priority (whether due to a must-run-before relationship, a failure, or specific scheduling policies) have a container available.
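Put together, the re-use decision can be pictured roughly as in the sketch below. This is a simplified, hypothetical model of the matching logic, not the actual Tez scheduler code; the types, fields, and delay handling are invented for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the container re-use decision; not actual Tez code.
public class ReuseMatcher {

  static class ContainerSignature {
    Set<String> localResources;     // names of localized files/jars
    Map<String, String> environment;
    String javaOpts;
    int memoryMb;

    // A running container is compatible if its signature is a superset of
    // what the pending task requires.
    boolean satisfies(ContainerSignature required) {
      return localResources.containsAll(required.localResources)
          && environment.entrySet().containsAll(required.environment.entrySet())
          && javaOpts.equals(required.javaOpts)
          && memoryMb >= required.memoryMb;
    }
  }

  static class PendingTask {
    ContainerSignature required;
    Set<String> preferredHosts;     // nodes holding the task's input data
  }

  /**
   * Pick a task for a freed container: prefer a compatible task whose data
   * is local to the container's host; otherwise hold the container for a
   * grace period, hoping a local match appears as more tasks complete.
   * Assumes 'pending' is sorted by descending priority.
   */
  static PendingTask assign(String containerHost, ContainerSignature sig,
                            List<PendingTask> pending, long heldMillis,
                            long reuseDelayMillis) {
    PendingTask bestNonLocal = null;
    for (PendingTask t : pending) {
      if (!sig.satisfies(t.required)) {
        continue;                   // incompatible signature, skip
      }
      if (t.preferredHosts.contains(containerHost)) {
        return t;                   // data-local match: assign immediately
      }
      if (bestNonLocal == null) {
        bestNonLocal = t;           // highest-priority compatible fallback
      }
    }
    // No local match: fall back to a non-local task only after the container
    // has been held past the configured re-use delay.
    return (heldMillis >= reuseDelayMillis) ? bestNonLocal : null;
  }
}
```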
In the future, affinity will become part of the scheduling decision. This could be dictated by common resources shared between tasks, which need to be loaded only by the first task running in a container, or by data generated by the first task that can be processed directly by subsequent tasks without needing to move or serialize it; this is especially relevant for One-to-One edges.
At the moment, the number of tasks for a vertex, and their corresponding 'work-units', are determined up front. Going forward, this is likely to change to a model where a certain number of tasks are set up front based on cluster resources, while the work-units for these tasks are determined at runtime. This allows additional optimizations, where tasks that complete early are given additional work, and also allows for better locality-based assignment of work.
Each Tez JVM (or container) contains an object cache that can be used to share data between different tasks running within the same container. This is a simple key-object store with different levels of visibility/retention: objects can be cached for use by tasks belonging to the same Vertex, by all tasks within a DAG, or by tasks running across a Tez Session (more on Sessions in a subsequent post). The resources being cached may, in the future, be made available as a hint to the Tez scheduler for affinity-based scheduling. Two current examples of its use:
1) Hive uses this object registry to cache data for Broadcast Joins: the broadcast data is fetched and computed once by the first task, then used directly by the remaining tasks that run in the same JVM.
2) The sort buffer used by OnFileSortedOutput can be cached and re-used across tasks.
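For a flavor of what such usage looks like, here is a sketch of a processor consulting the registry, assuming the ObjectRegistry interface that Tez exposes to processors through their context; the key name and the cached structure are illustrative.

```java
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.ProcessorContext;

// Sketch: build an expensive structure once per JVM and share it between
// tasks that are scheduled into the same container.
public class BroadcastCacheExample {

  static Object getOrBuild(ProcessorContext context) {
    ObjectRegistry registry = context.getObjectRegistry();
    Object table = registry.get("broadcast-hash-table"); // illustrative key
    if (table == null) {
      table = buildTableFromBroadcastInput();  // hypothetical one-time work
      // Retained for all tasks of this DAG that run in this container.
      registry.cacheForDAG("broadcast-hash-table", table);
    }
    return table;
  }

  private static Object buildTableFromBroadcastInput() {
    return new java.util.HashMap<String, String>(); // placeholder
  }
}
```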
Learn more about Apache Tez here.