Kafka

Kafka Features

Kafka does not natively support delayed (scheduled) messages.

Kafka Multi-Broker Environment Setup

This covers a single-server, multi-broker setup. A multi-server, multi-broker setup is very similar; the only differences are that you configure and start Kafka on each server separately, and you don't need different ports.

install Java

install Kafka (ZooKeeper is included)

```shell
cd /data
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xzf kafka_2.12-2.5.0.tgz
cd kafka_2.12-2.5.0
```

add server config for multi-broker

Each broker has its own config file. Edit each config file:

broker.id must be different for each broker.

if the brokers run on a single server, the listeners port must also be different.
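A minimal sketch of the two broker config files (the file names, ports, and log dirs are assumptions, not from the original):

```properties
# config/server-1.properties
broker.id=1
listeners=PLAINTEXT://:9092
log.dirs=/data/kafka-logs-1
zookeeper.connect=localhost:2181

# config/server-2.properties
broker.id=2
listeners=PLAINTEXT://:9093
log.dirs=/data/kafka-logs-2
zookeeper.connect=localhost:2181
```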

start Zookeeper

start Kafka Server (two brokers)
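The two steps above can be sketched as follows, run from the Kafka directory (the broker config file names are assumptions):

```shell
# start ZooKeeper with the default config shipped with Kafka
bin/zookeeper-server-start.sh config/zookeeper.properties &

# start the two brokers, each with its own config file
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
```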

make Kafka/Zookeeper Service

make zookeeper service:
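A sketch of the unit file (the unit path and install paths are assumptions):

```ini
# /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache ZooKeeper
After=network.target

[Service]
Type=simple
ExecStart=/data/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh /data/kafka_2.12-2.5.0/config/zookeeper.properties
ExecStop=/data/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh
SuccessExitStatus=143
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
```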

zookeeper-server-stop.sh exits with code 143; without SuccessExitStatus=143, systemd would regard the stop as a failure.

make two kafka broker services:
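A sketch of one broker's unit file (the service name, unit path, and install paths are assumptions; the second service is identical apart from ExecStart pointing at the other broker config):

```ini
# /etc/systemd/system/kafka-1.service
[Unit]
Description=Apache Kafka broker 1
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
ExecStart=/data/kafka_2.12-2.5.0/bin/kafka-server-start.sh /data/kafka_2.12-2.5.0/config/server-1.properties
ExecStop=/data/kafka_2.12-2.5.0/bin/kafka-server-stop.sh
SuccessExitStatus=143
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
```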

The two services' only difference is ExecStart: each uses a different server config file (the two broker configs).

The unit's Requires=/After= on zookeeper.service make sure the Kafka service checks whether the ZooKeeper service is running and, if not, automatically starts it first.

SuccessExitStatus=143 is needed for the same reason as in the zookeeper service.

After all this, you can run the two broker services via:
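Assuming the service names used in the unit files above (kafka-1 and kafka-2 are assumptions):

```shell
sudo systemctl start kafka-1
sudo systemctl start kafka-2
```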

The kafka services already auto-start zookeeper (via Requires=/After=).

Kafka CLI

Create a topic:

```shell
./kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic my_topic
```

List all topics:

```shell
./kafka-topics.sh --list --zookeeper zookeeper:2181
```

Run a producer console; then we can type messages and produce them to Kafka:

```shell
./kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
```

Consume:

```shell
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning
```

PHP + Kafka

  • install and enable the php rdkafka extension.

config and produce code example:
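A minimal producer sketch using php-rdkafka (the broker address, topic name, and log format are assumptions):

```php
<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');

// custom log callback: decide yourself how to handle librdkafka's log lines
$conf->setLogCb(function ($kafka, $level, $facility, $message) {
    error_log("kafka [$level] $facility: $message");
});

$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic('my_topic');

// RD_KAFKA_PARTITION_UA lets librdkafka choose the partition
$topic->produce(RD_KAFKA_PARTITION_UA, 0, 'hello kafka');
$producer->poll(0);

// wait up to 10s for outstanding messages to be delivered
$producer->flush(10000);
```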

If you don't use conf->setLogCb, librdkafka dumps all log info to standard output; with this setting you can decide how to handle the runtime logs yourself. Reference: https://docs.confluent.io/3.1.1/clients/librdkafka/rdkafka_8h.html#a06ade2ca41f32eb82c6f7e3d4acbe19f

```c
RD_EXPORT void rd_kafka_conf_set_log_cb(
    rd_kafka_conf_t *conf,
    void (*log_cb)(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
);
```

Set logger callback.

The default is to print to stderr, but a syslog logger is also available, see rd_kafka_log_print and rd_kafka_log_syslog for the builtin alternatives. Alternatively the application may provide its own logger callback. Or pass func as NULL to disable logging.

This is the configuration alternative to the deprecated rd_kafka_set_logger().

There are other options as well; for instance, passing the built-in rd_kafka_log_print as the callback is equivalent to the default stderr logging.

You can read my findings regarding how error handling works in php-rdkafka here: php-enqueue/enqueue-dev#749

error handling

You can use setErrorCb to do error handling:
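A sketch of an error callback (the log format is an assumption):

```php
<?php
$conf = new RdKafka\Conf();

// called asynchronously whenever librdkafka reports an error;
// $err is an RD_KAFKA_RESP_ERR_* constant
$conf->setErrorCb(function ($kafka, $err, $reason) {
    error_log(sprintf('kafka error: %s (reason: %s)', rd_kafka_err2str($err), $reason));
});
```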

Kafka in Docker

Here is a typical setup in docker-compose.yml:
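A sketch of such a compose file, assuming the wurstmeister/kafka and wurstmeister/zookeeper images (the images, ports, and host paths are assumptions; the volumes and KAFKA_LOG_DIR relate to the persistence notes below):

```yaml
version: "3"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LOG_DIR: /kafka/kafka-logs
    volumes:
      - ./kafka-data:/kafka
      - /var/run/docker.sock:/var/run/docker.sock
```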

Kafka data persistence issues

You NEED to mount /kafka and /var/run/docker.sock out of the container using volumes.

KAFKA_LOG_DIR NEEDS to be set; otherwise, every time the container restarts, Kafka creates a new log dir with a hashed suffix and uses it, which means losing the old data (the old data doesn't physically disappear, but it lives in another universe: another log directory).

But when the container restarts, there will be a fatal error about a Cluster ID mismatch:

```
ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID m1Ze6AjGRwqarkcxJscgyQ doesn't match stored clusterId Some(1TGYcbFuRXa4Lqojs4B9Hw) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
	at kafka.server.KafkaServer.startup(KafkaServer.scala:220)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
	at kafka.Kafka$.main(Kafka.scala:84)
	at kafka.Kafka.main(Kafka.scala)
[2020-01-04 15:58:43,303] INFO shutting down (kafka.server.KafkaServer)
```

It is related to a Kafka mechanism introduced in the 2.4.0 release: [KAFKA-7335] - Store clusterId locally to ensure broker joins the right cluster. When the docker restart happens, Kafka tries to match the locally stored clusterId (in logs/meta.properties) against ZooKeeper's clusterId (which changed because of the docker restart); due to this mismatch, the above error is thrown.

There are some approaches to fix the problem:

  • persist ZooKeeper data by editing the zookeeper service in docker-compose.yml:

but then every restart generates a ZooKeeper log file, and they are not small
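A sketch of the zookeeper service edit, assuming the wurstmeister/zookeeper image (the container data path and host path are assumptions and depend on the image used):

```yaml
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    volumes:
      - ./zookeeper-data:/opt/zookeeper-3.4.13/data
```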

  • delete or modify meta.properties in the Kafka log dir; the local cluster ID is stored in it.

but this also loses the old data, even though the data is still physically there

Another container accessing the host machine's Kafka service

Use host.docker.internal as the hostname, e.g.:

"default_bootstrap_server": ["host.docker.internal:9095", "host.docker.internal:9096"]

This works when:

Kafka is running directly on the local machine,

or

Kafka is running in another container but its ports are exposed to the local machine.
