Table of Contents:
- Background
- Prerequisite
- Running a Kafka cluster on OSX
- Verifying the Kafka cluster
- How does it work?
- Connect to it from NodeJs
- Conclusion
Background
Investigating Kafka issues that only show up in a clustered environment is hard, and most of the time we need a Kafka cluster on our local machine to do it. There is an official way of doing this. Unfortunately, it does not work on OSX because Docker for Mac does not support host networking (at the time of writing).
After spending some time finding a way myself, I'd like to share it with you, and I hope it saves you some precious time.
We are going to create a Kafka cluster with 3 brokers and 1 ZooKeeper node that can be accessed both from within the Docker environment and from outside it.
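Prerequisite
Docker with docker-compose (for example, Docker Desktop for Mac). Node.js and npm are only needed for the last section.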
Running a Kafka cluster on OSX
Clone this git repo.
git clone https://github.com/pongsatt/kafka-local-cluster.git
cd kafka-local-cluster
docker-compose up
For those who do not want to clone it, here is the docker-compose.yml.
version: '3.3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
    healthcheck:
      test: echo stat | nc localhost 2181
      interval: 10s
      timeout: 10s
      retries: 3
    environment:
      - ZOOKEEPER_SERVER_ID=1
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SERVERS=zookeeper:2888:3888
  kafka1:
    image: confluentinc/cp-kafka:latest
    healthcheck:
      test: ps augwwx | egrep [S]upportedKafka
    depends_on:
      - zookeeper
    ports:
      - "9091:9091"
    environment:
      - KAFKA_LISTENERS=LISTENER_INT://kafka1:29091,LISTENER_EXT://0.0.0.0:9091
      - KAFKA_ADVERTISED_LISTENERS=LISTENER_INT://kafka1:29091,LISTENER_EXT://localhost:9091
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKER_ID=1
      - ZOOKEEPER=zookeeper:2181
  kafka2:
    image: confluentinc/cp-kafka:latest
    healthcheck:
      test: ps augwwx | egrep [S]upportedKafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      - KAFKA_LISTENERS=LISTENER_INT://kafka2:29092,LISTENER_EXT://0.0.0.0:9092
      - KAFKA_ADVERTISED_LISTENERS=LISTENER_INT://kafka2:29092,LISTENER_EXT://localhost:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKER_ID=2
      - ZOOKEEPER=zookeeper:2181
  kafka3:
    image: confluentinc/cp-kafka:latest
    healthcheck:
      test: ps augwwx | egrep [S]upportedKafka
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
    environment:
      - KAFKA_LISTENERS=LISTENER_INT://kafka3:29093,LISTENER_EXT://0.0.0.0:9093
      - KAFKA_ADVERTISED_LISTENERS=LISTENER_INT://kafka3:29093,LISTENER_EXT://localhost:9093
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKER_ID=3
      - ZOOKEEPER=zookeeper:2181
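The three broker services above differ only in their broker id and ports: broker N listens internally on 2909N and externally on 909N. The snippet below is a small sketch that makes this pattern explicit by generating each broker's environment list. The helper name brokerEnvironment is hypothetical and not part of the repo; it only prints the same values the compose file already contains.

```javascript
// Hypothetical helper (not in the repo): build the environment entries for
// broker N, following the pattern used in docker-compose.yml.
function brokerEnvironment(id) {
  const host = `kafka${id}`;
  const internalPort = 29090 + id; // 29091, 29092, 29093
  const externalPort = 9090 + id; // 9091, 9092, 9093
  return [
    `KAFKA_LISTENERS=LISTENER_INT://${host}:${internalPort},LISTENER_EXT://0.0.0.0:${externalPort}`,
    `KAFKA_ADVERTISED_LISTENERS=LISTENER_INT://${host}:${internalPort},LISTENER_EXT://localhost:${externalPort}`,
    "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT",
    "KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT",
    "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
    `KAFKA_BROKER_ID=${id}`,
    "ZOOKEEPER=zookeeper:2181",
  ];
}

// Print the environment section for all three brokers.
for (const id of [1, 2, 3]) {
  console.log(`kafka${id}:`);
  for (const line of brokerEnvironment(id)) {
    console.log(`  - ${line}`);
  }
}
```

Adding a fourth broker would mean a new service with id 4, ports 29094/9094, and a "9094:9094" port mapping.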
Verifying the Kafka cluster
We will verify the cluster by creating a test topic, then producing and consuming some messages.
Note: to connect from within the Docker environment, use
Zookeeper: zookeeper:2181
Brokers: kafka1:29091,kafka2:29092,kafka3:29093
Create a test topic.
docker exec -it kafka-local-cluster_kafka1_1 \
kafka-topics --create --topic test --partitions 3 --replication-factor 3 --if-not-exists --zookeeper zookeeper:2181
Output.
Created topic test.
Produce some messages to the topic.
docker exec -it kafka-local-cluster_kafka1_1 \
bash -c "seq 10 | kafka-console-producer --broker-list kafka1:29091,kafka2:29092,kafka3:29093 --topic test && echo 'Produced 10 messages.'"
Output.
Produced 10 messages.
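Consume the produced messages (press Ctrl+C to stop the consumer). Because the topic has 3 partitions and Kafka only guarantees ordering within a partition, the order you see may differ.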
docker exec -it kafka-local-cluster_kafka1_1 \
kafka-console-consumer --bootstrap-server kafka1:29091,kafka2:29092,kafka3:29093 --topic test --from-beginning
Output.
1
2
3
4
5
6
7
8
9
10
Congratulations! You now have a working local Kafka cluster.
Bonus: you can stop and start individual Kafka brokers by running these commands in the kafka-local-cluster folder.
docker-compose stop kafka1
# bring it back
docker-compose start kafka1
How does it work?
Let me briefly explain how this configuration works, using kafka1 as an example.
...
  kafka1:
    image: confluentinc/cp-kafka:latest
    healthcheck:
      test: ps augwwx | egrep [S]upportedKafka
    depends_on:
      - zookeeper
    ports:
      - "9091:9091"
    environment:
      - KAFKA_LISTENERS=LISTENER_INT://kafka1:29091,LISTENER_EXT://0.0.0.0:9091
      - KAFKA_ADVERTISED_LISTENERS=LISTENER_INT://kafka1:29091,LISTENER_EXT://localhost:9091
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKER_ID=1
      - ZOOKEEPER=zookeeper:2181
...
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
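This tells Kafka that we will use two named listeners, LISTENER_INT and LISTENER_EXT, and that both use the PLAINTEXT (unencrypted, unauthenticated) security protocol.
LISTENER_INT is used inside Docker, both for the brokers to talk to each other and for clients running in containers. LISTENER_EXT is for clients outside Docker (on the Mac host) to talk to the brokers.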
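The listener names are just labels; this setting is what ties each label to a security protocol. A tiny sketch of how such a value can be read as a name-to-protocol lookup (parseProtocolMap is a hypothetical helper for illustration, not Kafka code):

```javascript
// Hypothetical helper: read a KAFKA_LISTENER_SECURITY_PROTOCOL_MAP value
// ("NAME:PROTOCOL,NAME:PROTOCOL") into a Map of listener name -> protocol.
function parseProtocolMap(value) {
  const map = new Map();
  for (const entry of value.split(",")) {
    const [name, protocol] = entry.trim().split(":");
    map.set(name, protocol);
  }
  return map;
}

const protocols = parseProtocolMap("LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT");
console.log(protocols.get("LISTENER_INT")); // PLAINTEXT
console.log(protocols.get("LISTENER_EXT")); // PLAINTEXT
```

Renaming LISTENER_INT to, say, INTERNAL everywhere in the config would work the same way, as long as every setting uses the same name.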
KAFKA_INTER_BROKER_LISTENER_NAME
KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
This tells Kafka to use LISTENER_INT for broker-to-broker communication.
KAFKA_LISTENERS
KAFKA_LISTENERS=LISTENER_INT://kafka1:29091,LISTENER_EXT://0.0.0.0:9091
This tells the broker to listen on two addresses: kafka1:29091 for internal communication and 0.0.0.0:9091 (all network interfaces) for external communication. The "9091:9091" ports mapping lets Docker forward port 9091 on the Mac to the container.
KAFKA_ADVERTISED_LISTENERS
KAFKA_ADVERTISED_LISTENERS=LISTENER_INT://kafka1:29091,LISTENER_EXT://localhost:9091
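This tells Kafka which addresses to hand back to clients. A client first connects to any bootstrap server and then receives cluster metadata containing each broker's advertised address for the listener it connected through. A client inside Docker therefore gets kafka1:29091, while a client on the Mac gets localhost:9091, which Docker forwards to the container.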
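To make the listener/advertised-listener relationship concrete, here is a simplified model of it (not real Kafka code): the broker finds which listener the client connected to, then answers with the advertised address registered under the same listener name. The helpers parseListeners and advertisedAddressFor are hypothetical, and matching by port alone is a simplification that happens to work for this config.

```javascript
// Parse "NAME://host:port,NAME2://host:port" into a Map of name -> "host:port".
function parseListeners(value) {
  const listeners = new Map();
  for (const entry of value.split(",")) {
    const [name, address] = entry.split("://");
    listeners.set(name, address);
  }
  return listeners;
}

// kafka1's settings from docker-compose.yml.
const bound = parseListeners("LISTENER_INT://kafka1:29091,LISTENER_EXT://0.0.0.0:9091");
const advertised = parseListeners("LISTENER_INT://kafka1:29091,LISTENER_EXT://localhost:9091");

// Simplified model: find the listener bound to the port the client used,
// then return the advertised address with the same listener name.
function advertisedAddressFor(connectedPort) {
  for (const [name, address] of bound) {
    const port = Number(address.split(":").pop());
    if (port === connectedPort) return advertised.get(name);
  }
  throw new Error(`no listener on port ${connectedPort}`);
}

console.log(advertisedAddressFor(29091)); // kafka1:29091 (client inside Docker)
console.log(advertisedAddressFor(9091)); // localhost:9091 (client on the Mac)
```

This is why a single advertised address is not enough here: kafka1 only resolves inside the Docker network, and localhost only points at the broker from the Mac.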
Connect to it from NodeJs
Note: to connect from outside the Docker environment, use
Zookeeper: localhost:2181
Brokers: localhost:9091,localhost:9092,localhost:9093
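Since the right broker list depends on where the client runs, an application that runs both on the Mac and inside a container might choose it at startup. A minimal sketch, assuming a hypothetical IN_DOCKER environment variable that you would set yourself (the repo does not define it):

```javascript
// Broker lists taken from the two connection notes in this article.
const INTERNAL_BROKERS = ["kafka1:29091", "kafka2:29092", "kafka3:29093"];
const EXTERNAL_BROKERS = ["localhost:9091", "localhost:9092", "localhost:9093"];

// Hypothetical: IN_DOCKER is an assumed variable, e.g. set via the
// "environment" section of the client's own compose service.
function bootstrapServers(env = process.env) {
  return env.IN_DOCKER === "true" ? INTERNAL_BROKERS : EXTERNAL_BROKERS;
}

console.log(bootstrapServers({ IN_DOCKER: "true" }).join(","));
console.log(bootstrapServers({}).join(","));
```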
Clone the repo (if you have not already) and go to the demo application.
git clone https://github.com/pongsatt/kafka-local-cluster.git
cd kafka-local-cluster/node-kafka-demo
# install
npm i
Produce some messages.
node producer.js
Output.
producing message 0
producing message 1
producing message 2
producing message 3
producing message 4
producing message 5
producing message 6
producing message 7
producing message 8
producing message 9
done.
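The contents of producer.js are not shown here. As a rough sketch of what such a producer might look like, assuming the kafkajs client (the demo may use a different library) and a hypothetical topic named "test", the snippet below builds the same payloads as the output above. Only the message-building part runs as-is; the network calls are left in comments because they need kafkajs installed and the cluster running.

```javascript
// Build the payloads seen in the demo output: "message 0" ... "message N-1".
function buildMessages(count) {
  return Array.from({ length: count }, (_, i) => ({ value: `message ${i}` }));
}

// Shape accepted by kafkajs producer.send(); "test" is an assumed topic name.
const record = { topic: "test", messages: buildMessages(10) };
console.log(record.messages[0]); // { value: 'message 0' }

// With kafkajs installed and the cluster up, sending would look like:
//   const { Kafka } = require("kafkajs");
//   const kafka = new Kafka({
//     clientId: "demo",
//     brokers: ["localhost:9091", "localhost:9092", "localhost:9093"],
//   });
//   const producer = kafka.producer();
//   await producer.connect();
//   await producer.send(record);
//   await producer.disconnect();
```

Note that the broker list uses the localhost addresses, because this client runs on the Mac, outside Docker.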
Run the consumer.
node consumer.js
Output.
{ value: 'message 0' }
{ value: 'message 1' }
{ value: 'message 2' }
{ value: 'message 3' }
{ value: 'message 4' }
{ value: 'message 5' }
{ value: 'message 6' }
{ value: 'message 7' }
{ value: 'message 8' }
{ value: 'message 9' }
Conclusion
That's it. You have a local Kafka cluster. I hope this helps you investigate Kafka issues more easily.