Aim

The world today is heavily dependent on data. Everyone is generating data in huge amounts, and it is becoming a challenge to read that data, process it, and finally perform some action on it.

In this post we will be creating a data pipeline with three responsibilities:

  1. Read data from Kafka topic
  2. Process it using Logstash
  3. Dump the data to Elasticsearch and then visualize it using Kibana

Data Pipeline architecture

(Figure: Kafka to Elasticsearch data pipeline architecture)

There are five important components in this pipeline:

  1. Kafka Server : This is the point where the data is published first.
  2. Producer : The producer publishes data to the Kafka topic. In the real world this can be any entity that produces data to a Kafka topic; in our example we will be generating fake user registration data.
  3. Elasticsearch : This acts as the database that stores the user registration data. The main question that arises is which entity will push the data to Elasticsearch.
  4. Logstash : Logstash plays the role of the middleman, reading data from the Kafka topic and inserting it into Elasticsearch.
  5. Kibana : Kibana provides the graphical user interface that presents the data in a readable or graphical format.

Prerequisites

Making the Environment Ready

Kafka & Zookeeper & Kafka Manager

  • We need Kafka so that the producer can publish the registered user data, so the first step is to install Kafka.
  • We will be installing Kafka using docker compose. Install Kafka-Zookeeper-KafkaManager Using Docker Compose
  • Once you start the services present in the docker compose file, the most important points are:
  • Kafka is listening on PORT 9092
  • Zookeeper is listening on PORT 2181
  • Kafka Manager is listening on PORT 9000
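
Once these services are up, you can quickly confirm that the broker is reachable with a few lines of kafka-python. This is only a minimal sketch: the address localhost:9092 matches the port listed above, while later in this post the host 192.168.0.10:9093 is used, so substitute whichever listener your docker compose file exposes.

from kafka import KafkaConsumer

# Connect to the broker and print the set of topics it currently knows about
consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"])
print(consumer.topics())
consumer.close()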

Elasticsearch & Kibana
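
This post assumes Elasticsearch and Kibana are already installed and running. As a quick sanity check you can ping both services; this rough sketch assumes the requests package is installed, uses the Elasticsearch address 192.168.0.10:9200 that appears later in this post, and assumes Kibana is on its default port 5601.

import requests

# Both calls should return HTTP 200 if the services are up
print("Elasticsearch:", requests.get("http://192.168.0.10:9200").status_code)
print("Kibana:", requests.get("http://192.168.0.10:5601/api/status").status_code)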

Logstash

  • We are using Ubuntu 18.04, so you can set up Logstash using this post. Install Logstash on Ubuntu 18.04
  • The configuration folder for Logstash will be located at
cd /etc/logstash/conf.d/

Kafka Producer : What we will be doing

  • We will be creating a Kafka producer that will produce registered user data to the Kafka topic
  • We will be using the Python Faker module to generate fake registered user data

Kafka Producer : Install the dependencies

pip install Faker
pip install kafka-python

Kafka Producer : Writing the Producer

  • The final producer will be something like this
from faker import Faker
from kafka import KafkaProducer
import json
import time

fake = Faker()

def get_registered_user():
    # Generate one fake registered-user record
    return {
        "name": fake.name(),
        "address": fake.address(),
        "created_at": fake.year()
    }

def json_serializer(data):
    # Serialize the record as UTF-8 encoded JSON before sending it to Kafka
    return json.dumps(data).encode("utf-8")

producer = KafkaProducer(bootstrap_servers=['192.168.0.10:9093'],
                         value_serializer=json_serializer)

if __name__ == "__main__":
    while True:
        # Publish a new fake user to the registered_user topic every 4 seconds
        registered_user = get_registered_user()
        print(registered_user)
        producer.send("registered_user", registered_user)
        time.sleep(4)
  • The data is being pushed to the registered_user topic
  • The Kafka broker is running on port 9093

Learn the details about this producer in this post
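
Before wiring up Logstash, it can help to confirm that the messages are actually landing in the topic. Below is a minimal kafka-python consumer sketch, reusing the broker address and topic name from the producer above.

from kafka import KafkaConsumer
import json

# Read the registered_user topic from the beginning and print every record
consumer = KafkaConsumer(
    "registered_user",
    bootstrap_servers=["192.168.0.10:9093"],
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    print(message.value)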

Create the Logstash Pipeline

  • We will read data from the Kafka topic registered_user
  • Then we will push the data to an Elasticsearch index named registered_user
  • Go to the configuration folder of Logstash
cd /etc/logstash/conf.d/
  • Create a new file
touch kafka-elastic-pipeline.conf
  • Put the below contents in the conf file
input {
    kafka {
            bootstrap_servers => "192.168.0.10:9093"
            topics => ["registered_user"]
    }
}

output {
   elasticsearch {
      hosts => ["192.168.0.10:9200"]
      index => "registered_user"
      workers => 1
    }
}
  • The pipeline defines an input section that reads the data from the Kafka broker
  • The Kafka server address is given by the bootstrap_servers config
  • We will be reading the data from the registered_user topic
input {
    kafka {
            bootstrap_servers => "192.168.0.10:9093"
            topics => ["registered_user"]
    }
}
  • The output section pushes the data to the Elasticsearch server
  • Elasticsearch is running at 192.168.0.10:9200
  • We are pushing the data to the registered_user index of Elasticsearch

Save the file and your pipeline will be active.

You can check the Logstash logs in the /var/log/logstash folder.
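
Before moving on to Kibana, you can confirm that documents are reaching Elasticsearch by querying the document count of the index. This is a small sketch that assumes the requests package is installed and reuses the host and index name from the Logstash output above.

import requests

# The count should keep growing while the producer and the pipeline are running
resp = requests.get("http://192.168.0.10:9200/registered_user/_count")
print(resp.json())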

Access data in Kibana

(Screenshot: Kibana Management option)

  • Click on Index Patterns and then click on Create Index Pattern

(Screenshot: Kibana Create Index Pattern)

  • Search for your index and then click Next step
  • Then click on Show advanced options and select @timestamp from the dropdown box
  • Finally click on Create index pattern and your index pattern will be created

Viewing Data in Kibana

  • Once the index pattern has been created, you can go to the Discover menu of Kibana

(Screenshot: Kibana Discover menu)

  • You will see that the index registered_user is already selected and the data is present.

(Screenshot: Elasticsearch index data in Kibana)

Happy Coding
