Installing Kafka Command-Line Tools using Docker
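To run Confluent Kafka using Docker, start ZooKeeper first and then the broker. Both images exit at startup unless their required settings are passed as environment variables, so the commands below set them with -e:
docker run -d --name zookeeper -p 2181:2181 -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper:latest
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka:latest
ZooKeeper mode was removed in Apache Kafka 4.0, so if the latest tag resolves to a release without ZooKeeper support, pin both images to a 7.x tag instead.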
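The broker needs a few seconds after docker run before it accepts connections. A minimal sketch of a retry helper that can wait for it is shown below; the function name wait_until and the retry counts are illustrative assumptions, not something the Confluent images provide.

```shell
# Retry a command until it succeeds or the attempts run out.
# usage: wait_until <attempts> <delay_seconds> <command> [args...]
wait_until() {
  attempts=$1
  delay=$2
  shift 2
  n=0
  while [ "$n" -lt "$attempts" ]; do
    # Discard the probe's output; only its exit status matters here.
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    n=$((n + 1))
    # Sleep only when another attempt will follow.
    if [ "$n" -lt "$attempts" ]; then
      sleep "$delay"
    fi
  done
  return 1
}
```

For example, wait_until 30 2 docker exec kafka kafka-topics --list --bootstrap-server localhost:9092 polls the broker every 2 seconds for roughly a minute and returns non-zero if it never answers.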
Starting and Stopping Kafka Brokers
docker start kafka
docker stop kafka
Creating Topics using Command-Line
Execute a shell in the running Kafka Docker container:
docker exec -it kafka /bin/bash
kafka-topics --create --topic my_topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
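The same command can also be run from the host in one step, without opening an interactive shell first. The sketch below assumes the container is named kafka as above; the helper names kafka_exec and create_topic and the DRY_RUN switch are hypothetical conveniences for this example.

```shell
# Run a Kafka CLI tool inside the "kafka" container without opening a shell.
# With DRY_RUN=1 the docker command is printed instead of executed.
kafka_exec() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "docker exec kafka $*"
  else
    docker exec kafka "$@"
  fi
}

# Create a topic; --if-not-exists makes the call safe to repeat.
# $1 = topic name, $2 = partitions, $3 = replication factor
create_topic() {
  kafka_exec kafka-topics --create --if-not-exists \
    --topic "$1" --partitions "$2" --replication-factor "$3" \
    --bootstrap-server localhost:9092
}
```

Calling create_topic my_topic 3 1 creates the topic used in this guide; setting DRY_RUN=1 beforehand only prints the docker command so it can be reviewed.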
Producing and Consuming Messages from CLI
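Open a shell inside the Kafka container. The producer and the consumer both run until interrupted with Ctrl+C, so use a separate shell for each:
docker exec -it kafka /bin/bash
kafka-console-producer --topic my_topic --bootstrap-server localhost:9092
kafka-console-consumer --topic my_topic --bootstrap-server localhost:9092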
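For scripted checks, the console tools can also be driven non-interactively: the producer reads messages from standard input, and the consumer can stop after a fixed number of messages. The following is a rough sketch under the assumption that the container is named kafka; gen_messages and round_trip are hypothetical helper names.

```shell
# Print N numbered test messages, one per line.
# $1 = message count
gen_messages() {
  i=1
  while [ "$i" -le "$1" ]; do
    printf 'message-%d\n' "$i"
    i=$((i + 1))
  done
}

# Send N messages to a topic, then read N messages from the start of the
# topic and exit. On a topic that already holds data, the messages read
# back are the oldest ones, not necessarily the ones just sent.
# $1 = topic, $2 = message count
round_trip() {
  # -i keeps stdin open so the piped messages reach the producer.
  gen_messages "$2" | docker exec -i kafka kafka-console-producer \
    --topic "$1" --bootstrap-server localhost:9092
  docker exec kafka kafka-console-consumer \
    --topic "$1" --bootstrap-server localhost:9092 \
    --from-beginning --max-messages "$2"
}
```

Running round_trip my_topic 5 against a fresh topic should print the five generated messages, although the order across partitions is not guaranteed.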
Commonly Used CLI Commands for Administration
- List topics:
kafka-topics --list --bootstrap-server localhost:9092
- Describe a topic:
kafka-topics --describe --topic my_topic --bootstrap-server localhost:9092
- Delete a topic:
kafka-topics --delete --topic my_topic --bootstrap-server localhost:9092
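Because kafka-topics --list prints one topic name per line, the commands above combine easily in scripts. As an illustrative sketch (the helper names topic_in_list and delete_if_present are assumptions for this example), a topic can be deleted only when the broker actually lists it:

```shell
# Succeed if the topic name given as $1 appears as a whole line on stdin.
# -F matches literally, -x requires a full-line match, -q suppresses output.
topic_in_list() {
  grep -Fxq -- "$1"
}

# Delete a topic only when the broker currently lists it.
# $1 = topic name
delete_if_present() {
  if docker exec kafka kafka-topics --list --bootstrap-server localhost:9092 \
      | topic_in_list "$1"; then
    docker exec kafka kafka-topics --delete --topic "$1" \
      --bootstrap-server localhost:9092
  else
    echo "topic $1 not found; nothing to delete"
  fi
}
```

The full-line match matters here: a plain substring search for my_topic would also match a topic named my_topic_2.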
25 Questions
- How do you start a Kafka broker with a custom configuration file in Docker?
- What command changes the number of partitions for an existing topic?
- How to consume messages from a specific partition?
- How to produce messages with a specific key?
- Steps to stop all running Kafka and ZooKeeper containers?
- How to list consumer groups?
- How to reset the offset of a consumer group for a specific topic?
- How to alter the configuration of an existing topic?
- What command describes consumer groups?
- How to check the Kafka broker status?
- How to produce messages in a round-robin fashion to different partitions?
- How to set retention policies for a topic?
- How to change the replication factor of an existing topic?
- How to list all the brokers that are part of the Kafka cluster?
- How to purge all messages from a Kafka topic?
- How to read messages from a Kafka topic starting from the beginning?
- How to check the last offset for each partition of a topic?
- How to monitor the lag of a Kafka consumer group?
- How to set a quota for a Kafka client?
- How to authenticate against a Kafka broker?
- How to encrypt communication between Kafka brokers and clients?
- How to add or remove a broker to/from a Kafka cluster?
- How to perform a rolling restart of a Kafka cluster?
- How to configure ACLs for a Kafka topic?
- How to configure log segments in Kafka?
Solutions
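- Use docker run with a mounted custom server.properties: docker run -v /path/to/custom/server.properties:/etc/kafka/server.properties ... The Confluent images also accept individual broker settings as KAFKA_-prefixed environment variables, for example -e KAFKA_AUTO_CREATE_TOPICS_ENABLE=false.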
- kafka-topics --alter --topic my_topic --partitions 5 --bootstrap-server localhost:9092 (the partition count can only be increased, never reduced)
- kafka-console-consumer --topic my_topic --partition 0 --bootstrap-server localhost:9092
- echo "key,value" | kafka-console-producer --topic my_topic --property "parse.key=true" --property "key.separator=," --bootstrap-server localhost:9092
- docker stop kafka zookeeper && docker rm kafka zookeeper
- kafka-consumer-groups --list --bootstrap-server localhost:9092
- kafka-consumer-groups --reset-offsets --group my_group --topic my_topic --to-earliest --execute --bootstrap-server localhost:9092 (the group must have no active members)
- kafka-configs --alter --entity-type topics --entity-name my_topic --add-config max.message.bytes=128000 --bootstrap-server localhost:9092
- kafka-consumer-groups --describe --group my_group --bootstrap-server localhost:9092
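- Use docker logs kafka or monitoring tools. For a quick liveness check, kafka-broker-api-versions --bootstrap-server localhost:9092 succeeds only if the broker answers.
- Use kafka-console-producer without specifying keys. Records without a key are spread across partitions by the default partitioner, which in newer clients fills a batch for one partition before moving to the next. For strict per-record rotation, pass --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner.
- Use kafka-configs --alter with the retention.ms and retention.bytes parameters, for example: kafka-configs --alter --entity-type topics --entity-name my_topic --add-config retention.ms=86400000 --bootstrap-server localhost:9092
- kafka-topics --alter cannot change the replication factor. Use kafka-reassign-partitions with a reassignment JSON file that lists the desired replica brokers for each partition.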
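- Use the ZooKeeper CLI (zookeeper-shell zookeeper:2181 ls /brokers/ids) or tools like kafkacat (kafkacat -L -b localhost:9092).
- Use kafka-delete-records with an --offset-json-file that names the partitions and the offset to delete up to.
- kafka-console-consumer --from-beginning --topic my_topic --bootstrap-server localhost:9092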
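- Use kafka-run-class kafka.tools.GetOffsetShell --bootstrap-server localhost:9092 --topic my_topic --time -1, where --time -1 requests the latest offset of each partition. Recent releases also ship the same tool as kafka-get-offsets.
- kafka-consumer-groups --describe --group my_group --bootstrap-server localhost:9092 (the LAG column is the log end offset minus the group's committed offset, per partition)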
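- Use kafka-configs --alter with the clients entity type, for example: kafka-configs --alter --entity-type clients --entity-name my_client --add-config 'producer_byte_rate=1048576,consumer_byte_rate=2097152' --bootstrap-server localhost:9092 (my_client is a placeholder client ID).
- Through SASL mechanisms like PLAIN, SCRAM, etc. The CLI tools read the client settings (security.protocol, sasl.mechanism, sasl.jaas.config) from a properties file passed with --command-config, --producer.config, or --consumer.config.
- By enabling SSL/TLS: expose an SSL or SASL_SSL listener on the broker and set security.protocol plus a truststore on the client.
- After adding a broker, use partition reassignment (kafka-reassign-partitions) to move replicas onto it. Before removing a broker, move its replicas to the remaining brokers first.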
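- Kafka does not ship a rolling-restart script. Restart one broker at a time and wait until no partitions are under-replicated before moving on, or use a third-party tool such as kafka-rolling-restart from Yelp's kafka-utils.
- Use kafka-acls, for example: kafka-acls --bootstrap-server localhost:9092 --add --allow-principal User:alice --operation Read --topic my_topic (User:alice is a placeholder principal, and an authorizer must be enabled on the broker).
- Configure log.segment.bytes and log.roll.ms on the broker, or segment.bytes and segment.ms per topic with kafka-configs.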
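The kafka-delete-records answer above needs a JSON file describing what to delete, which is tedious to write by hand for multi-partition topics. A minimal sketch of a generator follows; the function name delete_records_json is hypothetical, and it assumes the partitions are numbered 0 to N-1. An offset of -1 asks the broker to delete everything up to the current end of each partition.

```shell
# Print an offsets file for kafka-delete-records that covers partitions
# 0..N-1 of a topic. The body runs in a subshell so the loop variables
# do not leak into the calling shell.
# $1 = topic name, $2 = number of partitions
delete_records_json() (
  topic=$1
  partitions=$2
  printf '{"version":1,"partitions":['
  p=0
  while [ "$p" -lt "$partitions" ]; do
    # Separate entries with commas, but not before the first one.
    if [ "$p" -gt 0 ]; then
      printf ','
    fi
    printf '{"topic":"%s","partition":%d,"offset":-1}' "$topic" "$p"
    p=$((p + 1))
  done
  printf ']}\n'
)
```

To purge my_topic as created earlier, write the file with delete_records_json my_topic 3 > /tmp/offsets.json and then run kafka-delete-records --bootstrap-server localhost:9092 --offset-json-file /tmp/offsets.json inside the container.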