Producer in Python
Phew! Congrats on making it to this section. We will learn how to create a Producer and write to your Kafka cluster.
Getting familiar with Kafka-Python client
"Python client for the Apache Kafka distributed stream processing system. kafka-python is designed to function much like the official java client, with a sprinkling of pythonic interfaces (e.g., consumer iterators)."
Getting familiar with kafka-python producer
Check out the documentation for creating a Kafka Producer.
Create a Kafka Producer
Once you've read enough :) go to the producer.py file and create a producer inside the produce_messages method:
producer = KafkaProducer(bootstrap_servers= .... )
Send Messages
Sending messages to Kafka can be as simple as sending strings :) This code shows how to send them to a topic called twitter-handlers:
twitter_handlers = ["@WWCodeDublin", "@PyLadiesDublin"]

for idx, tw_handle in enumerate(twitter_handlers):
    # build a message
    msg = 'Hello Twitter users, follow these awesome accounts: {}, message-counter: {}'.format(tw_handle, idx)
    # send the message to the topic (the value must be bytes)
    producer.send(topic='twitter-handlers', value=str.encode(msg))

# always be good citizens and close your producer when done :)
producer.close()
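Note that the value handed to producer.send is passed through str.encode first: Kafka stores raw bytes, so the producer expects bytes, not str. A quick stdlib-only sketch of that conversion (no broker needed; the handle and counter are just the sample values from above):

```python
# build the same kind of message as above, then encode it to bytes as Kafka expects
msg = 'Hello Twitter users, follow these awesome accounts: {}, message-counter: {}'.format('@WWCodeDublin', 0)

payload = str.encode(msg)  # same as msg.encode('utf-8')

print(type(payload).__name__)           # bytes
print(payload.decode('utf-8') == msg)   # True: decoding round-trips to the original string
```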
Serialize your data to Avro
In real life, you will rarely send plain strings to a topic. In reality, companies and projects deal with complex data structures and schemas that must be processed by many services and applications.
We will work with the following schema:
{
  "namespace": "meetus.users",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": ["int", "null"]},
    {"name": "email", "type": ["string", "null"]},
    {"name": "programming_language", "type": ["string", "null"]}
  ]
}
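An Avro schema is plain JSON, so you can inspect it with the standard library before wiring it into your producer. A small sketch (the schema string is the one above):

```python
import json

schema_json = '''
{"namespace": "meetus.users",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "age", "type": ["int", "null"]},
     {"name": "email", "type": ["string", "null"]},
     {"name": "programming_language", "type": ["string", "null"]}
 ]
}
'''

schema = json.loads(schema_json)

# field order matters in Avro: records are serialized field by field, in this order
field_names = [f["name"] for f in schema["fields"]]
print(field_names)  # ['name', 'age', 'email', 'programming_language']

# a union type containing "null" means the field is optional
nullable = [f["name"] for f in schema["fields"]
            if isinstance(f["type"], list) and "null" in f["type"]]
print(nullable)     # ['age', 'email', 'programming_language']
```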
This schema will help us serialize the data into Avro format.
We are going to run through the Avro code quickly before heading off to send our messages:
import io
import json

import avro.io
import avro.schema

# parse the schema and create an Avro writer for it
# (args.schema_path is assumed to point at a file containing the schema above)
schema = avro.schema.parse(open(args.schema_path).read())
writer = avro.io.DatumWriter(schema)

msgs_sent_counter = 0
with open(args.data_path) as f:
    for line in f:
        if not line.strip():
            continue
        # load each line of your data file
        msg = json.loads(line)
        # get an in-memory byte buffer
        bytes_writer = io.BytesIO()
        # get the avro binary encoder
        encoder = avro.io.BinaryEncoder(bytes_writer)
        # serialize the message with the writer and the encoder
        writer.write(msg, encoder)
        # get the serialized bytes from the buffer
        raw_bytes = bytes_writer.getvalue()
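If you are curious what BinaryEncoder actually writes, here is a stdlib-only sketch that hand-encodes one User record following the Avro binary spec: ints and string lengths become zigzag varints, and each union value is prefixed with the index of the branch used. The sample record values and the encode_user helper are illustrative, not part of the workshop code:

```python
def zigzag_varint(n: int) -> bytes:
    """Encode an integer the way Avro does: zigzag, then base-128 varint."""
    n = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

def encode_string(s: str) -> bytes:
    # an Avro string is its length (as a varint) followed by its UTF-8 bytes
    raw = s.encode('utf-8')
    return zigzag_varint(len(raw)) + raw

def encode_user(user: dict) -> bytes:
    """Hand-rolled encoding of the User record, fields in schema order."""
    out = b''
    out += encode_string(user['name'])            # "name": string
    if user['age'] is None:                       # "age": ["int", "null"]
        out += zigzag_varint(1)                   # branch 1 = null
    else:
        out += zigzag_varint(0) + zigzag_varint(user['age'])
    for field in ('email', 'programming_language'):   # both ["string", "null"]
        if user[field] is None:
            out += zigzag_varint(1)
        else:
            out += zigzag_varint(0) + encode_string(user[field])
    return out

user = {'name': 'Ada', 'age': 36, 'email': None, 'programming_language': 'python'}
print(encode_user(user))  # b'\x06Ada\x00H\x02\x00\x0cpython'
```

Reading the output back: `\x06Ada` is the 3-character name, `\x00H` is the int branch of the age union holding 36, `\x02` is the null branch for email, and `\x00\x0cpython` is the string branch holding the language.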
Sending messages with Avro format
The code above is not complete :) We have only built each Avro message. Can you guess what else is needed?
Try sending the messages yourself.
In order to check that you are sending the messages correctly, use the kafka-console-consumer
previously seen, or wait until the next section :-P
Good practice: Do not forget to close your producer once you are done.