Consumer in Python
Last but not least :-) Let's consume our events with the Kafka Python client
Getting familiar with the kafka-python consumer
KafkaConsumer will consume records from a Kafka cluster
The consumer will transparently handle the failure of servers in the Kafka cluster, and adapt as topic-partitions are created or migrate between brokers.
Did you know that one of the nice features of Kafka is that you can achieve high throughput on the consumer side by using consumer groups? :) Let's check out the next two figures before moving on.
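To build intuition for how a consumer group spreads work: Kafka assigns each partition of a topic to exactly one consumer in the group, so adding consumers splits the partitions between them. The sketch below simulates a simple round-robin assignment in plain Python; it is illustrative only, not the broker's actual partition assignor.

```python
# Illustrative sketch of how a consumer group splits partitions.
# This is NOT kafka-python code; the real assignment is done by the broker/assignor.
def assign_partitions(partitions, consumers):
    """Round-robin partitions across the consumers in a group."""
    assignment = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment

# 6 partitions shared by a group of 3 consumers: each one reads 2 partitions
print(assign_partitions(list(range(6)), ['c1', 'c2', 'c3']))
# → {'c1': [0, 3], 'c2': [1, 4], 'c3': [2, 5]}
```

This is why consumer groups give you horizontal scalability: with 6 partitions, up to 6 consumers in the same group can read in parallel.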


Let's check the official KafkaConsumer documentation for creating a Kafka consumer.
Once you have read for a few minutes about the KafkaConsumer API, let's give it a first try: open the consumer.py file and try to create a consumer inside the consume method.
consumer = KafkaConsumer(...)
Subscribe to a Topic
When creating a Kafka consumer, you can either pass the topic (or a list of topics) you would like to consume from directly to the constructor, or create the consumer first and subscribe before consuming.
Include the following line after creating the consumer, replacing TOPIC_NAME with one of the topics you created previously:
# Subscribe to a topic (subscribe expects a list of topic names)
consumer.subscribe([TOPIC_NAME])
Consume Messages
Consuming messages is as simple as iterating over the consumer object. Try applying this code in your file and run it; then produce some messages and you will see them flowing through on STDOUT.
for message in consumer:
    print("Consumer-{} Topic:{}, Partition:{}, Offset:{}, key={} value={}".format(
        consumer_id,
        message.topic,
        message.partition,
        message.offset,
        message.key,
        message.value))
consumer.close()
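Since the iterator above blocks waiting for new messages, in practice it is a good idea to wrap the loop in try/finally so the consumer is always closed, even if the process is interrupted. Here is a sketch of that shutdown pattern; FakeConsumer is a stand-in object used only so the example can run without a broker:

```python
class FakeConsumer:
    """Stand-in for KafkaConsumer so the shutdown pattern can run without a broker."""
    def __init__(self, messages):
        self._messages = messages
        self.closed = False

    def __iter__(self):
        return iter(self._messages)

    def close(self):
        self.closed = True


consumer = FakeConsumer(['event-1', 'event-2'])
try:
    for message in consumer:
        print(message)
finally:
    # Always runs, even on KeyboardInterrupt: releases group membership and sockets
    consumer.close()
```

With a real KafkaConsumer, closing promptly also lets the group rebalance its partitions to the remaining consumers.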
Deserializing Avro data
Do you remember the schema presented in the Producer section? Cool, keep it in mind (or go check it out), because we are about to work with it again.
Usually we serialize the data on the producer side, and once it is received on the consumer side in binary format, we need to transform it back into a readable format so that services and applications can process it.
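The general serialize/deserialize round trip can be sketched with nothing but the standard library; JSON stands in here for Avro purely to show the idea of "readable → bytes on the wire → readable again":

```python
import json

# Producer side: a readable record becomes bytes before going on the wire
record = {"name": "Jane", "favorite_number": 7}
payload = json.dumps(record).encode("utf-8")

# Consumer side: bytes come back and must be decoded into a readable structure
decoded = json.loads(payload.decode("utf-8"))
print(decoded == record)  # → True
```

Avro works on the same principle, but the binary encoding is more compact and is driven by a schema, which is what the next steps set up.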
First, let's read our schema so we can deserialize our data. Put this code right before the declaration of the consume method:
# change this to point to your new schema :)
SCHEMA_PATH = '../avro-schemas/user.avsc'
# SCHEMA
SCHEMA = kafka_utils.read_schema(SCHEMA_PATH)
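Note that kafka_utils.read_schema is a helper from this workshop's code. An .avsc file is just JSON describing a record, so you can peek at what the helper is loading. The schema below is a trimmed, hypothetical example; the workshop's actual user.avsc may have different fields:

```python
import json

# Hypothetical, trimmed .avsc content -- the workshop's real schema may differ
schema_json = '''
{"type": "record",
 "name": "User",
 "fields": [{"name": "name", "type": "string"},
            {"name": "favorite_number", "type": "int"}]}
'''

# An Avro schema file parses as ordinary JSON
schema = json.loads(schema_json)
print(schema["name"])   # → User
print(schema["type"])   # → record
```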
The code below shows how to deserialize data in Avro format:
# create an in-memory bytes buffer from the raw message value
bytes_reader = io.BytesIO(message.value)
# create the binary decoder
decoder = avro.io.BinaryDecoder(bytes_reader)
# create a DatumReader, responsible for deserializing the Avro data into a readable format using the schema
reader = avro.io.DatumReader(SCHEMA)
# read the incoming data
value = reader.read(decoder)
Consume Data in Avro Format
Now we have the code for deserializing the data coming out of Kafka, and we have created the consumer :) Try to put the pieces together inside the consume method and print out the deserialized messages :)
Good practice: Do not forget to close your Consumer once you are done.
It seems there is something wrong with the output :) Try to figure out what is happening and update your code to make it work.