Runtime API in Apache Tez

This post is the third in our series on the motivations, architecture and performance gains of Apache Tez for data processing in Hadoop. The series has the following posts:

Apache Tez models data processing as a dataflow graph, with the vertices in the graph representing processing of data and edges representing movement of data between the processing. The user logic, that analyses and modifies the data, sits in the vertices. Edges determine the consumer of the data, how the data is transferred and the dependency between the producer and consumer vertices.

For users of MapReduce (MR), the most primitive functionality that Tez can provide is an ability to run a chain of Reduce stages as compared to a single Reduce stage in the current MR implementation. Via the Task API, Tez can do this and much more by facilitating execution of any form of processing logic that does not need to be retrofitted into a Map or Reduce task and also by supporting multiple options of data transfer between different vertices that are not restricted to the MapReduce shuffle transport mechanism.

The Building Blocks of Tez

The Task API provides the building blocks for a user to plug-in their logic to analyze and modify data into the vertex and augment this processing logic with the necessary plugins to transfer and route data between vertices.

Tez models the user logic running in each vertex as a composition of a set of Inputs, a Processor and a set of Outputs.

  • Input: An input represents a pipe through which a processor can accept input data from a data source such as HDFS or the output generated by another vertex.
  • Processor: The entity responsible for consuming one or more Inputs and producing one or more Outputs.
  • Output: An output represents a pipe through which a processor can generate output data for another vertex to consume or to a data sink such as HDFS.

Given that an edge in a DAG is a logical entity that represents a number of physical connections between the tasks of 2 connected vertices, to improve ease of programmability for a developer implementing a new Processor, there are 2 kinds of Inputs and Outputs to either expose or hide the level of complexity:

  • Logical: A corresponding pair of a LogicalInput and a LogicalOutput represent the  logical edge between 2 vertices. The implementation of Logical objects hides all the underlying physical connections and exposes a single view to the data.
  • Physical: The pair of Physical Input and Output represents the connection between a task of the Source vertex and a task of a Destination vertex.

An example of the Reduce stage within an MR job would be a Reduce Processor that receives data from the maps via ShuffleInput and generates output to HDFS. Likewise, an intermediate Reduce stage in an MRR chain would be quite similar to the final Reduce stage except for the difference in the Output type.

tez1

Tez Runtime API

tez3

To implement a new Input, Processor or Output, a user to implement the appropriate interfaces mentioned above. All objects are given a Context object in their initialize functions. This context is the hook for these objects to communicate to the Tez framework. The Inputs and Outputs are expected to provide implementations for their respective Readers and Writers which are then used by the Processor to read/write data. In a task, after the Tez framework has initialized all the necessary Inputs, Outputs and the Processor, the  Tez framework invokes the Processor’s run function and passes the appropriate handles to all the Inputs and Outputs for that particular task.

Tez allows all inputs and outputs to be pluggable. This requires support for passing of information from the Output of a source vertex to the Input of the destination vertex. For example, let us assume that the Output of a source vertex writes all of its data to a key-value store. The Output would need to communicate the “key” to the Input of the next stage so that the Input can retrieve the correct data from the key-value store. To facilitate this, Tez uses Events.

Events in Tez

Events in Tez are a way to pass information amongst different components.

  • The Tez framework uses Events to pass information of system events such as task failures to the required components.
  • Inputs of a vertex can inform the framework of any failures encountered when trying to retrieve data from the source vertex’s Output that in turn can be used by the framework to take failure recovery measures.
  • An Output can pass information of the location of the data, which it generates, to the Inputs of the destination vertex.  An example of this is described in the Shuffle Event diagram which shows how the output of a Map stage informs the Shuffle Input of the Reduce stage of the location of its output via a Data Movement Event.

tez2

Another use of Events is to enable run-time changes to the DAG execution plan. For example, based on the amount of the data being generated by a Map stage, it may be more optimal to run less reduce tasks within the following Reduce stage. Events generated by Outputs are routed to the pluggable Vertex/Edge management modules, allowing them to make the necessary decisions to modify some run-time parameters as needed.

Available implementations of Inputs/Processors/Outputs

 The flexibility of Tez allows anyone to implement their Inputs and Outputs, whether they use blocking/non-blocking transport protocols, handle data in the form of raw bytes/records/key-value pairs etc., and build Processors to handle these variety of Inputs and Outputs.

There is already a small repository of various implementations of Inputs/Outputs/Processors:

  • MRInput and MROutput: Basic input and outputs to handle data to/from HDFS that are MapReduce compatible as they use MapReduce constructs such as InputFormat, RecordReader, OutputFormat and RecordWriter.
  • OnFileSortedOutput and ShuffleMergedInput: A pair of key-value based Input and Output that use the local disk for all I/O and provide the same sort+merge functionality that is required for the “shuffle” edge between the Map and Reduce stages in a MapReduce job.
  • OnFileUnorderedKVOutput and ShuffledUnorderedKVInput: These are similar to the shuffle pair mentioned earlier except that the data is not sorted implicitly. This can be a big performance boost in various situations.
  • MapProcessor and ReduceProcessor: As the names suggest, these processors are available for anyone trying to run a MapReduce job on the Tez execution framework. They can be used to run an MRR chain too.

As the Hive and Pig projects adapt to use Tez, we hope this repository will grow to house a common set of building blocks for use across the different projects.

Categorized by :
Architect & CIO Developer Hadoop 2.0 HDP 2 Performance Tez

Leave a Reply

Your email address will not be published. Required fields are marked *

If you have specific technical questions, please post them in the Forums

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>

Stinger Initiative

The Stinger Initiative is a broad, community-based effort to drive the future of Apache Hive, delivering 100x performance improvements at petabyte scale with familiar SQL semantics. More »

Recently in the Blog

Modern Data Architecture
Tackle the challenges of big data. Hadoop integrates with existing EDW, RDBMS and MPP systems to deliver lower cost, higher capacity infrastructure.
Hortonworks Data Platform
The Hortonworks Data Platform is a 100% open source distribution of Apache Hadoop that is truly enterprise grade having been built, tested and hardened with enterprise rigor.
Integrate with existing systems
Hortonworks maintains and works with an extensive partner ecosystem from broad enterprise platform vendors to specialized solutions and systems integrators.

Thank you for subscribing!