Kafka
Kafka Features
No native support for delayed messages.
Kafka Multi-Broker Environment Setup
This is for a single-server, multi-broker setup. A multi-server, multi-broker setup is very similar; the only differences are that you set up the Kafka server config and start it on different servers, and there is no need to use different ports.
install Java
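For example on Debian/Ubuntu (the package name is an assumption for your distro; Kafka 2.5 runs on Java 8 or 11):

```sh
sudo apt-get update
sudo apt-get install -y openjdk-8-jdk
java -version
```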
install Kafka (includes Zookeeper)
```sh
cd /data
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xzf kafka_2.12-2.5.0.tgz
cd kafka_2.12-2.5.0
```
add server config for multi-broker
Each broker has its own config file. Copy config/server.properties once per broker, then edit each copy so every broker gets a unique broker.id, a unique listener port, and its own log directory, as sketched below.
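A sketch of the two config files (the IDs, ports, and log dirs are arbitrary choices):

```properties
# config/server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1

# config/server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
```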
start Zookeeper
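Using the Zookeeper bundled with Kafka:

```sh
cd /data/kafka_2.12-2.5.0
bin/zookeeper-server-start.sh config/zookeeper.properties
```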
start Kafka Server (two brokers)
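Start one broker per config file (in the background, or in separate shells):

```sh
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
```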
make Kafka/Zookeeper Service
make zookeeper service:
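A minimal systemd unit sketch, assuming the /data/kafka_2.12-2.5.0 install path from above:

```ini
# /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper (bundled with Kafka)
After=network.target

[Service]
ExecStart=/data/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh /data/kafka_2.12-2.5.0/config/zookeeper.properties
ExecStop=/data/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
```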
make two kafka broker services:
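A sketch for the first broker; duplicate it as kafka-2.service pointing at server-2.properties:

```ini
# /etc/systemd/system/kafka-1.service
[Unit]
Description=Apache Kafka broker 1
After=zookeeper.service
Requires=zookeeper.service

[Service]
ExecStart=/data/kafka_2.12-2.5.0/bin/kafka-server-start.sh /data/kafka_2.12-2.5.0/config/server-1.properties
# note: kafka-server-stop.sh stops every broker running on the host
ExecStop=/data/kafka_2.12-2.5.0/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
```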
After all that, you can run the two broker services via:
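Assuming the unit names from the sketch above (Zookeeper starts automatically through the Requires= dependency):

```sh
sudo systemctl daemon-reload
sudo systemctl start kafka-1 kafka-2
```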
Kafka CLI
Create a topic:
```sh
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my_topic
```
List all topics:
```sh
./kafka-topics.sh --list --zookeeper localhost:2181
```
Run a producer console; then we can type some messages and produce them to Kafka:
```sh
./kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
```
Consume:
```sh
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning
```
PHP + Kafka
install and enable the php-rdkafka extension.
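For example via PECL (assuming librdkafka is already installed on the system):

```sh
sudo pecl install rdkafka
# then enable it in php.ini (or a conf.d file):
# extension=rdkafka.so
```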
config and produce code example:
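A minimal sketch, assuming php-rdkafka 4.x, a broker on localhost:9092, and the my_topic topic from above:

```php
<?php
// configure the connection
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');

// log callback: receives librdkafka's log lines
$conf->setLogCb(function ($kafka, $level, $facility, $message) {
    error_log("[$level] $facility: $message");
});

$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic('my_topic');

// RD_KAFKA_PARTITION_UA lets librdkafka pick the partition
$topic->produce(RD_KAFKA_PARTITION_UA, 0, 'hello kafka');

// wait for outstanding messages to be delivered (rdkafka >= 4.0)
$producer->flush(10000);
```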
The setLogCb call in the sketch above corresponds to librdkafka's C-level log callback setter:

```c
RD_EXPORT void rd_kafka_conf_set_log_cb(
    rd_kafka_conf_t *conf,
    void (*log_cb)(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
);
```
You can read my findings on how error handling works in php-rdkafka here: php-enqueue/enqueue-dev#749. Specifically:
error handling
You can use setErrorCb to handle errors:
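A minimal sketch (logging to error_log is just an example destination):

```php
<?php
$conf = new RdKafka\Conf();

// called on transport-level errors, e.g. all brokers down
$conf->setErrorCb(function ($kafka, $err, $reason) {
    error_log(sprintf('Kafka error: %s (reason: %s)', rd_kafka_err2str($err), $reason));
});
```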
Kafka in Docker
Here is a typical setup in docker-compose.yml:
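A sketch using the wurstmeister images (the image names, ports, and host paths are assumptions; adjust to your setup):

```yaml
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LOG_DIRS: /kafka/kafka-logs
    volumes:
      # persist Kafka data outside the container (see the notes below)
      - ./kafka-data:/kafka
      - /var/run/docker.sock:/var/run/docker.sock
```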
kafka data persistence issues
You NEED to mount /kafka and /var/run/docker.sock out of the container using volumes.
KAFKA_LOG_DIRS NEEDS to be set; otherwise, every time Docker restarts, Kafka will create a new log directory with a hashed suffix and use it, which means losing the old data. (The old data doesn't physically disappear, but it lives in another universe: another log directory.)
But when Docker restarts, there will be a fatal error about the Cluster ID not matching:
```
ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID m1Ze6AjGRwqarkcxJscgyQ doesn't match stored clusterId Some(1TGYcbFuRXa4Lqojs4B9Hw) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
	at kafka.server.KafkaServer.startup(KafkaServer.scala:220)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
	at kafka.Kafka$.main(Kafka.scala:84)
	at kafka.Kafka.main(Kafka.scala)
[2020-01-04 15:58:43,303] INFO shutting down (kafka.server.KafkaServer)
```
It's related to a Kafka mechanism:
It was introduced in the Kafka 2.4.0 release as [KAFKA-7335] - Store clusterId locally to ensure broker joins the right cluster. When the Docker restart happens, Kafka tries to match the locally stored clusterId (in logs/meta.properties) against Zookeeper's clusterId (which changed because of the Docker restart); due to this mismatch, the above error is thrown. Please refer to this link for more information.
There are some approaches to fix the problem:
Persist Zookeeper data by editing the zookeeper service in docker-compose.yml:
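A sketch, assuming the wurstmeister/zookeeper image (the in-container data path is an assumption and depends on the image):

```yaml
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    volumes:
      # persist Zookeeper's dataDir so the cluster ID survives restarts
      - ./zookeeper-data:/opt/zookeeper-3.4.13/data
```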
But every time it restarts, a Zookeeper log file is generated, and it's not small.
Delete or modify meta.properties in the Kafka log directory; the local cluster ID is stored in it.
But this loses the old data too, even though the data files are still there: the new cluster ID means the broker joins a cluster that doesn't know about the old topics.
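For example (the host path is an assumption matching the /kafka volume mount and KAFKA_LOG_DIRS value sketched above):

```sh
rm ./kafka-data/kafka-logs/meta.properties
```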
another container accessing the host machine's Kafka service
Use host.docker.internal as the hostname, e.g.:
"default_bootstrap_server": ["host.docker.internal:9095", "host.docker.internal:9096"]
It works when:
- Kafka is running directly on the local machine, or
- Kafka is running in another container but exposed to the local machine.