Aim

Kafka has become an important tool for building scalable applications. Today's world is data-driven, and Kafka is one of the tools that works well with large volumes of data. In this post we will write a Kafka producer in Python. We will simulate a user registration scenario and push the registration data to Kafka.

Prerequisites

  • You must have Kafka and ZooKeeper installed.
  • Python 3.x must be installed.

Install the Python dependencies

pip install Faker
pip install kafka-python
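
Before moving on, it can help to confirm that both packages are importable. The following is a minimal sketch using only the standard library. missing_modules is a hypothetical helper name, and the sketch assumes the import names are faker and kafka, which is what the imports later in this post use.

```python
import importlib.util


def missing_modules(module_names):
    """Return the names from module_names that cannot be found for import."""
    return [name for name in module_names
            if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # Faker is imported as "faker" and kafka-python as "kafka".
    missing = missing_modules(["faker", "kafka"])
    if missing:
        print("Missing modules:", ", ".join(missing))
    else:
        print("All dependencies are importable.")
```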

Generating the registered user data

  • Since we are going to push user registration data into Kafka, we first need to generate fake registration data.
  • We will use the Python Faker library to generate it.
  • Create a data.py file that will hold the function for generating fake user registration data:
touch data.py
  • Then add the following code to data.py:
from faker import Faker

fake = Faker()

def get_registered_user():
    return {
        "name": fake.name(),
        "address": fake.address(),
        "created_at": fake.year()
    }
  • We import the Faker class and create an instance of it.
  • The get_registered_user function returns one user registration record with the fields name, address and created_at. Note that created_at holds only a year, as produced by fake.year().
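
Since these records will later be serialized and published, a quick shape check can catch mistakes early. The sketch below is an illustration rather than part of the original code: is_valid_registered_user is a hypothetical helper, and it assumes every field should be a non-empty string (to our knowledge Faker returns strings for all three calls, but verify this for your version). The sample record uses made-up values so the example runs without Faker.

```python
REQUIRED_FIELDS = ("name", "address", "created_at")


def is_valid_registered_user(record):
    """Check that record has every expected field as a non-empty string."""
    return all(
        isinstance(record.get(field), str) and record[field] != ""
        for field in REQUIRED_FIELDS
    )


if __name__ == "__main__":
    # Made-up sample in the same shape that get_registered_user returns.
    sample = {"name": "Jane Doe", "address": "1 Example Street", "created_at": "2015"}
    print(is_valid_registered_user(sample))
```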

Create the Kafka Producer

  • Create a file named producer.py:
touch producer.py
  • Then add the following code to it:
from kafka import KafkaProducer
import json
from data import get_registered_user
import time

def json_serializer(data):
    return json.dumps(data).encode("utf-8")

producer = KafkaProducer(bootstrap_servers=['192.168.0.10:9092'],
                         value_serializer=json_serializer)

if __name__ == "__main__":
    while True:
        registered_user = get_registered_user()
        print(registered_user)
        producer.send("registered_user", registered_user)
        time.sleep(4)
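
The loop above runs until the process is killed. For experiments it can be handy to send a fixed number of messages and then flush the producer's buffer before exiting. The following is a rough sketch of that idea, not the code used in this post: publish_records and its parameters are hypothetical names, and the producer is passed in as an argument so that anything with send and flush methods (such as a KafkaProducer) can be used.

```python
import time


def publish_records(producer, make_record, topic, count, interval_seconds=4.0):
    """Send `count` records to `topic`, pausing after each send, then flush."""
    sent = 0
    try:
        for _ in range(count):
            producer.send(topic, make_record())
            sent += 1
            time.sleep(interval_seconds)
    finally:
        # flush() blocks until the buffered messages have been sent.
        producer.flush()
    return sent
```

With the objects from producer.py this could be called as publish_records(producer, get_registered_user, "registered_user", count=10).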
  • We have created a JSON serializer function that accepts any JSON-serializable value and converts it to a UTF-8 encoded JSON string (bytes):
def json_serializer(data):
    return json.dumps(data).encode("utf-8")
  • Then we create a KafkaProducer instance and pass it our serializer:
producer = KafkaProducer(bootstrap_servers=['192.168.0.10:9092'],
                         value_serializer=json_serializer)
  • The bootstrap_servers option of KafkaProducer takes the address of your Kafka broker (host:port). Replace 192.168.0.10:9092 with the address of your own broker.
  • Next we call get_registered_user from the data module we created to fetch a fake registered user:
registered_user = get_registered_user()
  • We call the send method on the KafkaProducer instance to publish the message to the registered_user topic on the Kafka broker:
producer.send("registered_user", registered_user)
  • We publish a message every 4 seconds:
time.sleep(4)
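
Hard-coding the broker address works for a quick test, but it has to be edited for every environment. One common alternative is to read it from an environment variable. The sketch below assumes a variable named KAFKA_BOOTSTRAP_SERVERS (a name we chose for illustration, not something kafka-python reads on its own) that holds a comma-separated list of host:port pairs. get_bootstrap_servers is likewise a hypothetical helper.

```python
import os


def get_bootstrap_servers(default="192.168.0.10:9092"):
    """Return the broker addresses as a list of "host:port" strings."""
    raw = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", default)
    # Split on commas, trim whitespace and drop empty entries.
    return [server.strip() for server in raw.split(",") if server.strip()]
```

The result can be passed directly as bootstrap_servers=get_bootstrap_servers() when creating the KafkaProducer.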

Running the Kafka Producer

python producer.py
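
To confirm that messages are reaching the broker, we can read them back. Below is a minimal consumer sketch, under the assumption that the broker address and topic are the same as in producer.py. json_deserializer and print_registered_users are hypothetical names. KafkaConsumer comes from the same kafka-python package, and the import is placed inside the function so the deserializer can be tried on its own without a broker.

```python
import json


def json_deserializer(raw_bytes):
    """Reverse of json_serializer: decode UTF-8 bytes and parse the JSON."""
    return json.loads(raw_bytes.decode("utf-8"))


def print_registered_users(bootstrap_servers=("192.168.0.10:9092",)):
    """Print every message from the registered_user topic (runs until stopped)."""
    from kafka import KafkaConsumer  # provided by kafka-python

    consumer = KafkaConsumer(
        "registered_user",
        bootstrap_servers=list(bootstrap_servers),
        value_deserializer=json_deserializer,
        # With no stored offset, start from the oldest available message.
        auto_offset_reset="earliest",
    )
    for message in consumer:
        print(message.value)
```

Calling print_registered_users() in a second terminal while producer.py is running should print each registration record as it arrives.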

Happy Coding
