At work, I was given the task of joining two different events arriving via a real-time stream in Apache Beam. While joins are simple in a SQL batch job, they are significantly more challenging in real-time streaming systems. In a batch job, the data is bounded - it is finite and will eventually be exhausted. In a real-time streaming system, the data is potentially infinite - it must be broken up into bounded sets of records, or your join will take an infinite amount of time. In addition, data arrives at different times - how long should you wait before deciding that the other side of the join is never coming?
One solution to these problems is windowing: breaking a real-time event stream into bounded pieces. One example of windowing is a fixed window.
A fixed window is a window that captures all events in a fixed period of time, such as 2 minutes. The problem is that events near the boundary will not be joined, as they would be in different windows.
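For illustration, assigning fixed windows in the Beam Python SDK looks something like this (a minimal sketch; events stands in for any timestamped, keyed PCollection):

import apache_beam as beam
from apache_beam.transforms import window

# Assign each element of `events` to a non-overlapping 2-minute window.
windowed = events | beam.WindowInto(window.FixedWindows(2 * 60))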
An alternative approach is to use a sliding window, where windows overlap and an event can fall into more than one window. This has a different tradeoff, however: many of the joined results will be duplicates. But we will get all of the events, even if they cross a window boundary.
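A sliding window in Beam takes both a size and a period. Here is a sketch, assuming a 2-minute window that starts every 30 seconds, so each event lands in four overlapping windows:

import apache_beam as beam
from apache_beam.transforms import window

# 2-minute windows starting every 30 seconds; each element appears
# in size / period = 4 overlapping windows.
windowed = events | beam.WindowInto(window.SlidingWindows(size=2 * 60, period=30))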
In any case, I found that either windowing method introduced too much latency for my join - you have to wait for the window to close, in addition to the time it takes CoGroupByKey() to join the records together.
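For contrast, here is roughly what such a windowed join looks like: window both keyed streams the same way, then let CoGroupByKey() gather matching keys once each window closes. This is a sketch, with left and right standing in for the two keyed streams:

import apache_beam as beam
from apache_beam.transforms import window

# Window both sides identically, then join on key. Nothing is emitted
# until the window closes, which is where the latency comes from.
left_windowed = left | 'Window left' >> beam.WindowInto(window.FixedWindows(2 * 60))
right_windowed = right | 'Window right' >> beam.WindowInto(window.FixedWindows(2 * 60))

# Emits (key, {'left': [values], 'right': [values]}) per key per window.
joined = ({'left': left_windowed, 'right': right_windowed}
    | beam.CoGroupByKey())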
What I wanted was to cache the first object I saw, wait for the matching object to come in, and then do the join. I considered using Redis as an external cache, but then I discovered BagStateSpec in Apache Beam.
BagStateSpec lets you store a bag of state for each key in a PCollection. First you map your records into the form (key, value) and pass them to a DoFn. Each time the same key comes up, Beam hands your DoFn the bag state for that key: you can append to it with cache.add(), read its contents with cache.read(), and clear it with cache.clear().
Here's the overall code with example data. It assumes that each record has a schema field showing whether it's the left or right side of the join.
import apache_beam as beam
from apache_beam.coders.coders import PickleCoder
from apache_beam.transforms.userstate import BagStateSpec

class CachedJoin(beam.DoFn):
    # One bag of cached records per key. The cached values are plain
    # dicts, so PickleCoder is enough to serialize them.
    CACHE = BagStateSpec('cache', PickleCoder())

    def process(self, record, cache=beam.DoFn.StateParam(CACHE)):
        key = record[0]
        value = record[1]
        schema_name = value["schema"]
        if schema_name == "left":
            other_schema = "right"
        else:
            other_schema = "left"
        # Look for a cached record from the other side of the join.
        other_record = [x for x in cache.read() if x["schema"] == other_schema]
        if len(other_record) != 0:
            other_record = other_record[-1]
            cache.clear()
            value.update(other_record)
            value["key"] = key
            del value["schema"]
            yield value
        else:
            # The other side hasn't arrived yet; cache this record and wait.
            cache.add(value)
with beam.Pipeline() as pipeline:
    icons = pipeline | 'Create icons' >> beam.Create([
        ('Apple', {"schema" : "left", "icon" : '🍎'}),
        ('Grape', {"schema" : "left", "icon" : '🍇'}),
        ('Tomato', {"schema" : "left", "icon" : '🍅'})
    ])
    durations = pipeline | 'Create durations' >> beam.Create([
        ('Apple', {"schema" : "right", "schedule" : 'perennial'}),
        ('Grape', {"schema" : "right", "schedule" : 'perennial'}),
        ('Tomato', {"schema" : "right", "schedule" : 'annual'})
    ])
    joined = ([icons, durations]
        | "Flatten" >> beam.Flatten()
        | "Join" >> beam.ParDo(CachedJoin())
        | "Print" >> beam.Map(print)  # Print each joined record
    )
This should output:
[
    {"key" : "Apple", "icon" : '🍎', "schedule" : 'perennial'},
    {"key" : "Grape", "icon" : '🍇', "schedule" : 'perennial'},
    {"key" : "Tomato", "icon" : '🍅', "schedule" : 'annual'}
]
With this, I was getting 100% of the data with less than 100 ms of latency for my job, not counting time spent waiting for data to arrive - fast enough to output the data for my needs.
This works for an inner join, but there are two problems: left joins, and unmatched records sitting in the cache forever.
Left joins are trickier than inner joins. In an inner join, you simply wait until the other half of the join comes in, then do the join. With a left join, however, you can't be sure you are ever going to get the other half - is it just late, or is it never coming? What you can do is wait some amount of time, then give up expecting a record with a matching key.
You can use a timer to expire results when you've decided you're done waiting. You declare a timer with TimerSpec in your class, then annotate another method with @on_timer(TIMER); that method will be called when the timer fires. This also takes care of the second problem: the timer callback can clear the stale cache entry once it gives up.
Here's an example of using a timer to accomplish a left join:
import time

import apache_beam as beam
from apache_beam.coders.coders import TupleCoder, PickleCoder, StrUtf8Coder
from apache_beam.transforms.userstate import BagStateSpec
from apache_beam.transforms.userstate import on_timer, TimerSpec
from apache_beam.transforms.timeutil import TimeDomain

class CachedLeftJoin(beam.DoFn):
    # Cache (key, value) tuples so the key is still available in the
    # timer callback, which only sees the state, not the element.
    CACHE = BagStateSpec('cache', TupleCoder((StrUtf8Coder(), PickleCoder())))
    STALE_TIMER = TimerSpec('stale', TimeDomain.REAL_TIME)

    def process(self, record, cache=beam.DoFn.StateParam(CACHE), stale_timer=beam.DoFn.TimerParam(STALE_TIMER)):
        key = record[0]
        value = record[1]
        schema_name = value["schema"]
        if schema_name == "left":
            other_schema = "right"
        else:
            other_schema = "left"
        # Look for a cached record from the other side of the join.
        other_record = [v for k, v in cache.read() if v["schema"] == other_schema]
        if len(other_record) != 0:
            other_record = other_record[-1]
            cache.clear()
            value.update(other_record)
            value["key"] = key
            del value["schema"]
            yield value
        else:
            stale_timer.set(time.time() + 5)  # Give up on the other side 5 seconds from now
            cache.add((key, value))

    @on_timer(STALE_TIMER)
    def expire(self, cache=beam.DoFn.StateParam(CACHE)):
        right_dummy_record = {"schedule" : None}
        entries = list(cache.read())
        if not entries:
            # The join already happened and cleared the cache; nothing to expire.
            return
        key, record = entries[-1]
        cache.clear()
        if record["schema"] == "left":
            # The left side never found a match: emit it with a null right side.
            record.update(right_dummy_record)
            record["key"] = key
            del record["schema"]
            yield record
        # An unmatched right-side record is simply dropped, as a left join requires.
with beam.Pipeline() as pipeline:
    icons = pipeline | 'Create icons' >> beam.Create([
        ('Apple', {"schema" : "left", "icon" : '🍎'}),
        ('Grape', {"schema" : "left", "icon" : '🍇'}),
        ('Carrot', {"schema" : "left", "icon" : '🥕'}),
        ('Tomato', {"schema" : "left", "icon" : '🍅'})
    ])
    durations = pipeline | 'Create durations' >> beam.Create([
        ('Apple', {"schema" : "right", "schedule" : 'perennial'}),
        ('Grape', {"schema" : "right", "schedule" : 'perennial'}),
        ('Tomato', {"schema" : "right", "schedule" : 'annual'})
    ])
    joined = ([icons, durations]
        | "Flatten" >> beam.Flatten()
        | "Join" >> beam.ParDo(CachedLeftJoin())
        | "Print" >> beam.Map(print)  # Print each joined record
    )
This should output:
[
    {"key" : "Apple", "icon" : '🍎', "schedule" : 'perennial'},
    {"key" : "Grape", "icon" : '🍇', "schedule" : 'perennial'},
    {"key" : "Tomato", "icon" : '🍅', "schedule" : 'annual'},
    {"key" : "Carrot", "icon" : '🥕', "schedule" : None}
]