Update 4, Nov 2016: When I first wrote this post it was met with outright mockery and contempt. But the Google Dataflow paper (the unified Google framework for batch (FlumeJava) and stream processing (MillWheel)) and the Google MillWheel paper clearly explain that this is exactly the same approach the Google team has taken to solve the duplicate-events problem. While the approach is the same, they use a sketching / probabilistic data structure, a Bloom filter, to optimise the memory required for storing the fingerprints, whereas I have proposed a deterministic data structure, a distributed hashmap. I would still argue a Bloom filter wouldn’t be a good choice for this problem, even though it is cheaper; I will cover that in a different post. (Here is a good Dataflow review and MillWheel review.) I feel validated!
Original Post: It is a common misconception that an exactly-once messaging guarantee is achieved ONLY at the Kafka consumer (or message broker consumer) level; it should be achieved at the system level, considering multiple scenarios.
For instance, there is always a possibility of duplicate or late events from the event source, but most stream / micro-batch processing systems don’t factor that into their design. I am trying to clear the air and also show how this can be done and how I have done it. Doing this at the producer end is much harder; ideally the streaming frameworks should provide an option to create an idempotent producer. Anyway, here is my attempt. In all fairness it may not be very straightforward, and there is no one-size-fits-all solution, but you can always borrow the general idea and tailor a custom solution to any stream or micro-batch processing system you are building where duplicates cannot be tolerated.
Here is the use case with Kafka-Storm, very briefly:
We had sensors out in the field sensing location and some miscellaneous events. The sensor events are fed into an IoT platform; imagine ThingWorx (formerly Axeda) or AWS IoT (formerly 2lemetry) here. From the IoT platform, events are pushed into our application. The first point of contact of our application is an API; the “push” happens through an API call. Although the IoT platform could push events in multiple ways, we chose an API call here. The API then does some very basic validations and writes events to a partitioned Kafka topic. Our Storm processors read, enrich and persist the data.
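To make the hand-off concrete, below is a minimal sketch of that API-to-Kafka step, assuming a JSON string payload, a broker at localhost:9092, a topic named "sensor-events" and keying by sensor Id so a given sensor’s events land on the same partition. The names and the validation rule are illustrative assumptions, not details from the actual system.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EventsApi {

    private final KafkaProducer<String, String> producer;

    public EventsApi() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    /** Very basic validation, then publish; keying by sensorId keeps one sensor's events on one partition. */
    public void push(String sensorId, String eventJson) {
        if (sensorId == null || eventJson == null || eventJson.isEmpty()) {
            throw new IllegalArgumentException("malformed event");
        }
        producer.send(new ProducerRecord<>("sensor-events", sensorId, eventJson));
    }
}
```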
Accomplishing exactly-once semantics
IMHO, the two ways to accomplish exactly-once semantics are
- Detecting a possible duplicate early and preventing it from entering the system (or)
- Making your processing logic immune to duplicates, i.e. idempotent.
Intuitively, option 1 makes a lot of sense. Without divulging too much information, here is what the architecture looked like. We changed the Events API to generate an event “fingerprint” and piggyback it on the event packet. The API then looks up an external data store: if the fingerprint already exists, the event is rejected and logged (the system has already received it and will process it eventually, so the current event is a possible duplicate); if the fingerprint doesn’t exist, it is stored with the status “new” and the event is fed to Kafka. Down the line, the Storm processors consume the event, apply the business logic, and once the event is “processed” update the fingerprint’s status in the external data store to “processed”. This happens irrespective of whether you “acked” or “failed” the tuple at the Storm level: until the event (tuple) is processed successfully you can refuse to “ack” it so the spout replays it, and if you choose to “fail” the tuple for business reasons it doesn’t matter either. By marking it “processed” you are telling the “exactly-once enforcer” that Storm is done with the event. The major disadvantage of this solution is that I have added another moving part, the K,V store, and it has to store the fingerprints of all events. Storage-wise it doesn’t make a lot of sense, but for a stronger duplicate check this is required.
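Here is a minimal sketch of that check-and-set flow, assuming a Redis-backed fingerprint store accessed through Jedis. The host, the “new”/“processed” status values and the class layout are illustrative assumptions, not the original implementation.

```java
import redis.clients.jedis.Jedis;

public class ExactlyOnceEnforcer {

    private final Jedis fingerprintStore = new Jedis("localhost", 6379);   // hypothetical store

    /** Called by the Events API before the event is written to Kafka. */
    public boolean admit(String fingerprint) {
        // SETNX stores the fingerprint only if it is not already present:
        // 1 -> first time we see this event, 0 -> possible duplicate.
        long firstSeen = fingerprintStore.setnx(fingerprint, "new");
        if (firstSeen == 0) {
            System.out.println("Possible duplicate rejected: " + fingerprint);
            return false;   // reject and log; the original event is already in flight
        }
        return true;        // safe to feed into Kafka
    }

    /** Called from the Storm topology once the business logic has finished with the event. */
    public void markProcessed(String fingerprint) {
        fingerprintStore.set(fingerprint, "processed");
    }
}
```

In this sketch the API calls admit() before publishing, and the Storm side calls markProcessed() only after the business logic succeeds, regardless of whether the tuple was acked or failed.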
Some considerations,
- The choice of your external data store for the fingerprints is critical, as expensive disk IO can slow things down. You might want an in-memory <K,V> store (or even a document store with an in-memory storage engine, depending on your use case).
- As far as fingerprinting goes, our packets carried an event Id, a sensor Id and a timestamp with ‘ms’ precision, and the combination is unique, so you can simply append them to create a fingerprint, or run the appended string through a one-way hash like MD5 or SHA-1, or a non-crypto hash like murmur / FNV, to get a string; up to you (see the sketch after this list). Be warned that cryptographic hashes can hamper your performance. Also be warned that if your sensors can send the exact same event with different ‘ms’-precision timestamps, you might have to leave the timestamp out and rely on the unique nuances of the event data to create the fingerprint.
- Of course, Kafka-level playback / offset-based rewinding needs some considerations of its own, which I am intentionally brushing aside.
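For illustration, here is a minimal fingerprinting sketch along the lines of the bullet above, assuming the packet carries an event Id, a sensor Id and an epoch-millisecond timestamp. The field names and the choice of SHA-1 (pure JDK) are assumptions; a non-crypto hash like murmur or FNV can be swapped in if hashing cost becomes a bottleneck.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public final class Fingerprints {

    public static String of(String eventId, String sensorId, long timestampMs) {
        // Append the fields whose combination makes the event unique...
        String raw = eventId + "|" + sensorId + "|" + timestampMs;
        // ...and run the result through a one-way hash to get a fixed-size key.
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(raw.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }
}
```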
Takeaways,
Never feed into Kafka directly from any endpoint; let the Kafka producers call a common API that feeds into Kafka. Do a similar fingerprinting exercise as part of that API before feeding into Kafka, and take it forward from there.