August 26, 2014

Apache Storm Design Pattern—Micro Batching

Sheetal Dolas is a Principal Architect at Hortonworks. As part of the Apache Storm design patterns blog series, he explores three options for micro-batching using Apache Storm's core APIs. This is the first post in the series.

What is Micro-batching?

Micro-batching is a technique that allows a process or task to treat a stream as a sequence of small batches or chunks of data. For incoming streams, the events can be packaged into small batches and delivered to a batch system for processing [1].
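
As a minimal, framework-agnostic sketch of the idea (all names here are hypothetical, not from any particular library), a micro-batcher buffers incoming events and hands them off in chunks once a size or time threshold is reached:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * A hypothetical micro-batcher: buffers events and delivers them as one
 * batch when either the size or the time threshold is reached.
 */
public class MicroBatcher<T> {
    private final int batchSize;
    private final long maxWaitMillis;
    private final Consumer<List<T>> batchConsumer;
    private final List<T> buffer = new ArrayList<T>();
    private long lastFlush = System.currentTimeMillis();

    public MicroBatcher(int batchSize, long maxWaitMillis, Consumer<List<T>> batchConsumer) {
        this.batchSize = batchSize;
        this.maxWaitMillis = maxWaitMillis;
        this.batchConsumer = batchConsumer;
    }

    /** Called once per incoming event. */
    public synchronized void onEvent(T event) {
        buffer.add(event);
        // Note: the time threshold is only evaluated when an event arrives.
        // This is exactly the time-trigger problem discussed later in this post.
        if (buffer.size() >= batchSize
                || System.currentTimeMillis() - lastFlush >= maxWaitMillis) {
            flush();
        }
    }

    private void flush() {
        batchConsumer.accept(new ArrayList<T>(buffer)); // deliver one micro-batch
        buffer.clear();
        lastFlush = System.currentTimeMillis();
    }
}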

Micro-batching in Apache Storm

In Apache Storm, micro-batching in core Storm topologies makes sense for performance or for integration with external systems (like ElasticSearch, Solr, HBase or a database).

Implementing reliable, latency-sensitive, and effective batching in Storm isn't trivial, but you can do it. Even though Storm's Trident APIs do provide internal batching capabilities, this blog focuses on micro-batching using Storm's core APIs.

These are some of the challenges of implementing micro-batching in Storm:

  • Data delivery unreliability: Storm's ack'ing mechanism plays an important role in reliable data processing and delivery. Ack'ing stream tuples too early may result in data loss if external system syncs fail before completion. For example, if tuples are ack'ed as soon as they are received in a bolt, but before they are successfully persisted to the external store in batches, the persistence may fail. From Storm's perspective, however, the tuple was successfully ack'ed and hence will not be replayed. In such situations, the data never reaches its destination and is lost (see the anti-pattern sketch after this list).
  • Unnecessary data duplication: One may try to solve the previous issue by ack'ing the tuples only after successful persistence. But if persistence does not complete within the tuples' timeout period, Storm will replay those tuples, resulting in unnecessary reprocessing and data duplication in the target system.
  • Increased latency: Streams can have peaks and troughs. During a lull, the stream may not contain enough data to fill the batch within a reasonable time period. Batches that are too small can decrease performance, but batches that are too large can delay a tuple's arrival at its destination. For example, consider a bolt configured to process 100 tuples per batch during off-peak periods. If the first tuple reaches the bolt at 2:00 PM and the 100th arrives at 2:30 PM, the batch would get flushed at 2:30 PM, causing a 30-minute delay before the first tuple reached its destination (defeating the purpose of real-time streaming).
  • Complexity in time-bound batching: In order to avoid the aforementioned latency and replay issues, batches should be bound by both tuple count and time. But since the user code in Storm bolts is triggered only on tuple availability, it is hard to inject code that gets auto-triggered at certain time intervals.
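
To make the first two challenges concrete, here is a deliberately broken sketch, a fragment of a hypothetical bolt's execute() method (collector, buffer, batchSize, and flushToStore() are assumed fields and helpers; pre-1.0 backtype.storm types assumed). It acks each tuple on receipt, before the batch is persisted:

// ANTI-PATTERN: acking before persistence. If flushToStore() later fails,
// Storm already considers these tuples fully processed and will NOT replay
// them, so their data is silently lost.
@Override
public void execute(Tuple tuple) {
    collector.ack(tuple);   // too early: acked before the data is safe
    buffer.add(tuple);
    if (buffer.size() >= batchSize) {
        flushToStore(buffer);   // hypothetical bulk write to the external system
        buffer.clear();
    }
}

Moving the ack after flushToStore() fixes the data loss but exposes the second challenge: tuples queued during a lull may exceed their timeout and get replayed even though they are eventually persisted.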

Let’s examine what options are available for time-bound batching and discuss their pros and cons.

1. Thread-based Model

Implementation details: Create a background thread in each bolt that will process your batch: at predefined sleep intervals, the thread wakes up and flushes the batch, as in the sketch below.
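
A minimal sketch of this approach (hypothetical names; backtype.storm types assumed): the bolt starts a daemon flusher thread in prepare(), and finishBatch() must be synchronized against execute() since both touch the shared batch:

// Inside a hypothetical batching bolt: a daemon thread flushes the shared
// batch every batchIntervalInSec seconds, independently of tuple arrival.
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    Thread flusher = new Thread(new Runnable() {
        public void run() {
            while (true) {
                try {
                    Thread.sleep(batchIntervalInSec * 1000L);
                    finishBatch();          // must be synchronized with execute()
                } catch (InterruptedException e) {
                    return;                 // topology is shutting down
                }
            }
        }
    });
    flusher.setDaemon(true);
    flusher.start();
}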

Pros:

  • Developers have control over spinning their own threads and scheduling time intervals to flush the batches.
  • Each bolt can have a different time interval for flushing.

Cons:

  • Storm already has an internal threading model: bolts and spouts run in threads. By creating your own threads, you introduce complexity and invite concurrency issues and the possibility of deadlock. Testing and debugging multi-threaded and distributed code is hard.
  • This model would have to be repeated for every different bolt.

2. Fabric Stream To Trigger Batch Flush

Implementation details: Implement a fabricated parallel data ingestion stream. This allows you to feed data into it at certain time intervals and use the arrival of a tuple from this stream as a signal to flush the batch. See the illustration below.
[Illustration: a fabricated data spout emitting flush-signal tuples to the business-function bolt, alongside the regular data stream]
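
A sketch of the fabricated stream (hypothetical class and field names; backtype.storm packages assumed): a clock-style spout emits one signal tuple per interval, and downstream bolts, subscribed for example via allGrouping so every task receives the signal, flush their batch on its arrival:

import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

// A hypothetical "fabricated" clock spout: emits one signal tuple per interval.
public class FlushSignalSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final long intervalMillis = 45 * 1000L;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(intervalMillis);                              // wait one interval
        collector.emit(new Values(System.currentTimeMillis()));   // the flush signal
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("flushSignalTime"));
    }
}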

Pros:

  • Since the application manages the stream and is a core part of the design, we can be confident that data will arrive at predictable times and rates to trigger the batch flush. Application peaks and troughs do not impact the flush times.
  • We can circumvent all of the threading and its related complications.

Cons:

  • It is difficult to guarantee that the tuple from the stream reaches every bolt in a topology, especially if the topology has a long and/or complex DAG.
  • Requiring additional development for generating a fabricated stream and ensuring that tuples from it reach every bolt in the topology adds complexity. As in the threaded model above, this implementation must be repeated for every topology.
  • Needing a fault-tolerance and error-handling plan for a fabricated stream adds code complexity and manageability challenges. What if this fabricated stream stops sending data? What if the host generating this fabric stream fails? This quickly becomes cumbersome and unmanageable.

3. Tick Tuples

Implementation details: The Tick Tuples scheme is Storm’s built-in mechanism for generating tuples and sending them to each bolt in the topology at specified intervals.

According to Nathan Marz:

It’s common to require a bolt to ‘do something’ at a fixed interval, like flush writes to a database. Many people have been using variants of a ClockSpout to send these ticks. The problem with a ClockSpout is that you can’t internalize the need for ticks within your bolt, so if you forget to set up your bolt correctly within your topology, it won’t work correctly. [Storm] 0.8.0 introduces a new ‘tick tuple’ config that lets you specify the frequency at which you want to receive tick tuples via the ‘topology.tick.tuple.freq.secs’ component-specific config, and then your bolt will receive a tuple from the __system component and __tick stream at that frequency.
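
In code, enabling and detecting tick tuples looks roughly like this (a sketch assuming the backtype.storm packages of that era; the TupleHelpers.isTickTuple(...) helper referenced in the sample later in this post is not part of Storm itself and is shown here as it is commonly written):

// 1. In your bolt, request tick tuples every 45 seconds via the
//    component-specific configuration.
@Override
public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> conf = new HashMap<String, Object>();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 45);
    return conf;
}

// 2. A small helper that recognizes a tick tuple: it arrives from the
//    __system component on the __tick stream (backtype.storm.Constants).
public final class TupleHelpers {
    private TupleHelpers() {}

    public static boolean isTickTuple(Tuple tuple) {
        return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }
}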

Pros:

  • Does not require custom code.
  • You don’t have to worry about failures, fault tolerance, threads, or tuple distribution. It is all handled by Storm.

Cons:

  • When the tick frequency is set in the topology-level configuration, it applies to every bolt at the same rate, so a single setting cannot manage different batch schedules for different components within the topology; giving each bolt its own schedule requires overriding the frequency in that bolt's component-specific configuration (as in the sketch above).

Conclusion

Of all the models suggested above, tick tuples are the best way to implement micro-batching using Storm's core APIs. For reliable, latency-aware, and effective micro-batching built on Storm's tick tuple scheme, see the following sample implementation.


/** The queue holding tuples in a batch. */
protected LinkedBlockingQueue<Tuple> queue = new LinkedBlockingQueue<Tuple>();

/** The threshold after which the batch should be flushed out. */
int batchSize = 100;

/**
 * The batch interval in seconds. Minimum time between flushes if the batch
 * size is not met. This should typically be equal to
 * topology.tick.tuple.freq.secs and half of topology.message.timeout.secs.
 */
int batchIntervalInSec = 45;

/** The last batch process time in seconds. Used for tracking purposes. */
long lastBatchProcessTimeSeconds = 0;

@Override
public void execute(Tuple tuple) {
    // Check if the tuple is a tick tuple.
    if (TupleHelpers.isTickTuple(tuple)) {
        // If so, it is an indication for a batch flush. But don't flush if the
        // previous flush was done very recently (either because the batch size
        // threshold was crossed or because of another tick tuple).
        if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) {
            LOG.debug("Current queue size is " + this.queue.size()
                    + ". But received tick tuple so executing the batch");
            finishBatch();
        } else {
            LOG.debug("Current queue size is " + this.queue.size()
                    + ". Received tick tuple but last batch was executed "
                    + (System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds)
                    + " seconds back that is less than " + batchIntervalInSec
                    + " so ignoring the tick tuple");
        }
    } else {
        // Add the tuple to the queue. But don't ack it yet.
        this.queue.add(tuple);
        int queueSize = this.queue.size();

        LOG.debug("Current queue size is " + queueSize);
        if (queueSize >= batchSize) {
            LOG.debug("Current queue size is >= " + batchSize
                    + " executing the batch");
            finishBatch();
        }
    }
}

/**
 * Finishes the batch: drains the queue, persists the tuples in bulk, and
 * acks or fails each tuple individually based on the outcome.
 */
public void finishBatch() {
    LOG.debug("Finishing batch of size " + queue.size());
    lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
    List<Tuple> tuples = new ArrayList<Tuple>();
    queue.drainTo(tuples);
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    BulkResponse bulkResponse = null;
    for (Tuple tuple : tuples) {
        // Prepare your batch here (be it JDBC, HBase, ElasticSearch, Solr or
        // anything else).
    }
    try {
        // Execute the bulk request and get individual tuple responses back.
        bulkResponse = bulkRequest.execute().actionGet();
        BulkItemResponse[] responses = bulkResponse.getItems();
        BulkItemResponse response = null;
        LOG.debug("Executed the batch. Processing responses.");
        for (int counter = 0; counter < responses.length; counter++) {
            response = responses[counter];
            if (response.isFailed()) {
                ElasticSearchDocument failedEsDocument = this.tupleMapper
                        .mapToDocument(tuples.get(counter));
                LOG.error("Failed to process tuple # " + counter + ": "
                        + failedEsDocument);
                this.collector.fail(tuples.get(counter));
            } else {
                LOG.debug("Successfully processed tuple # " + counter);
                this.collector.ack(tuples.get(counter));
            }
        }
    } catch (Exception e) {
        LOG.error("Unable to process " + tuples.size() + " tuples", e);
        // Fail the entire batch so Storm replays the tuples.
        for (Tuple tuple : tuples) {
            this.collector.fail(tuple);
        }
    }
}
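
Wiring the bolt into a topology is then straightforward. In this hypothetical sketch (EventSpout and BatchingBolt are assumed class names), the one topology-level requirement is that topology.message.timeout.secs comfortably exceeds batchIntervalInSec, so tuples waiting unacked in the queue are not timed out and replayed before the flush; per the comment in the sample above, roughly twice the interval (for example, 90 seconds for a 45-second interval):

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class MicroBatchingTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("events", new EventSpout(), 1);          // hypothetical spout
        builder.setBolt("batchingBolt", new BatchingBolt(), 4)    // the bolt shown above
                .shuffleGrouping("events");

        Config conf = new Config();
        // Message timeout should be roughly twice batchIntervalInSec so queued
        // tuples are not timed out and replayed before the batch is flushed.
        conf.setMessageTimeoutSecs(90);

        StormSubmitter.submitTopology("micro-batching-topology", conf,
                builder.createTopology());
    }
}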

Discover and Learn More

  • Read about Apache Storm
  • Try Apache Storm Tutorials
  • Read about the Future of Apache Storm

References

1. http://arxiv.org/pdf/1403.3375.pdf

Comments

  • So the idea is that if the bolt or worker fails and gets restarted, all data in the arrayList (line 59) is lost; those tuples then fail on timeout and are replayed by Storm. This allows you not to persist batch data into permanent storage. Is that right?
    Thanks

    • Hi Eugene,

      This pattern takes full advantage of Storm's fault tolerance. So in case a bolt fails, a worker dies, a host goes down, etc., tuples that are not yet acked will get replayed per Storm's design.

      You would want to avoid adding intermediate persistence as much as possible, for multiple reasons. Try to keep the topology as lean and nimble as possible by taking advantage of Storm's tuple processing guarantees.

      ~ Sheetal

  • Hi Sheetal,

    This is a very useful article. I have a few questions related to the article and a few general ones. Apologies if they seem too trivial. I'm just getting my feet wet with Storm.

    1. Can't the unnecessary data duplication (case 2 in the challenges of implementing micro-batching) be avoided by specifying bolt-specific timeouts (a higher timeout for bolts that batch the tuples)?
    2. In the fabricated tuple approach, shouldn't the 'fabricated data spout' be consumed by all the bolts in the topology? The diagram shows only the 'business function' consuming the spout.

    On a general note.
    3. Are there plans to introduce batching as part of the core Storm API? This would enable the user to define the batch size and batch timeout while defining the spouts and bolts.
    4. Does the Trident API allow users to tweak the batch size/batch timeout?

    Thanks again for the wonderful article.

    Anirudh
