I am presuming you know the whats and whys of Apache Flink, touted as one of the best data processing frameworks, capable of both batch and stream processing. Recently Flink announced a cool new CEP library. Just hang on with me; before going any further, let me explain the reason for this post. Some time back, a lot of people were convinced that stream and micro-batching frameworks like Storm, Trident, Spark Streaming, and even Flink (before the CEP library release) were good enough to write a full-blown CEP system. I respectfully disagreed, because of the complexity of the use cases I was aware of in some IoT installs. Even to track a simple event pattern, one would have to write a lot of code, and maintaining all the state would be a mess. Of course, we could use the Elasticsearch percolator or stored-query engines like Luwak for simpler patterns stored as queries. But think about it: dedicated tools like Esper and other expensive commercial tools in this space wouldn't even exist if we could do this with stream processing frameworks alone. I am guessing CEP tools may even have their own Gartner Magic Quadrant. This API release pretty much validated my argument, and you will see what I mean when I get to the use case.
So, what is CEP (if you didn't already know)?
The ability to write continuous queries that monitor and identify a pre-defined event pattern in an unbounded stream of events is called Complex Event Processing, or CEP. That's all there is to it. But are data processing frameworks designed to do this out of the box, without any extra features? That's the whole argument of this post.
OK, what problems does CEP solve?
Let's think about a bank that generates tons of events into its banking system for processing. Say the bank wants to watch or monitor for an event pattern:
"For a customer ID X, ATM withdrawal txns >= $10,000 made more than 3 times within 2 hours, each in a location more than a 50-mile radius from the next txn, must be flagged as a possible fraudulent transaction, and an alert should be triggered."
Let's call this event pattern the 'ATM Fraud monitor'. Likewise, a typical bank can have multiple monitors watching different event patterns.
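To make the pattern concrete, here is a minimal sketch of what such an ATM withdrawal event might look like as a record. This is plain Python for illustration only; the type name and fields are my own assumptions, not a real banking schema or a Flink event type:

```python
from dataclasses import dataclass

@dataclass
class AtmTxn:
    """One ATM withdrawal event; all field names are illustrative assumptions."""
    customer_id: str
    amount: float      # withdrawal amount in dollars
    lat: float         # ATM latitude
    lon: float         # ATM longitude
    timestamp: float   # event time, in epoch seconds

# An event the 'ATM Fraud monitor' would care about: customer X, txn > $10K
txn = AtmTxn(customer_id="X", amount=12_000.0, lat=40.71, lon=-74.00, timestamp=0.0)
```

Every monitor is then just a predicate over a stream of records like this one, plus some state.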
Now let's look at a more concrete example of the use case.
ATM withdrawal pattern for customer X
Let's see our event pattern again: "An ATM withdrawal txn >= $10K made more than 3 times within 2 hours, each in a location more than a 50-mile radius from the next txn." Among the ATM txns of all the customers, customer X's txns are highlighted in yellow. Now let's see what the ATM Fraud monitor is supposed to do on seeing this stream:
- Recognise the start, or 'begin', of an event pattern: save the counter state as 1 (since we need 3 counts before taking any action) and set a countdown clock. In this case, the pattern begins when the first event at 10:00:45 arrives with a txn > $10K at location L1 (checking the 50-mile radius requires a geo-spatial query). A countdown clock of 2 hours is set, so if no other txn > $10K comes into the system for customer X within 2 hours of the 'begin', the timeout event can be handled to safely discard all the state.
- But if txns satisfying all the filters come in for customer X within the 2 hours, the count is incremented. If and when the count hits 3, the action logic should be invoked; it could be as simple as sending an alert. That's exactly what the Apache Flink CEP Pattern API offers.
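The bookkeeping described above can be sketched as a small hand-rolled state machine, which is roughly what you would end up writing yourself on a bare stream processor. To be clear, this is plain Python, not Flink CEP code; the event shape, the haversine distance check, and the state layout are all my own assumptions for illustration:

```python
import math
from dataclasses import dataclass

@dataclass
class AtmTxn:
    """One ATM withdrawal event; field names are illustrative assumptions."""
    customer_id: str
    amount: float      # dollars
    lat: float
    lon: float
    timestamp: float   # epoch seconds

def miles_between(lat1, lon1, lat2, lon2):
    """Great-circle distance in miles (haversine formula)."""
    earth_radius_miles = 3958.8
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlam / 2) ** 2
    return 2 * earth_radius_miles * math.asin(math.sqrt(a))

class AtmFraudMonitor:
    """Hand-rolled sketch of the monitor: 3 or more txns >= $10K for the same
    customer within 2 hours, each > 50 miles from the previous one, triggers
    an alert. Thresholds mirror the pattern described in the post."""

    def __init__(self, threshold=10_000, min_count=3, radius_miles=50,
                 window_secs=2 * 3600):
        self.threshold = threshold
        self.min_count = min_count
        self.radius_miles = radius_miles
        self.window_secs = window_secs
        # customer_id -> (window start ts, last matching txn, match count)
        self.state = {}

    def on_event(self, txn):
        """Feed one event; returns True when the fraud pattern completes."""
        if txn.amount < self.threshold:
            return False  # filter: only large withdrawals participate
        start_ts, last, count = self.state.get(txn.customer_id, (None, None, 0))
        if start_ts is None or txn.timestamp - start_ts > self.window_secs:
            # 'begin': first large txn, or the 2-hour countdown expired --
            # discard the stale state and restart with counter = 1
            self.state[txn.customer_id] = (txn.timestamp, txn, 1)
            return False
        if miles_between(last.lat, last.lon, txn.lat, txn.lon) > self.radius_miles:
            count += 1
            if count >= self.min_count:
                del self.state[txn.customer_id]  # pattern complete, clear state
                return True                      # alert: possible fraud
            self.state[txn.customer_id] = (start_ts, txn, count)
        return False
```

Even this toy version has to juggle per-customer state, window expiry, and the geo check by hand, and it covers exactly one monitor. The appeal of a Pattern API is that you declare the begin/followed-by/within structure and let the framework manage the state and timeouts for you.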
That's a wrap. Here are some snippets from the Flink page to give you a feel; even though they are written for temperature events, you will get the idea.
Imagine writing everything from scratch using Spark or Storm; if someone wants to, I salute them. But promise me you will make it just as composable, with fluent APIs, and just as readable. The point is: what if all of that were taken care of, and all you had to do was model your event and code your CEP with all your threshold values? That's Apache Flink's CEP promise, but this is only a high-level opinion. Let me try out the APIs to model the same use case I presented in this post, see how flexible they really are, and report back to you.
Stay tuned !
As promised, here is the follow-up on the CEP library, with working code implementing this use case.