Runtime API in Apache Tez
This post is the third in our series on the motivations, architecture and performance gains of Apache Tez for data processing in Hadoop. The series has the following posts:
- Apache Tez: A New Chapter in Hadoop Data Processing
- Data Processing API in Apache Tez
- Runtime API in Apache Tez
- Writing a Tez Input/Processor/Output
- Apache Tez: Dynamic Graph Reconfiguration
- Reusing containers in Apache Tez
- Introducing Tez Sessions
Apache Tez models data processing as a dataflow graph, with the vertices in the graph representing processing of data and edges representing movement of data between the processing. The user logic, that analyses and modifies the data, sits in the vertices. Edges determine the consumer of the data, how the data is transferred and the dependency between the producer and consumer vertices.
For users of MapReduce (MR), the most primitive functionality that Tez can provide is an ability to run a chain of Reduce stages as compared to a single Reduce stage in the current MR implementation. Via the Task API, Tez can do this and much more by facilitating execution of any form of processing logic that does not need to be retrofitted into a Map or Reduce task and also by supporting multiple options of data transfer between different vertices that are not restricted to the MapReduce shuffle transport mechanism.
The Building Blocks of Tez
The Task API provides the building blocks for a user to plug-in their logic to analyze and modify data into the vertex and augment this processing logic with the necessary plugins to transfer and route data between vertices.
Tez models the user logic running in each vertex as a composition of a set of Inputs, a Processor and a set of Outputs.
- Input: An input represents a pipe through which a processor can accept input data from a data source such as HDFS or the output generated by another vertex.
- Processor: The entity responsible for consuming one or more Inputs and producing one or more Outputs.
- Output: An output represents a pipe through which a processor can generate output data for another vertex to consume or to a data sink such as HDFS.
Given that an edge in a DAG is a logical entity that represents a number of physical connections between the tasks of 2 connected vertices, to improve ease of programmability for a developer implementing a new Processor, there are 2 kinds of Inputs and Outputs to either expose or hide the level of complexity:
- Logical: A corresponding pair of a LogicalInput and a LogicalOutput represent the logical edge between 2 vertices. The implementation of Logical objects hides all the underlying physical connections and exposes a single view to the data.
- Physical: The pair of Physical Input and Output represents the connection between a task of the Source vertex and a task of a Destination vertex.
An example of the Reduce stage within an MR job would be a Reduce Processor that receives data from the maps via ShuffleInput and generates output to HDFS. Likewise, an intermediate Reduce stage in an MRR chain would be quite similar to the final Reduce stage except for the difference in the Output type.
Tez Runtime API
To implement a new Input, Processor or Output, a user to implement the appropriate interfaces mentioned above. All objects are given a Context object in their initialize functions. This context is the hook for these objects to communicate to the Tez framework. The Inputs and Outputs are expected to provide implementations for their respective Readers and Writers which are then used by the Processor to read/write data. In a task, after the Tez framework has initialized all the necessary Inputs, Outputs and the Processor, the Tez framework invokes the Processor’s run function and passes the appropriate handles to all the Inputs and Outputs for that particular task.
Tez allows all inputs and outputs to be pluggable. This requires support for passing of information from the Output of a source vertex to the Input of the destination vertex. For example, let us assume that the Output of a source vertex writes all of its data to a key-value store. The Output would need to communicate the “key” to the Input of the next stage so that the Input can retrieve the correct data from the key-value store. To facilitate this, Tez uses Events.
Events in Tez
Events in Tez are a way to pass information amongst different components.
- The Tez framework uses Events to pass information of system events such as task failures to the required components.
- Inputs of a vertex can inform the framework of any failures encountered when trying to retrieve data from the source vertex’s Output that in turn can be used by the framework to take failure recovery measures.
- An Output can pass information of the location of the data, which it generates, to the Inputs of the destination vertex. An example of this is described in the Shuffle Event diagram which shows how the output of a Map stage informs the Shuffle Input of the Reduce stage of the location of its output via a Data Movement Event.
Another use of Events is to enable run-time changes to the DAG execution plan. For example, based on the amount of the data being generated by a Map stage, it may be more optimal to run less reduce tasks within the following Reduce stage. Events generated by Outputs are routed to the pluggable Vertex/Edge management modules, allowing them to make the necessary decisions to modify some run-time parameters as needed.
Available implementations of Inputs/Processors/Outputs
The flexibility of Tez allows anyone to implement their Inputs and Outputs, whether they use blocking/non-blocking transport protocols, handle data in the form of raw bytes/records/key-value pairs etc., and build Processors to handle these variety of Inputs and Outputs.
There is already a small repository of various implementations of Inputs/Outputs/Processors:
MROutput: Basic input and outputs to handle data to/from HDFS that are MapReduce compatible as they use MapReduce constructs such as InputFormat, RecordReader, OutputFormat and RecordWriter.
ShuffleMergedInput: A pair of key-value based Input and Output that use the local disk for all I/O and provide the same sort+merge functionality that is required for the “shuffle” edge between the Map and Reduce stages in a MapReduce job.
ShuffledUnorderedKVInput: These are similar to the shuffle pair mentioned earlier except that the data is not sorted implicitly. This can be a big performance boost in various situations.
ReduceProcessor: As the names suggest, these processors are available for anyone trying to run a MapReduce job on the Tez execution framework. They can be used to run an MRR chain too.