producer 和 consumer 开发实践

前言

消息模型可以分为两种, 队列和发布-订阅式。 队列的处理方式是一组消费者从服务器读取消息,一条消息只有其中的一个消费者来处理。在发布-订阅模型中,消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息。Kafka 为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group)。 消费者用一个消费者组名标记自己。 一个发布在 Topic 上消息被分发给此消费者组中的一个消费者。 假如所有的消费者都在一个组中,那么这就变成了 queue 模型。 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。

我们可通过测试来实际验证下,为此通过网易云控制台创建了一个 kafka 实例 [test],并在此实例下新建了一个 topic [cubegun],如下:

红框内即为 kafka broker 地址

topic 分区数为3,实际应用场景分区数设置很重要,后面会有提到,这里仅为了测试方便。

选择客户端

kafka 客户端支持多语言,具体参看链接

我们这里选择在 Ubuntu 16.04 系统环境上使用 c++ 客户端 (High level API)来做测试。

安装相关依赖

  1. 安装 librdkafka

git clone https://github.com/edenhill/librdkafka.git

具体可以参看安装文档

2) 安装 cppkafa

git clone https://github.com/mfontanini/cppkafka.git

具体可以参看安装文档

编写 producer 和 consumer

我们希望可以测试到两种消息模式(队列和订阅),测试代码下载地址

https://github.com/zengyuxing007/kafka_test_cpp

producer 代码如下:

说明:

  • 运行时可通过参数指定partition 设定producer 产生的消息输入到特定的分区。
  • sleep(1),每一秒产生一个消息输入到 kafka
  • c-m19448y69w.kafka.cn-east-1.internal:9092 为 kafka broker 地址,可通过控制台内“详细信息”找到
#include <cppkafka/producer.h>
#include <iostream>

using namespace std;
using namespace cppkafka;

int main(int argc,char** argv) {

    int p(0);

    if(argc > 1 && argv[1])
    {   
      p= strtol(argv[1],NULL,10);
    }   

    cout << "partition:"<<p<<endl;

    // Create the config
    Configuration config = {
        { "metadata.broker.list", "c-m19448y69w.kafka.cn-east-1.internal:9092" }
    };  

    // Create the producer
    Producer producer(config);

    // Produce a message!
    string message = "hey there!";
    MessageBuilder mesg("cubegun");
    mesg.partition(p).payload(message);

    while(1)
    {   
        producer.produce(mesg);
        producer.flush();
        sleep(1);
    }   

}

consumer 代码如下:

说明:

  • 运行时可通过参数指定 groupId 设定 consumer 对应的消费组
  • enable.auto.commit,默认设置为 true, 特殊场景,可以通过设置该参数为 false, 手动控制 offset 的提交
  • auto.offset.reset,offset 没有特别设定,默认采用 latest

详细的参数说明以及更多相关 config 参数,可以查看Kafka Configuration

关于consumer group

consumer group 是个逻辑的概念,是 Kafka 用来实现一个 topic 消息的广播(发给所有的 consumer)和单播(发给任意一个 consumer)的手段。一个 topic 可以有多个 Consumer Group 。topic 的消息会复制(不是真的复制,是概念上的)到所有的 Consumer Group ,但每个 Consumer Group 只会把消息发给该 Consumer Group 中的一个 consumer(不支持并发)。如果需要实现广播,只要每个 consumer 有一个独立的 Consumer Group 就可以了。要实现单播只要所有的 consumer 在同一个 Consumer Group 。用 Consumer Group 还可以将 consumer 进行自由的分组而不需要多次发送消息到不同的 topic

关于 consumer group 及相关名词概念具体可以查看Kafka 名词解释

#include <iostream>
#include <string>
#include <cppkafka/consumer.h>

using namespace std;
using namespace cppkafka;

int main(int argc,char** argv)
{

        string groupId("foo");

        if(argc >1 && argv[1])
        {
                groupId = argv[1];
        }

        Configuration config = {
                { "metadata.broker.list", "c-m19448y69w.kafka.cn-east-1.internal:9092" },
                {"group.id",groupId}
        };

        // Construct from some config we've defined somewhere
        Consumer consumer(config);

        // Set the assignment callback
        consumer.set_assignment_callback([&](vector<TopicPartition>& topic_partitions) {
                                        // Here you could fetch offsets and do something, altering the offsets on the
                                        // topic_partitions vector if needed
                                        cout << "Got assigned " << topic_partitions.size() << " partitions!" << endl;
                                        });

        // Set the revocation callback
        consumer.set_revocation_callback([&](const vector<TopicPartition>& topic_partitions) {
                                        cout << topic_partitions.size() << " partitions revoked!" << endl;
                                        });


        // Subscribe to 1 topics
        consumer.subscribe({ "cubegun" });

        // Now loop forever polling for messages
        while (true) {
                Message msg = consumer.poll();

                // Make sure we have a message before processing it
                if (!msg) {
                        continue;
                }

                // Messages can contain error notifications rather than actual data
                if (msg.get_error()) {
                        // librdkafka provides an error indicating we've reached the
                        // end of a partition every time we do so. Make sure it's not one
                        // of those cases, as it's not really an error
                        if (!msg.is_eof()) {
                                // Handle this somehow...
                        }
                        continue;
                }

                // We actually have a message. At this point you can check for the
                // message's key and payload via `Message::get_key` and
                // `Message::get_payload`

                cout << "Received message on partition " << msg.get_topic() << "/"
                        << msg.get_partition() << ", offset " << msg.get_offset() << ", content :" << msg.get_payload()<< endl;
        }

}

测试

1: 只启动一个 producer 和 一个 consumer

  1. 启动 producer,指定 partition 为0
./producer 0
  1. 启动 consumer,指定 consumer group 为 foo
./consumer foo

可以看到 consumer 被指定了 3 个 partitions (topic cubegun 一共就 3 个 partitions),因为只有 partition 0 (我们只启动了 producer 0) 有消息产生,这里 consumer 定时读取到的消息只有 partition 0 的,producer 不断在产生消息,consumer polling ,显示offset 一直在递增。

那么,我们增加一个 consumer 会怎么样呢?

2: 增加 consumer,验证新的 consumer 是否可以读取到消息

新开一个终端,输入命令:

./consumer foo

可以看到原有的 consumer 被重新仅分配了一个 partitions, 且获取不到消息了,消息被分配到新开的 consumer 了。这是因为新增consumer 客户端,触发了 rebalance,reblance 后 consumer 对应的 partition 被重新分配了,partition 0 被分配到新开的consumer 客户端了

这里也说明了对应一个 consumer group,Topic 上消息仅被分发给此消费者组中的一个消费者,如果 consumer 比 partition 多,是浪费。

3: 增加 consumer, 切换到新的 consumer group

新开一个终端,输入命令:

./consumer bar

可以看到新的 consumer 却同样可以获得 message, 这个是因为针对不同的 consumer group,kafka broker 是广播消息的。

以上测试都是一个 producer, 我们来测试下多个 producer 的情况(每个 partition 都有对应的 producer 产生消息)

4: 关掉 consumer bar, 依次启动 producer 1,2 并新开一个 consumer 指定 group id 为 foo

即总体:

  • 启动 producer,分别对应 partition [0,1,2]
  • 启动 consumer foo 3 个

可以看到三个 consumer 分别对应了三个 partition

consumer 1

consumer 2

consumer 3

状态监控

可以通过控制台查看消息队列 topic 下相关信息,分区覆盖率为 100%, 表示每个分区都有对应消费者。

更多测试场景,可以自行通过测试程序来实践下。 也可以使用 kafkacat 工具来验证测试,ubuntu 可以通过 apt install kafkacat 快速安装。

kafkacat 使用方法

consumer

  • -p 参数指定 partition, 也可以不指定。
  kafkacat -C -b c-m19448y69w.kafka.cn-east-1.internal:9092 -t cubegun -p 2 -f 'Topic %t [%p] at offset %o: key %k: %s\n' -o end

producer

  • 可以结合 linux 定时任务 crontab 来定时产生消息。
  echo "Hello,Kafka!" | kafkacat -P -b c-m19448y69w.kafka.cn-east-1.internal:9092 -t cubegun -p 2

查看 kafka metadata

 kafkacat -L -b c-m19448y69w.kafka.cn-east-1.internal:9092

总结

  1. 如果 consumer 比 partition 多,是浪费,因为kafka的设计是在一个 partition 上是不允许并发的,所以 consumer 数不要大于partition 数
  2. 如果 consumer 比 partition 少,一个 consumer 会对应于多个 partitions,这里主要合理分配 consumer 数和 partition 数,否则会导致 partition 里面的数据被取的不均匀。最好 partiton 数目是 consumer 数目的整数倍,所以 partition 数目很重要,比如取24,就很容易设定 consumer 数目
  3. 如果 consumer 从多个 partition 读到数据,不保证数据间的顺序性,kafka 只保证在一个 partition 上数据是有序的,但多个partition,根据你读的顺序会有不同
  4. 增减 consumer,broker,partition 会导致 rebalance,所以 rebalance 后 consumer 对应的 partition 会发生变化
  5. High-level 接口中获取不到数据的时候是会 block 的