Introducing Quix Streams: an open-source Python library for Kafka

You might be wondering why the world needs another Python framework for Kafka. After all, there are a lot of existing libraries and frameworks to choose from, such as kafka-python, Faust, PySpark, and so on.
The focus of Quix Streams, however, is time-series and telemetry data, so its features are optimized for telemetry-related use cases. This could be device telemetry (it was originally road-tested on sensor data from Formula 1 racing cars) or other types of telemetry data such as metrics, logs, and traces.
It's also designed to help you get the best out of Apache Kafka's horizontal scaling capabilities. This is especially important if you need to process a large firehose of data (e.g. 60,000 data points a second).
Nevertheless, you don't have to be doing real-time ML on Formula 1 telemetry to find Quix Streams useful. My hope is that its simplicity and performance will make many of you more productive, and I'm excited to see what other use cases you find for it.
What you can do with Quix Streams
To help you learn more about what you can do with this library, here's a list of core features with simplified code samples to demonstrate how they work:
Use Pandas DataFrames to produce data more efficiently
Time-series parameters are often emitted at the same time, so they share one timestamp. Handling each parameter independently is wasteful. The library instead uses a tabular system that works natively with Pandas DataFrames, for example: each row has a timestamp, with user-defined tags as indexes.
- For a complete, runnable example of how to use the library with Pandas to stream data directly from a CSV, see this gist.
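To make the tabular format concrete, here is what such a DataFrame might look like in plain pandas. All column names here are made up for illustration; they are not required by the library:

```python
import pandas as pd

# Illustrative layout: one row per timestamp, one column per parameter,
# plus a user-defined tag column. Names are invented for this example.
df = pd.DataFrame({
    "timestamp": pd.to_datetime(
        ["2023-01-01 12:00:00.000", "2023-01-01 12:00:00.010"]),
    "vehicle_id": ["bus-123AAAV", "bus-123AAAV"],  # a user-defined tag
    "speed_kmh": [42.0, 43.5],    # one time-series parameter
    "engine_rpm": [1800, 1850],   # another parameter, same timestamps
})
print(df.shape)  # → (2, 4)
```

Because both parameters share a timestamp, they travel in one row rather than as two separate messages.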
Produce time-series data without worrying about serialization or deserialization
Quix Streams serializes and deserializes time-series data using different codecs and optimizations to minimize payloads in order to increase throughput and reduce latency.
- Data is appended to a stream with the add_value method, as shown in the full producing example later in this post.

Leverage built-in buffers to optimize processing operations for windows of time-series data
If you're sending data at high frequency, processing each message can be costly. The library provides built-in time-series buffers for producing and consuming, allowing several configurations for balancing between latency and cost.
- For example, you can configure the library to release a packet from the buffer whenever 100 items of timestamped data are collected or when a certain number of milliseconds of data have elapsed (using timestamps in the data rather than the consumer machine's clock).
buffer.packet_size = 100
buffer.time_span_in_milliseconds = 100
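Conceptually, the two settings combine like this. The following is a plain-Python sketch of the release rules, not the library's implementation:

```python
# A packet is released when it holds `packet_size` items, or when the
# data's own timestamps span `time_span_ms` milliseconds.
def released_packets(items, packet_size=100, time_span_ms=100):
    packets, current = [], []
    for ts_ms, value in items:
        current.append((ts_ms, value))
        span = current[-1][0] - current[0][0]
        if len(current) >= packet_size or span >= time_span_ms:
            packets.append(current)
            current = []
    if current:  # leftover partial packet
        packets.append(current)
    return packets

# 250 samples, 1 ms apart: the count limit triggers first
samples = [(i, i * 0.1) for i in range(250)]
print([len(p) for p in released_packets(samples)])  # → [100, 100, 50]
```

With sparser data (say, samples 5 ms apart), the time-span condition triggers first instead, so latency stays bounded even at low message rates.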
- You can then read from the buffer and process it with the on_read function.

Produce and consume different types of mixed data
This library allows you to produce and consume mixed types of data sharing the same timestamp, such as numbers, strings, or binary data.
- For example, you can produce both time-series data and large binary blobs together.
- Often, you'll want to combine time-series data with binary data. For instance, you could combine a bus's onboard camera feed with telemetry from its ECU so you can analyze the camera footage with context.
- You can also produce events that include payloads. For example, you might need to listen for changes in time-series or binary streams and produce an event (such as "speed limit exceeded"), which might require some kind of document to send along with the event message (e.g. transaction invoices, or a speeding ticket with photographic proof).
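As a plain-Python sketch of the speeding-camera idea (this illustrates the shape of the data, not the library's API; all field names are made up):

```python
import datetime

# A "speed limit exceeded" event bundling a numeric reading and binary
# evidence under one timestamp.
photo = b"\xff\xd8\xff\xe0" + b"\x00" * 16  # stand-in for JPEG bytes
event = {
    "timestamp": datetime.datetime(2023, 1, 1, 12, 0, 0),
    "id": "speed-limit-exceeded",
    "speed_kmh": 87.5,   # numeric telemetry
    "photo": photo,      # binary payload in the same message
}
print(sorted(event))  # → ['id', 'photo', 'speed_kmh', 'timestamp']
```

With Quix Streams, the numeric and binary values would be published together on one stream, much like the time-series values in the producing example later in this post.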
Use stream contexts for horizontal scaling
Stream contexts allow you to bundle data from one data source into the same scope with supplementary metadata – which enables workloads to be horizontally scaled with multiple replicas.
- For instance, the create_stream function can be used to create a stream called bus-123AAAV that gets assigned to one particular consumer and will receive messages in the correct order.

Leverage built-in stateful processing for greater resiliency
The library includes an easy-to-use state store combining blob storage and Kubernetes persistent volumes, which ensures quick recovery from any outages or disruptions.
To use it, you can create an instance of LocalFileStorage, or use one of our helper classes to manage the state, such as InMemoryStorage. A typical stateful operation is keeping a running sum of a selected column in the data.
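The idea can be sketched in plain Python like this, with a dict standing in for the state store (real code would use InMemoryStorage or LocalFileStorage so the value survives restarts; the column name is made up):

```python
# A running sum over a selected column, accumulated across batches.
state = {"speed_sum": 0.0}

def on_data(rows):
    # accumulate the selected column from each incoming batch
    for row in rows:
        state["speed_sum"] += row["speed_kmh"]

on_data([{"speed_kmh": 40.0}, {"speed_kmh": 45.0}])
on_data([{"speed_kmh": 50.0}])
print(state["speed_sum"])  # → 135.0
```

The point of the built-in store is that this accumulated value is persisted, so a restarted replica resumes from 135.0 rather than from zero.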
Other performance and usability enhancements
The library also includes a number of other enhancements that are designed to simplify the process of managing configuration and performance when interacting with Kafka:
- No schema registry required: The library doesn't need a schema registry to send different sets of types or parameters; this is handled internally by the protocol. This means that you can send more than one schema per topic.
- Message splitting: Quix Streams automatically handles large messages on the producer side, splitting them up if required. You no longer need to worry about Kafka message limits. On the consumer side, those messages are automatically merged back.
- Message broker configuration: Many configuration settings are needed to use Kafka at its best, and finding the ideal configuration takes time. The library takes care of Kafka configuration by default but also supports custom configurations.
- Checkpointing: The library supports manual or automatic checkpointing when you consume data from a Kafka Topic. This enables you to inform the message broker that you have already processed messages up to one point (and not process the same messages again in case of an unplanned restart).
- Horizontal scaling: Quix Streams handles horizontal scaling using the streaming context feature. You can scale the processing services, from one replica to many and back to one, and the library ensures that the data load is always shared between your replicas reliably.
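As a rough illustration of the message-splitting behaviour described above (a conceptual sketch, not Quix Streams' actual chunking protocol):

```python
# Payloads above the broker's size limit are cut into chunks on the
# producer side and reassembled in order on the consumer side.
LIMIT = 1_000_000  # roughly Kafka's default 1 MB message limit

def split(payload: bytes, limit: int = LIMIT):
    return [payload[i:i + limit] for i in range(0, len(payload), limit)]

def merge(chunks) -> bytes:
    return b"".join(chunks)

blob = b"x" * 2_500_000         # a 2.5 MB payload
chunks = split(blob)
print(len(chunks), merge(chunks) == blob)  # → 3 True
```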
For a detailed overview of features, see the library documentation.
Getting Started
To quickly try out Quix Streams, you just need to install the library and set up a local Kafka instance.
Install Quix Streams
Install Quix Streams with the following command:
python3 -m pip install quixstreams
- To install Quix Streams on Macs with M1 or M2 chips, see this special installation guide: Installing Quix Streams on an M1/M2 Mac.
Install Kafka locally
The library needs a message broker to send and receive data. To install and test Kafka locally:
- Download the Apache Kafka binary from the Apache Kafka Download page.
- Extract the contents of the file to a convenient location (e.g. kafka_dir), and start the Kafka services with the following commands:
Linux / macOS
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
Windows
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties
- You can find more detailed instructions in Apache Kafka's official documentation.
- You can also find a comprehensive Quick Start guide for Quix Streams in the official documentation.
The following examples will give you a basic idea of how to produce and consume data with Quix Streams:
Producing time-series data
Here's an example of how to produce time-series data into a Kafka Topic with Python.
import quixstreams as qx
import time
import datetime
import math
# Connect to your kafka client
client = qx.KafkaStreamingClient('127.0.0.1:9092')
# Open the output topic which is where data will be streamed out to
# If the topic does not exist, it will be created
topic_producer = client.get_topic_producer(topic_id_or_name = "mytesttopic")
# Set stream ID or leave parameters empty to get stream ID generated.
stream = topic_producer.create_stream()
stream.properties.name = "Hello World Python stream"
# Add metadata about time series data you are about to send.
stream.timeseries.add_definition("ParameterA").set_range(-1.2, 1.2)
stream.timeseries.buffer.time_span_in_milliseconds = 100
print("Sending values for 30 seconds.")
for index in range(0, 3000):
    stream.timeseries \
        .buffer \
        .add_timestamp(datetime.datetime.utcnow()) \
        .add_value("ParameterA", math.sin(index / 200.0) + math.sin(index) / 5.0) \
        .publish()
    time.sleep(0.01)
print("Closing stream")
stream.close()
Consuming time-series data
Here's an example of how to consume time-series data from a Kafka Topic with Python:
import quixstreams as qx
import pandas as pd
# Connect to your kafka client
client = qx.KafkaStreamingClient('127.0.0.1:9092')
# get the topic consumer for a specific consumer group
topic_consumer = client.get_topic_consumer(topic_id_or_name = "mytesttopic",
                                           consumer_group = "empty-destination")
def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
    # do something with the data here
    print(df)
def on_stream_received_handler(stream_consumer: qx.StreamConsumer):
    # subscribe to new DataFrames being received
    # if you aren't familiar with DataFrames there are other callbacks available
    # refer to the docs here: https://docs.quix.io/sdk/subscribe.html
    stream_consumer.timeseries.on_dataframe_received = on_dataframe_received_handler
# subscribe to new streams being received
topic_consumer.on_stream_received = on_stream_received_handler
print("Listening to streams. Press CTRL-C to exit.")
# Handle termination signals and provide a graceful exit
qx.App.run()
For full documentation of how to consume and produce time-series and event data with Quix Streams, see the docs.
What's Next
This is the first iteration of Quix Streams, and the next release is already in the works.
The main highlight is a new feature called "streaming data frames" that simplifies stateful stream processing for users coming from a batch-processing environment. It eliminates the need for users to manage state in memory, update rolling windows, deal with checkpointing and state persistence, and manage state recovery after a service unexpectedly restarts.
By introducing a familiar interface to Pandas DataFrames, my collaborators and I hope to make stream processing even more accessible to data professionals who are new to streaming data.
The aim is that a rolling-window calculation on a streaming data frame will look exactly like the same calculation on static data in a Jupyter notebook, so it will be easy to learn for those of you who are used to batch processing.
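For reference, here is how the equivalent rolling-window calculation looks in plain pandas on static data; the streaming API is still in development, so its final method names may differ:

```python
import pandas as pd

# Rolling 3-second mean over a time-indexed column (static data).
df = pd.DataFrame(
    {"speed_kmh": [40.0, 42.0, 44.0, 46.0, 48.0]},
    index=pd.date_range("2023-01-01 12:00:00", periods=5, freq="s"),
)
df["speed_rolling_mean"] = df["speed_kmh"].rolling("3s").mean()
print(df["speed_rolling_mean"].tolist())  # → [40.0, 41.0, 42.0, 44.0, 46.0]
```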
There's also no need to grapple with the complexity of stateful processing on streaming data – this will all be managed by the library. Moreover, although it will still feel like Pandas, it will use binary tables under the hood – which adds a significant performance boost compared to traditional Pandas DataFrames.
To find out when the next version is ready, make sure you watch the Quix Streams GitHub repo.
The roadmap should also be shaped by feedback and contributions from the wider data community:
- If you find a bug or want to request an enhancement, feel free to log a GitHub issue.
- If you have questions, need help, or simply want to find out more about the library, try posting a message in the Slack community "The Stream" (which I help to moderate) or check out the documentation.
- If you want to improve the library, see the contribution guidelines.