
Consumer in Python

Last but not least :-) Let's consume our events with the Kafka Python client



Getting familiar with kafka-python consumer

KafkaConsumer will consume records from a Kafka cluster

The consumer will transparently handle the failure of servers in the Kafka cluster, and adapt as topic-partitions are created or migrate between brokers.

Did you know that one of the nice features of Kafka is that you can achieve high throughput in consumers by using Consumer Groups? :) Let's check out the next two figures before moving on.
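To build intuition for those figures, here is a toy, pure-Python sketch (not kafka-python; the function name and the round-robin strategy are illustrative assumptions) of how a group splits a topic's partitions among its members:

```python
# Toy illustration: each partition is assigned to exactly one consumer
# in the group, so members of the group consume in parallel.

def assign_partitions(partitions, consumers):
    """Round-robin partition assignment, simplified for illustration."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

print(assign_partitions([0, 1, 2, 3], ['consumer-a', 'consumer-b']))
# {'consumer-a': [0, 2], 'consumer-b': [1, 3]}
```

Because a partition goes to only one member of the group, adding consumers (up to the partition count) increases parallelism; beyond that, extra consumers sit idle.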

Let's check the KafkaConsumer official documentation for creating a Kafka Consumer

Once you have spent a few minutes reading about the KafkaConsumer API, let's give it a first try: open the consumer.py Python file

Try to create a consumer under the consume method

KafkaConsumer
consumer = KafkaConsumer(...) 
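If you want a starting point, a minimal constructor call might look like the sketch below. The broker address and group name are assumptions for a local workshop setup; adjust them to your environment.

```python
from kafka import KafkaConsumer

# Sketch only: bootstrap_servers and group_id are assumed values
consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',  # assumption: local broker
    group_id='workshop-group',           # consumers sharing a group_id split the partitions
)
```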

Subscribe to a Topic

When creating a Kafka consumer, you can either pass the topic (or list of topics) you would like to consume from directly to the constructor, or subscribe to them before consuming.

Include the following line after creating the consumer and replace TOPIC_NAME with one of the topics you created previously

# Subscribe to a topic 
consumer.subscribe([TOPIC_NAME])

Consume Messages

Consuming messages is as simple as iterating over the consumer object. Apply this code in your file and run it; then produce some messages, and you will see them flowing through on STDOUT.

for message in consumer:
    print("Consumer-{} Topic:{}, Partition:{} Offset:{}: key={} value={}".format(
        consumer_id,
        message.topic,
        message.partition,
        message.offset,
        message.key,
        message.value))

consumer.close()

Spoiler Alert:

Sadly, in real production environments you won't consume only strings or basic primitive data types. Instead, your applications and services will produce complex, rich data in multiple formats such as Avro. Check out the next section to learn how to deserialize data in the Avro format.

Deserializing Avro data

Do you remember the schema presented in the Producer section? Cool, keep it in mind (or go check it out) because we are about to work with it again.

Usually we serialize the data on the producer side; once it arrives on the consumer side in binary format, we need to transform it back into a readable format so that services and applications can process it.

First, let's read our schema so we can deserialize our data. Put this code right before the declaration of the consume method

Read Avro Schema
# change this to point to your new schema :)
SCHEMA_PATH = '../avro-schemas/user.avsc'

# SCHEMA
SCHEMA = kafka_utils.read_schema(SCHEMA_PATH)

The code below shows how to deserialize data in Avro format.

Deserialize code from avro
import io        # add these imports at the top of your file
import avro.io

# create an in-memory bytes buffer over the raw message bytes
bytes_reader = io.BytesIO(message.value)
# create the binary decoder
decoder = avro.io.BinaryDecoder(bytes_reader)
# create a DatumReader, responsible for deserializing the Avro data back into a readable format using the schema
reader = avro.io.DatumReader(SCHEMA)
# read the incoming data
value = reader.read(decoder)

Consume Data in Avro Format

Now we have the code for deserializing the data coming out of Kafka, and we have created the consumer :)

Try to put the pieces together under the consume method and print out the result :)

Good practice: Do not forget to close your Consumer once you are done.

It seems there is something wrong with the output :)

Try to figure out what is happening and update your code to make it work
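If you get stuck, here is one possible way the pieces fit together. This is a sketch, not the only solution: it assumes the SCHEMA variable loaded earlier, a local broker on localhost:9092, and a placeholder topic name you should replace with your own.

```python
import io
import avro.io
from kafka import KafkaConsumer

# Assumptions: local broker, group name, and TOPIC_NAME are placeholders
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         group_id='workshop-group')
consumer.subscribe(['TOPIC_NAME'])

for message in consumer:
    # message.value is raw Avro bytes -- decode it using the schema
    bytes_reader = io.BytesIO(message.value)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(SCHEMA)
    value = reader.read(decoder)
    print("Topic:{} Partition:{} Offset:{} value={}".format(
        message.topic, message.partition, message.offset, value))

consumer.close()
```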

Bonus Exercise

  • Pick a topic that has more than one partition and try to balance consumption of the messages across its partitions. Hint: look at the partition-assignment diagrams again

  • Change your KafkaConsumer declaration to include the following:

    • Read always from the earliest message available

    • Change the timeout of the Consumer. Currently it is set to wait forever for messages
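For reference, both behaviours are plain constructor arguments in kafka-python. The broker address and topic name below are assumptions; adjust them to your setup.

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'TOPIC_NAME',                        # assumption: replace with your topic
    bootstrap_servers='localhost:9092',  # assumption: local broker
    auto_offset_reset='earliest',        # start from the earliest available message
                                         # when no committed offset exists
    consumer_timeout_ms=10000,           # stop iterating after 10 s with no new messages
)
```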

Extra

  • Change the property to do manual offset commits, and commit the offsets yourself in code :)
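One way to sketch this with kafka-python: enable_auto_commit and commit() are the documented knobs, while the broker address, topic, and group name below are assumptions.

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'TOPIC_NAME',                        # assumption: replace with your topic
    bootstrap_servers='localhost:9092',  # assumption: local broker
    group_id='workshop-group',
    enable_auto_commit=False,            # turn off the periodic auto-commit
)

for message in consumer:
    print(message.value)
    consumer.commit()                    # commit the consumed offsets ourselves
```

Committing after processing (rather than before) gives at-least-once delivery: if the consumer crashes mid-loop, uncommitted messages are redelivered.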

[Figure: Partition assignment to a Consumer Group]
[Figure: A new Consumer joins the Consumer Group; a partition re-assignment occurs]