1.) Overview
Apache Kafka is a distributed streaming platform. It is used for building real-time data platforms and streaming applications. In this blog, we will discuss how to install Kafka and work on some basic use cases.
This article was created using Apache Kafka 2.1.0 (the kafka_2.12-2.1.0 download, which is built for Scala 2.12).
2.) Installation
Download and unpack Kafka from https://kafka.apache.org/downloads.
2.1) Configuration
config/zookeeper.properties
- dataDir=/tmp/kafka/zookeeper
config/server.properties
- log.dirs=/tmp/kafka/logs
- zookeeper.connect=localhost:2181
- listeners=PLAINTEXT://localhost:9092
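Before starting the servers, it can help to confirm which values are actually in effect, since the stock files ship with some of these settings commented out. The following is a minimal sketch, not part of Kafka: prop_of is a hypothetical helper that prints the value of an uncommented key=value line, and the guarded block assumes you run it from the Kafka directory.

```shell
# prop_of FILE KEY: hypothetical helper, prints the value of an
# uncommented "KEY=value" line in a properties file.
prop_of() {
  awk -F= -v key="$2" '$1 == key { sub(/^[^=]*=/, ""); print }' "$1"
}

# Print the settings from this section (only when run from the Kafka directory).
if [ -f config/server.properties ]; then
  prop_of config/zookeeper.properties dataDir
  prop_of config/server.properties log.dirs
  prop_of config/server.properties zookeeper.connect
  prop_of config/server.properties listeners
fi
```

A key that prints nothing is either missing or still commented out.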
To test Kafka, run the following commands, each in its own terminal.
>bin/zookeeper-server-start.sh config/zookeeper.properties
>bin/kafka-server-start.sh config/server.properties
Run the second command in a new terminal. Once the broker starts, you should see some new log entries in the ZooKeeper terminal.
3.) Kafka Topics
Create a topic:
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic clicks
View the topics:
>bin/kafka-topics.sh --list --zookeeper localhost:2181
Delete the topic (execute at the end):
>bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic clicks
4.) Sending and Receiving Messages
Send messages:
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks
-Enter some messages here, one per line, and leave the producer running
Receive the messages:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic clicks --from-beginning
-Because of --from-beginning, you should receive every message in the topic, including the ones sent before the consumer started
5.) Multi Broker
Make two copies of config/server.properties, one for each additional broker, and set the following properties:
config/server-1.properties
- broker.id=1
- listeners=PLAINTEXT://:9093
- log.dirs=/tmp/kafka-logs-1
config/server-2.properties
- broker.id=2
- listeners=PLAINTEXT://:9094
- log.dirs=/tmp/kafka-logs-2
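The two copies can also be generated rather than edited by hand. The following is a minimal sketch under a few assumptions: make_broker_config is a hypothetical helper, the source file contains an uncommented broker.id line, and the log directory is set through log.dirs (the property the stock server.properties uses, which takes precedence over log.dir).

```shell
# make_broker_config SRC ID PORT: hypothetical helper. Copies SRC to
# server-ID.properties in the same directory, overriding three settings,
# and prints the path of the new file.
make_broker_config() {
  src=$1; id=$2; port=$3
  dest="$(dirname "$src")/server-$id.properties"
  sed -e "s|^broker\.id=.*|broker.id=$id|" \
      -e "s|^#*listeners=.*|listeners=PLAINTEXT://:$port|" \
      -e "s|^log\.dirs=.*|log.dirs=/tmp/kafka-logs-$id|" \
      "$src" > "$dest"
  echo "$dest"
}

# Generate both files (only when run from the Kafka directory).
if [ -f config/server.properties ]; then
  make_broker_config config/server.properties 1 9093
  make_broker_config config/server.properties 2 9094
fi
```

Each broker needs its own id, port, and log directory because all three run on the same machine here.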
Start the two new brokers, each in its own terminal:
>bin/kafka-server-start.sh config/server-1.properties
>bin/kafka-server-start.sh config/server-2.properties
Create a new topic that will be replicated on the original node plus the two new ones.
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic clicks-replicated
You can run the view topics command again (above).
We can also describe the topics to confirm that they were created as specified:
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic clicks-replicated
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic clicks
6.) Fault Tolerance
Now, we can send some messages to our replicated topic:
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks-replicated
Read the messages in the replicated topic:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic clicks-replicated
Now, shut down the second node (broker 2, port 9094) by pressing Ctrl+C in its terminal or by closing the terminal.
Again, we can describe the replicated topic.
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic clicks-replicated
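After a broker is stopped, its id should disappear from the Isr (in-sync replicas) field of the describe output while it stays in the Replicas field. The following is a small sketch for pulling out just that field: isr_of is a hypothetical helper, and the sample line is illustrative (its values are assumed, not captured from a real run).

```shell
# isr_of: hypothetical helper. Reads kafka-topics.sh --describe output on
# stdin and prints the in-sync replica list of each partition line.
isr_of() {
  sed -n 's/.*Isr: *\([0-9,]*\).*/\1/p'
}

# Illustrative partition line in the shape the describe command prints.
printf 'Topic: clicks-replicated\tPartition: 0\tLeader: 0\tReplicas: 0,1,2\tIsr: 0,1\n' | isr_of
```

Against a running cluster, the describe command above can be piped straight into isr_of.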
We can read the messages again from the beginning through the original node and the first node (note that the second node is off).
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic clicks-replicated
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic clicks-replicated
*Close all the terminals except ZooKeeper and the original broker on port 9092.
7.) Import / export data from and to a file using a connector
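Kafka can also read from and write to a file through Kafka Connect. Let's try that by using the default configurations.
- connect-standalone.properties - the Connect worker configuration (which broker to connect to, converters, offset storage), the counterpart of server.properties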
- connect-file-source.properties - specify the source file to read (default: test.txt, note topic value here)
- connect-file-sink.properties - where to write (default: test.sink.txt)
Run the connector:
>bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
-Create a test.txt file in the directory where you run the connector and add some text to it. Make sure that the file ends with a newline. Otherwise, the last line will not be read.
In the log, you should see something like:
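One way to create the file so that the trailing newline is guaranteed is printf, which writes exactly what its format string says. The following is a sketch: the sample lines foo and bar are placeholders, and ends_with_newline is a hypothetical helper for checking an existing file.

```shell
# Write two lines; each \n terminates a line, including the last one.
printf 'foo\nbar\n' > test.txt

# ends_with_newline FILE: hypothetical check, succeeds if the last byte
# of FILE is a newline.
ends_with_newline() {
  [ "$(tail -c 1 "$1" | wc -l | tr -d ' ')" = "1" ]
}

ends_with_newline test.txt && echo "test.txt ends with a newline"
```

The same check is useful after editing the file by hand, since some editors do not add a final newline.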
[2019-01-13 16:17:09,799] WARN Couldn't find file test.txt for FileStreamSourceTask, sleeping to wait for it to be created (org.apache.kafka.connect.file.FileStreamSourceTask:109)
[2019-01-13 16:17:10,838] INFO Cluster ID: MYm1bMttRdCqG-njYXeO-w (org.apache.kafka.clients.Metadata:285)
There should be a newly created file with the same content named: test.sink.txt.
Note that you can still read the messages using the consumer. The topic name, connect-test, comes from connect-file-source.properties:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
Modify test.txt by adding "Hello World!", and your consumer should be able to pick up the message.
>{"schema":{"type":"string","optional":false},"payload":"Hello World!"}
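Each line arrives wrapped in a JSON envelope because the default connect-standalone.properties uses the JSON converter with schemas enabled. If only the text is needed, the payload can be pulled out of the consumer output. The following is a deliberately naive sketch: payload_of is a hypothetical helper, and it assumes one envelope per line with a string payload that contains no escaped quotes. A real JSON parser would be the safer choice.

```shell
# payload_of: hypothetical helper. Prints the string payload of each
# JSON envelope read on stdin (no handling of escaped quotes).
payload_of() {
  sed -n 's/.*"payload":"\(.*\)"}$/\1/p'
}

printf '%s\n' '{"schema":{"type":"string","optional":false},"payload":"Hello World!"}' | payload_of
# prints: Hello World!
```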
*Terminate the consumer but leave the original broker (port 9092) running.
8.) Streaming using WordCount app
Now let's create a new file with the following content:
>echo -e "The quick brown fox jumps over the lazy dog.\nThe quick brown fox jumps over the lazy dog." > file-input.txt
Create a new topic:
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input
Send the file data to the topic. In practice, this data could just as well come from a live stream.
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input < file-input.txt
Consume the input:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input --from-beginning
We can use the WordCount demo app packaged with Kafka to process the data we just sent to the input topic.
>bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
Consume the messages using String and Long deserializers:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
You should have an output similar to:
the 1
quick 1
brown 1
fox 1
jumps 1
over 1
the 2
lazy 1
dog. 1
the 3
quick 2
brown 2
fox 2
jumps 2
over 2
the 4
lazy 2
dog. 2
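The stream emits a running count, so the last line printed for each word is its final total. Those totals can be cross-checked locally without Kafka. The following is a sketch: local_wordcount is a hypothetical helper, and it assumes, as the output above suggests, that words are lowercased and split on spaces only (which is why "dog." keeps its period).

```shell
# local_wordcount: hypothetical helper. Reads text on stdin and prints
# "word count" pairs, lowercased and split on single spaces.
local_wordcount() {
  tr '[:upper:]' '[:lower:]' |
    tr ' ' '\n' |
    sed '/^$/d' |
    sort |
    uniq -c |
    awk '{ print $2, $1 }'
}

# Same two lines as file-input.txt.
printf 'The quick brown fox jumps over the lazy dog.\nThe quick brown fox jumps over the lazy dog.\n' | local_wordcount
```

For this input, "the" should come out as 4 and every other word as 2, matching the final counts in the stream output.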