Kafka
Kafka Features
Kafka does not natively support delayed messages.
Kafka Multi-Broker Environment Setup
This covers a single-server, multi-broker setup. A multi-server, multi-broker setup is very similar; the only differences are that each broker's config is set up and started on a different server, and there is no need to use different ports.
install Java
install Kafka (Zookeeper included)
cd /data
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xzf kafka_2.12-2.5.0.tgz
cd kafka_2.12-2.5.0
add server configs for multi-broker
Each broker has its own config file.
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
Edit each config file. Each broker needs a unique broker.id, listener port, and log directory. For example, server-1.properties:
broker.id=1
listeners=PLAINTEXT://<hostname>:9092
log.dirs=/tmp/kafka-logs-1
auto.create.topics.enable=true
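A corresponding example for server-2.properties (since both brokers run on the same server, the id, port, and log dir must differ):
broker.id=2
listeners=PLAINTEXT://<hostname>:9093
log.dirs=/tmp/kafka-logs-2
auto.create.topics.enable=true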
start Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
start Kafka Server (two brokers)
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
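To verify that both brokers registered, you can list the broker ids stored in Zookeeper, e.g. with the zookeeper-shell.sh script that ships with Kafka:
bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
# should print the configured broker ids, e.g. [1, 2]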
make Kafka/Zookeeper Service
make zookeeper service:
sudo vim /etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=root
ExecStart=/data/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh /data/kafka_2.12-2.5.0/config/zookeeper.properties
ExecStop=/data/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh
SuccessExitStatus=143
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
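After creating the unit file, reload systemd and start Zookeeper (standard systemctl workflow):
sudo systemctl daemon-reload
sudo systemctl start zookeeper
sudo systemctl status zookeeper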
make two kafka broker services:
sudo vim /etc/systemd/system/kafka1.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=root
ExecStart=/data/kafka_2.12-2.5.0/bin/kafka-server-start.sh /data/kafka_2.12-2.5.0/config/server-1.properties
ExecStop=/data/kafka_2.12-2.5.0/bin/kafka-server-stop.sh
SuccessExitStatus=143
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
sudo vim /etc/systemd/system/kafka2.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=root
ExecStart=/data/kafka_2.12-2.5.0/bin/kafka-server-start.sh /data/kafka_2.12-2.5.0/config/server-2.properties
ExecStop=/data/kafka_2.12-2.5.0/bin/kafka-server-stop.sh
SuccessExitStatus=143
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
After that, you can start the two broker services via:
sudo systemctl start kafka1
sudo systemctl start kafka2
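If the services should also come up on boot, enable them as well (standard systemctl usage):
sudo systemctl enable zookeeper kafka1 kafka2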
Kafka CLI
Create a topic:
./kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic my_topic
List all topics:
./kafka-topics.sh --list --zookeeper zookeeper:2181
Run a console producer; then you can type messages and produce them to Kafka:
./kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
Consume:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning
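To check how the partitions and replicas were assigned, you can also describe the topic:
./kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic my_topic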
PHP + Kafka
Install and enable the php-rdkafka extension.
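One common way to install it is via PECL (assuming librdkafka is available on the system; package names vary by distro):
# librdkafka is a prerequisite of the extension
sudo apt-get install librdkafka-dev    # or: sudo yum install librdkafka-devel
sudo pecl install rdkafka
# enable the extension (the ini path depends on your PHP installation)
echo "extension=rdkafka.so" | sudo tee -a /etc/php.ini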
config and produce code example:
// assumes the php-rdkafka extension is loaded
use RdKafka\Conf;
use RdKafka\Producer;

$conf = new Conf();
$conf->set('log_level', (string) LOG_ERR);
$conf->set('debug', 'all');
// route librdkafka log output through the application logger
$conf->setLogCb(function ($kafka, $level, $fac, $buf) {
    Log::debug("Kafka|level=$level|keyword=$fac|message=$buf");
});

$brokerServers = config("message_produce.drivers.kafka.broker_servers");
$producer = new Producer($conf);
$producer->addBrokers($brokerServers);
Log::debug("producer added broker servers: ".$brokerServers);
$this->producer = $producer;

try {
    $topic = $this->producer->newTopic("test_topic");
    // let librdkafka pick the partition (RD_KAFKA_PARTITION_UA = unassigned)
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "this is a test topic");
    // $this->producer->poll(3000);
    // serve delivery callbacks until the outgoing queue is drained
    while ($this->producer->getOutQLen() > 0) {
        $this->producer->poll(1);
    }
} catch (\Exception $e) {
    Log::critical("kafka produce abnormal: ".$e->getMessage());
    throw $e;
}
On the C level, setLogCb corresponds to librdkafka's log-callback setter:
RD_EXPORT void rd_kafka_conf_set_log_cb(
    rd_kafka_conf_t *conf,
    void (*log_cb)(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
);
Some other code sets message.timeout.ms directly on the global Conf; this is equivalent to:
$conf = new RdKafka\Conf();
$topicConf = new RdKafka\TopicConf();
$topicConf->set('message.timeout.ms', '5000');
$conf->setDefaultTopicConf($topicConf);
You can read my findings regarding how error handling works in php-rdkafka here: php-enqueue/enqueue-dev#749. Specifically:
$topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf());
$topic->produce($partition, 0 /* must be 0 */, $payload, $key);
$start = microtime(true);
while ($this->producer->getOutQLen() > 0) {
$this->producer->poll(0);
if (microtime(true) - $start > 10) {
throw new \RuntimeException("Message sending failed");
}
}
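Newer php-rdkafka versions (4.x, built against librdkafka >= 1.0) also expose Producer::flush(), which is a simpler way to wait for outstanding messages than the manual poll loop; a minimal sketch assuming such a version:
// wait up to 10 seconds for all queued messages to be delivered
$result = $this->producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException("Was unable to flush, messages might be lost!");
}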
error handling
You can use setErrorCb to do error handling:
$conf->setErrorCb(function ($kafka, $err, $reason) {
$error = sprintf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
throw new CrmError($error);
});
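Note that setErrorCb reports client-level errors (for example broker connection problems); per-message delivery failures surface through the delivery-report callback instead, which is invoked from poll()/flush(). A minimal sketch using setDrMsgCb:
$conf->setDrMsgCb(function ($kafka, $message) {
    // $message is an RdKafka\Message; err is RD_KAFKA_RESP_ERR_NO_ERROR on success
    if (RD_KAFKA_RESP_ERR_NO_ERROR !== $message->err) {
        Log::critical("kafka delivery failed: ".$message->errstr());
    }
});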
Kafka in Docker
here is a typical setup in docker-compose.yml:
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    hostname: zookeeper
    privileged: true
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
      ZOO_AUTOPURGE_INTERVAL: 1
      ZOO_AUTOPURGE_RETAIN_COUNT: 3
    volumes:
      - ./docker-data/zookeeper/data:/bitnami/zookeeper
      # - ./docker-config/zookeeper/zoo.cfg:/opt/bitnami/zookeeper/conf/zoo.cfg
    ports:
      - "2181:2181"
    networks:
      - api_crm
  kafka-1:
    image: wurstmeister/kafka
    ports:
      - "9095:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.0.200
      KAFKA_ADVERTISED_PORT: 9095
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LOG_DIRS: /kafka/logs
      KAFKA_BROKER_ID: 500
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
    volumes:
      - ./docker-data/kafka/var/run/docker.sock:/var/run/docker.sock
      - ./docker-data/kafka/broker_500:/kafka
    networks:
      - api_crm
    depends_on:
      - zookeeper
  kafka-2:
    image: wurstmeister/kafka
    ports:
      - "9096:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.0.200
      KAFKA_ADVERTISED_PORT: 9096
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LOG_DIRS: /kafka/logs
      KAFKA_BROKER_ID: 501
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
    volumes:
      - ./docker-data/kafka/var/run/docker.sock:/var/run/docker.sock
      - ./docker-data/kafka/broker_501:/kafka
    networks:
      - api_crm
    depends_on:
      - zookeeper
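With this file in place, a quick way to bring the stack up and sanity-check it (assuming kafka-topics.sh is on the container's PATH, as it is in the wurstmeister image):
docker-compose up -d zookeeper kafka-1 kafka-2
docker-compose ps
# create a test topic against the containerized cluster
docker-compose exec kafka-1 kafka-topics.sh --create \
  --zookeeper zookeeper:2181 --replication-factor 2 --partitions 3 --topic my_topic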
Kafka data persistence issues
You NEED to mount /kafka (and /var/run/docker.sock) out of the container using volumes.
KAFKA_LOG_DIRS
NEEDS to be set; otherwise, every time Docker restarts, Kafka will create a new log dir with a hashed suffix and use it, which means the old data is lost (the old data doesn't physically disappear, it just lives in another universe, that is, another log directory).
But when Docker restarts, there may be a fatal error about the Cluster ID not matching:
ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID m1Ze6AjGRwqarkcxJscgyQ doesn't match stored clusterId Some(1TGYcbFuRXa4Lqojs4B9Hw) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
    at kafka.server.KafkaServer.startup(KafkaServer.scala:220)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
    at kafka.Kafka$.main(Kafka.scala:84)
    at kafka.Kafka.main(Kafka.scala)
[2020-01-04 15:58:43,303] INFO shutting down (kafka.server.KafkaServer)
This is related to a Kafka mechanism introduced in the 2.4.0 release: [KAFKA-7335] - Store clusterId locally to ensure broker joins the right cluster.
When the Docker restart happens, Kafka tries to match the locally stored clusterId (in logs/meta.properties) against Zookeeper's clusterId (which changed because of the Docker restart); because of this mismatch, the above error is thrown. Refer to KAFKA-7335 for more information.
There are some approaches to fix the problem:
Persist the Zookeeper data by editing the zookeeper service in docker-compose.yml:
volumes:
  - ./docker-data/zookeeper/data:/bitnami/zookeeper
But every restart generates Zookeeper log files, and they are not small.
Delete or modify meta.properties in the Kafka log dir; the locally stored cluster id is in it (the sketch below shows where it lives in this setup).
But old data will be lost this way too, even though the data files are still there.
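Given the volume layout above (KAFKA_LOG_DIRS=/kafka/logs mounted at ./docker-data/kafka/broker_500), the stored cluster id can be inspected and, if you accept joining the new cluster, removed:
# inspect the locally stored cluster id
cat ./docker-data/kafka/broker_500/logs/meta.properties
# e.g. cluster.id=1TGYcbFuRXa4Lqojs4B9Hw (the stored value from the error above)
# deleting it lets the broker adopt Zookeeper's current cluster id on the next start
rm ./docker-data/kafka/broker_500/logs/meta.properties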
Another container accessing the host machine's Kafka service
Use host.docker.internal as the hostname, e.g.:
"default_bootstrap_server": ["host.docker.internal:9095", "host.docker.internal:9096"]
It works when:
Kafka is running directly on the local machine, or
Kafka is running in another container but its ports are exposed to the local machine.
$ docker-compose ps
Name Command State Ports
-------------------------------------------------------------------------------------------------------------------------
api_logix_crm_kafka-1_1 start-kafka.sh Up 0.0.0.0:9095->9092/tcp
api_logix_crm_kafka-2_1 start-kafka.sh Up 0.0.0.0:9096->9092/tcp