
Kafka

Kafka Features

Kafka does not natively support delayed messages.

Kafka Multi-Broker Environment Setup

This is for a single server running multiple brokers. A multi-server, multi-broker setup is very similar; the only differences are that each broker's server config is set up and started on a different server, and there is no need to use different ports.

install Java
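Kafka 2.5 runs on Java 8 or 11. A minimal sketch, assuming a Debian/Ubuntu host (use yum/dnf on RHEL-family distros):

# assuming a Debian/Ubuntu host; on RHEL-family distros use yum/dnf instead
sudo apt-get update
sudo apt-get install -y openjdk-11-jre-headless
java -version   # verify the installation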

install Kafka (included Zookeeper)

cd /data

wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz

tar -xzf kafka_2.12-2.5.0.tgz

cd kafka_2.12-2.5.0

add server config for multi-broker

Each broker has its config file.

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

Edit each config file.

config/server-1.properties:

broker.id=1
listeners=PLAINTEXT://<hostname>:9092
log.dirs=/tmp/kafka-logs-1
auto.create.topics.enable=true

config/server-2.properties:

broker.id=2
listeners=PLAINTEXT://<hostname>:9093
log.dirs=/tmp/kafka-logs-2
auto.create.topics.enable=true

broker.id must be different for each broker.

If the brokers run on a single server, the listener ports must also be different.

start Zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &

start Kafka Server (two brokers)

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
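As an optional sanity check, verify that both brokers registered themselves in Zookeeper (localhost:2181 assumes the default Zookeeper port):

bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
# the last line of output should be: [1, 2]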

make Kafka/Zookeeper Service

make zookeeper service:

sudo vim /etc/systemd/system/zookeeper.service
zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=root
ExecStart=/data/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh /data/kafka_2.12-2.5.0/config/zookeeper.properties
ExecStop=/data/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh
SuccessExitStatus=143
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

The server JVM exits with code 143 (128 + SIGTERM) when zookeeper-server-stop.sh stops it; without SuccessExitStatus=143 the service would regard the stop as a failure.

make two kafka broker services:

sudo vim /etc/systemd/system/kafka1.service
kafka1.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=root
ExecStart=/data/kafka_2.12-2.5.0/bin/kafka-server-start.sh /data/kafka_2.12-2.5.0/config/server-1.properties
ExecStop=/data/kafka_2.12-2.5.0/bin/kafka-server-stop.sh
SuccessExitStatus=143
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
sudo vim /etc/systemd/system/kafka2.service
kafka2.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=root
ExecStart=/data/kafka_2.12-2.5.0/bin/kafka-server-start.sh /data/kafka_2.12-2.5.0/config/server-2.properties
ExecStop=/data/kafka_2.12-2.5.0/bin/kafka-server-stop.sh
SuccessExitStatus=143
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

The two services' only difference is ExecStart: each uses a different broker server config.

The Requires=/After=zookeeper.service lines in [Unit] ensure the kafka services check whether the zookeeper service is running and, if not, start it first automatically.

SuccessExitStatus=143 is needed for the same reason as in the zookeeper service.

After that, you can start the two broker services via:

sudo systemctl start kafka1
sudo systemctl start kafka2

The kafka services already include auto-starting zookeeper.
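Optionally, reload systemd after adding the unit files and enable everything at boot (standard systemctl commands, shown as a sketch):

sudo systemctl daemon-reload
sudo systemctl enable zookeeper kafka1 kafka2
sudo systemctl status kafka1   # should show active (running)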

Kafka CLI

Create a topic:

./kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic my-topic

List all topics:

./kafka-topics.sh --list --zookeeper zookeeper:2181

Run a producer console; you can then type messages and produce them to Kafka:

./kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

Consume:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
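To check a topic's partition and replica assignment (useful for verifying the two-broker setup):

./kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic my-topic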

PHP + Kafka

  • install and enable the php-rdkafka extension.

Config and produce code example:

        $conf = new Conf();
        $conf->set('log_level', (string) LOG_ERR);
        $conf->set('debug', 'all');
        $conf->setLogCb(function ($kafka, $level, $fac, $buf) {
            Log::debug("Kafka|level=$level|keyword=$fac|message=$buf");
        });

        $brokerServers = config("message_produce.drivers.kafka.broker_servers");
        $producer = new Producer($conf);
        $producer->addBrokers($brokerServers);
        Log::debug("producer added broker servers: ".$brokerServers);
        $this->producer = $producer;
        try {
            $topic = $this->producer->newTopic("test_topic");
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, "this is a test topic");
            // poll until the internal queue is drained, so the message is actually sent
            while ($this->producer->getOutQLen() > 0) {
                $this->producer->poll(1);
            }
        } catch (\Exception $e) {
            Log::critical("kafka produce abnormal: ".$e->getMessage());
            throw $e;
        }

If setLogCb is not used, rdkafka dumps all log/debug output to stderr; with this setting you can customize how the running info is handled. Reference: https://docs.confluent.io/3.1.1/clients/librdkafka/rdkafka_8h.html#a06ade2ca41f32eb82c6f7e3d4acbe19f

RD_EXPORT void rd_kafka_conf_set_log_cb(
    rd_kafka_conf_t *conf,
    void (*log_cb)(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
);

Set logger callback.

The default is to print to stderr, but a syslog logger is also available, see rd_kafka_log_print and rd_kafka_log_syslog for the builtin alternatives. Alternatively the application may provide its own logger callback. Or pass func as NULL to disable logging.

This is the configuration alternative to the deprecated rd_kafka_set_logger()

some other codes

For reference, setting a topic-level option such as message.timeout.ms through a default topic conf:

$conf = new RdKafka\Conf();
$topicConf = new RdKafka\TopicConf();
$topicConf->set('message.timeout.ms', '5000');
$conf->setDefaultTopicConf($topicConf);

You can read my findings regarding how error handling works in php-rdkafka in php-enqueue/enqueue-dev#749. Specifically:

$topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf());
$topic->produce($partition, 0 /* must be 0 */, $payload, $key);
$start = microtime(true);
while ($this->producer->getOutQLen() > 0) {
    $this->producer->poll(0);

    if (microtime(true) - $start > 10) {
        throw new \RuntimeException("Message sending failed");
    }
}

error handling

You can use setErrorCb for error handling:

$conf->setErrorCb(function ($kafka, $err, $reason) {
    $error = sprintf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
    throw new CrmError($error);
});

Kafka in Docker

Here is a typical setup in docker-compose.yml:

docker-compose.yml
services:
    zookeeper:
        image: bitnami/zookeeper:latest
        hostname: zookeeper
        privileged: true
        environment:
            ALLOW_ANONYMOUS_LOGIN: "yes"
            ZOO_AUTOPURGE_INTERVAL: 1
            ZOO_AUTOPURGE_RETAIN_COUNT: 3
        volumes:
            - ./docker-data/zookeeper/data:/bitnami/zookeeper
#            - ./docker-config/zookeeper/zoo.cfg:/opt/bitnami/zookeeper/conf/zoo.cfg
        ports:
            - "2181:2181"
        networks:
            - api_crm


    kafka-1:
        image: wurstmeister/kafka
        ports:
            - "9095:9092"
        environment:
            KAFKA_ADVERTISED_HOST_NAME:  192.168.0.200
            KAFKA_ADVERTISED_PORT: 9095
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_LOG_DIRS: /kafka/logs
            KAFKA_BROKER_ID: 500
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        volumes:
            - ./docker-data/kafka/var/run/docker.sock:/var/run/docker.sock
            - ./docker-data/kafka/broker_500:/kafka
        networks:
            - api_crm
        depends_on:
            - zookeeper

    kafka-2:
        image: wurstmeister/kafka
        ports:
            - "9096:9092"
        environment:
            KAFKA_ADVERTISED_HOST_NAME:  192.168.0.200
            KAFKA_ADVERTISED_PORT: 9096
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_LOG_DIRS: /kafka/logs
            KAFKA_BROKER_ID: 501
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        volumes:
            - ./docker-data/kafka/var/run/docker.sock:/var/run/docker.sock
            - ./docker-data/kafka/broker_501:/kafka
        networks:
            - api_crm
        depends_on:
            - zookeeper
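
A quick smoke test of this stack from the host, assuming the Kafka CLI tools from the earlier section are installed on the host and the advertised address 192.168.0.200 is reachable:

docker-compose up -d zookeeper kafka-1 kafka-2
docker-compose logs -f kafka-1                      # wait until the broker logs "started"
bin/kafka-console-producer.sh --broker-list 192.168.0.200:9095 --topic my-topic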

kafka data persistence issues

You NEED to mount /kafka and /var/run/docker.sock out of the container using volumes.

KAFKA_LOG_DIRS NEEDS to be set; otherwise, every time Docker restarts, Kafka will create a new log dir with a hashed suffix and use it, which means losing the old data. (The old data doesn't physically disappear, but it ends up in another universe - another log directory.)

But when Docker restarts, there will be a fatal error about the Cluster ID not matching:

ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) kafka.common.InconsistentClusterIdException: The Cluster ID m1Ze6AjGRwqarkcxJscgyQ doesn't match stored clusterId Some(1TGYcbFuRXa4Lqojs4B9Hw) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong. at kafka.server.KafkaServer.startup(KafkaServer.scala:220) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) at kafka.Kafka$.main(Kafka.scala:84) at kafka.Kafka.main(Kafka.scala) [2020-01-04 15:58:43,303] INFO shutting down (kafka.server.KafkaServer)

This relates to a Kafka mechanism introduced in the 2.4.0 release: [KAFKA-7335] - Store clusterId locally to ensure broker joins the right cluster. When the Docker restart happens, Kafka tries to match the locally stored clusterId (in logs/meta.properties) against Zookeeper's clusterId, which changed because of the Docker restart; this mismatch causes the error above.

There are some approaches to fix the problem:

  • persist zookeeper data by editing the zookeeper service in docker-compose.yml:

volumes:
  - ./docker-data/zookeeper/data:/bitnami/zookeeper

but every time it restarts, Zookeeper log files are generated, and they are not small.

  • delete or modify meta.properties in the Kafka log dir; the local cluster ID is stored in it (see the sketch after this list).

This will lose the old data too, even though the data is physically still there.
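
A sketch of the second approach, using the host paths implied by the volumes above (adjust to your mounts; the cluster.id value is just an example):

cat docker-data/kafka/broker_500/logs/meta.properties
# cluster.id=1TGYcbFuRXa4Lqojs4B9Hw
# either overwrite cluster.id with Zookeeper's current cluster id, or delete the
# file and let the broker regenerate it on the next start:
rm docker-data/kafka/broker_500/logs/meta.properties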

another container accessing the host machine's Kafka service

Use host.docker.internal as the hostname, e.g.:

"default_bootstrap_server": ["host.docker.internal:9095", "host.docker.internal:9096"]

This works when:

Kafka is running directly on the local machine,

or

Kafka is running in another container but its ports are exposed to the local machine:

$ docker-compose ps
          Name                         Command               State                          Ports
-------------------------------------------------------------------------------------------------------------------------
api_logix_crm_kafka-1_1     start-kafka.sh                   Up      0.0.0.0:9095->9092/tcp
api_logix_crm_kafka-2_1     start-kafka.sh                   Up      0.0.0.0:9096->9092/tcp
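
Note that on Linux, host.docker.internal is not defined automatically (Docker Desktop on Mac/Windows defines it for you). With Docker 20.10+ you can map it to the host gateway when starting the client container; a sketch, where my-client-image is a hypothetical image name:

docker run --rm --add-host=host.docker.internal:host-gateway my-client-image
# in docker-compose, the equivalent is:
#   extra_hosts:
#     - "host.docker.internal:host-gateway"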
