Collecting and Windowing Example#

In this section, we’ll be talking about the collect and window operators that Bytewax provides.

Collecting#

The collect operator operates over a stream of data and collects items until either a maximum size or a given timeout is reached.

Let’s construct a simple dataflow to demonstrate. In the following dataflow, we’re using the TestingSource to generate sample data from a list. In a production dataflow, your input could come from Redpanda, or any other input source.

The TestingSource emits one integer at a time into the dataflow. We’ll configure our collect operator to wait for either 3 values or 10 seconds before emitting the list downstream. Since our TestingSource never makes us wait for data, we should see lists of 3 items until we run out of input, at which point any remaining items are emitted as a shorter list.

It is important to remember that the collect operator collects data based on a key. We’ll use the key_on operator to give all of our items the same fixed key so they are processed together.

Copy the following code into a file named collect_example.py:

from datetime import timedelta

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutSink
from bytewax.testing import TestingSource

flow = Dataflow("collect")
stream = op.input("input", flow, TestingSource(list(range(10))))
# We want to consider all the items together, so we assign the same fixed key to each of them.
keyed_stream = op.key_on("key", stream, lambda _x: "ALL")
collected_stream = op.collect(
    "collect", keyed_stream, timeout=timedelta(seconds=10), max_size=3
)
op.output("out", collected_stream, StdOutSink())

Now that we have our dataflow, we can run our example:

$ python -m bytewax.run collect_example

You should see output:

('ALL', [0, 1, 2])
('ALL', [3, 4, 5])
('ALL', [6, 7, 8])
('ALL', [9])
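
To make the batching behavior concrete, here is a minimal plain-Python sketch of the size-based half of collect. It is not how Bytewax implements the operator: the helper name collect_by_size is hypothetical, the timeout is ignored entirely, and the final flush at the end of input is an assumption based on the output shown above.

```python
from typing import Dict, Iterable, Iterator, List, Tuple


def collect_by_size(
    keyed_items: Iterable[Tuple[str, int]], max_size: int
) -> Iterator[Tuple[str, List[int]]]:
    """Group values per key into lists of at most max_size items.

    Hypothetical helper for illustration only; the timeout is not modeled.
    """
    buffers: Dict[str, List[int]] = {}
    for key, value in keyed_items:
        buf = buffers.setdefault(key, [])
        buf.append(value)
        if len(buf) == max_size:
            # The batch is full, so emit it and start a fresh buffer for this key.
            yield key, buf
            buffers[key] = []
    # Assumption: leftover items are emitted once the input is exhausted.
    for key, buf in buffers.items():
        if buf:
            yield key, buf


# Same shape as the dataflow above: ten integers, all under the key "ALL".
keyed = [("ALL", x) for x in range(10)]
batches = list(collect_by_size(keyed, max_size=3))
for batch in batches:
    print(batch)
```

Because buffers are tracked per key, items with different keys would never end up in the same list, which is why the example assigns one fixed key to everything.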

Windowing#

Windowing operators (which live in bytewax.operators.windowing) perform computation over a time-based window of data. Time can be defined in two ways: as the system time at which the data is processed, known as processing time, or as a property of the data itself, referred to as event time. For this example, we’re going to use event time. Our window operators use time to decide which windows a given item belongs to, and to determine when an item is late.

Let’s start by importing the relevant classes, creating a dataflow, and configuring some test input.

from datetime import datetime, timedelta, timezone

import bytewax.operators as op
import bytewax.operators.windowing as win

from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutSink
from bytewax.operators.windowing import EventClock, TumblingWindower
from bytewax.testing import TestingSource

flow = Dataflow("windowing")

align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
inp = [
    {"time": align_to, "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=4), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=5), "user": "b", "val": 1},
    {"time": align_to + timedelta(seconds=8), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=12), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=13), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=14), "user": "b", "val": 1},
]
stream = op.input("input", flow, TestingSource(inp))
keyed_stream = op.key_on("key_on_user", stream, lambda e: e["user"])

Windowers and Clocks#

In addition to a sense of time, windowing operators also require a configuration that determines how items are assigned to windows. Items can be assigned to one or more windows, depending on the desired behavior. In this example, we’ll be using the TumblingWindower assigner, which will assign each item to a single, fixed duration window for each key in the stream.

In the following snippet, we configure the EventClock to determine the time of items flowing through the dataflow with a lambda that reads the “time” key of each dictionary in the input list we created above.

We also configure our EventClock with a value for the wait_for_system_duration parameter.

wait_for_system_duration is the amount of system time we’re willing to wait for any late arriving items before closing the window and emitting it downstream. After a window is closed, late arriving items for that window will be discarded.

In order for windows to be generated consistently, we finally supply the align_to parameter to the TumblingWindower, which says that all windows that we collect will be aligned to the given datetime. We’ll use the value that we created above for our event data.

clock = EventClock(lambda e: e["time"], wait_for_system_duration=timedelta(seconds=0))
windower = TumblingWindower(length=timedelta(seconds=10), align_to=align_to)
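
As a rough illustration of how a tumbling windower with these settings maps timestamps to windows, the sketch below computes a window index by floor-dividing the offset from align_to by the window length. The function tumbling_window_id is hypothetical and only mirrors the arithmetic; it is not the Bytewax implementation, and it assumes window IDs are integers counted from align_to.

```python
from datetime import datetime, timedelta, timezone

align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
length = timedelta(seconds=10)


def tumbling_window_id(ts: datetime) -> int:
    """Return the index of the fixed-length window that contains ts.

    Hypothetical helper: dividing one timedelta by another with // yields
    an int, rounded toward negative infinity.
    """
    return (ts - align_to) // length


# Offsets in seconds of the sample events defined in the input list above.
offsets = [0, 4, 5, 8, 12, 13, 14]
window_ids = [tumbling_window_id(align_to + timedelta(seconds=s)) for s in offsets]
print(window_ids)  # [0, 0, 0, 0, 1, 1, 1]
```

Under this assumption, every item lands in exactly one window, and the window boundaries fall at align_to, align_to + 10 seconds, align_to + 20 seconds, and so on.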

Window Operators#

Now that we have our windower and clock, we need to define the processing step or operation that we would like to perform on each window. We’ll use the collect_window operator to collect all of the events for a given user in each window.

win_out = win.collect_window("add", keyed_stream, clock, windower)

Every windowing operator results in three distinct downstreams packaged into a WindowOut object:

  1. The result stream, named down.

  2. Late items.

  3. Window metadata.

In this introduction, we’ll focus just on the result stream.

Result Stream#

Each item in the result stream of a window operator is a tuple in the format (key, (window_id, value)), where window_id is an opaque ID for the window that produced this value. In this case, since we used the collect_window operator, each value will be the list of all items collected in that window.

We’ll write this to standard output using our output operator:

op.output("out", win_out.down, StdOutSink())
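
If you want to post-process the result stream, the nested tuple can be unpacked in a single assignment. The following is a small standalone sketch; the summarize function and the sample item are made up for illustration and only mimic the (key, (window_id, value)) shape described above.

```python
from typing import Any, Dict, List, Tuple

# Shape of one item from the result stream when using collect_window.
WindowResult = Tuple[str, Tuple[int, List[Dict[str, Any]]]]


def summarize(result: WindowResult) -> Tuple[str, int, int]:
    """Reduce one windowed result to (key, window_id, sum of "val")."""
    key, (window_id, events) = result
    return key, window_id, sum(event["val"] for event in events)


# Hand-written sample item; the "time" field is omitted for brevity.
sample: WindowResult = (
    "a",
    (0, [{"user": "a", "val": 1}, {"user": "a", "val": 1}, {"user": "a", "val": 1}]),
)
summary = summarize(sample)
print(summary)  # ('a', 0, 3)
```

A function like this could plausibly be applied to win_out.down with a mapping step such as op.map before the output step, though that wiring is not part of this example.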

Running our Dataflow#

Assuming the windowing code above is saved in a file named window_example.py, we can run our dataflow with:

$ python -m bytewax.run window_example

Running our dataflow, we should see the following output:

('a', (0, [{'time': datetime.datetime(2022, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}, {'time': datetime.datetime(2022, 1, 1, 0, 0, 4, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}, {'time': datetime.datetime(2022, 1, 1, 0, 0, 8, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}]))
('b', (0, [{'time': datetime.datetime(2022, 1, 1, 0, 0, 5, tzinfo=datetime.timezone.utc), 'user': 'b', 'val': 1}]))
('a', (1, [{'time': datetime.datetime(2022, 1, 1, 0, 0, 12, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}, {'time': datetime.datetime(2022, 1, 1, 0, 0, 13, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}]))
('b', (1, [{'time': datetime.datetime(2022, 1, 1, 0, 0, 14, tzinfo=datetime.timezone.utc), 'user': 'b', 'val': 1}]))

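Bytewax has created windows for each key (in our case, the user value) and collected all of the items that fell into each one. Window 0 holds the items from the first 10 seconds after align_to, and window 1 holds the items from the next 10 seconds.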

Wrapping Up#

Bytewax offers multiple processing shapes, window assignment types and other configuration options. For more detailed information about windowing, please see the Windowing section, and the windowing API documentation in bytewax.operators.windowing.
