.Net(c#)使用 Kafka 小结
1.开篇
由于项目中必须使用 kafka 来作为消息组件,所以使用 kafka 有一段时间了。不得不感叹 kafka 是一个相当优秀的消息系统。下面直接对使用过程做一总结,希望对大家有用。
1.1.kafka 部署
kafka 的简单搭建我们使用 docker 进行,方便快捷单节点。生产环境不推荐这样的单节点 kafka 部署。
1.1.1.确保安装了 docker 和 docker-compose
网上很多教程,安装也简单,不作为重点赘述。
1.1.2.编写 docker-compose.yml
将以下内容直接复制到新建空文件
docker-compose.yml
中。
version: "3"services:zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"kafka:image: wurstmeister/kafkadepends_on: [zookeeper]ports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: localhostKAFKA_CREATE_TOPICS: "test"KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181volumes:- /var/run/docker.sock:/var/run/docker.sock
1.1.3.容器构建提交
在
docker-compose.yml
文件的目录下执行以下命令:
docker-compose build # 打包docker-compose up # 启动, 添加 -d 可以后台启动。
看到日志输出:
Creating network "desktop_default" with the default driverCreating desktop_zookeeper_1 ... doneCreating desktop_kafka_1 ... doneAttaching to desktop_zookeeper_1, desktop_kafka_1zookeeper_1 | ZooKeeper JMX enabled by defaultzookeeper_1 | Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfgzookeeper_1 | 2020-05-17 03:34:31,794 [myid:] - INFO [main:QuorumPeerConfig@136] - Reading configuration from: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg...zookeeper_1 | 2020-05-17 03:34:31,872 [myid:] - INFO [main:ZooKeeperServer@836] - tickTime set to 2000...kafka_1 | Excluding KAFKA_VERSION from broker config
没有错误输出说明部署成功。
2.kafka 客户端选择
在 github 上能够找到好几个 c#可以使用的 kafka 客户端。大家可以去搜一下,本文就只说明rdkafka-dotnet和confluent-kafka-dotnet。
2.1.rdkafka-dotnet
我们生产环境中就使用的该客户端。在该项目 github 首页中可以看到:
var config = new Config() { GroupId = "example-csharp-consumer" };using (var consumer = new EventConsumer(config, "127.0.0.1:9092")){consumer.OnMessage += (obj, msg) =>{//...};}
没错,使用它的原因就是它提供了EventConsumer,可以直接异步订阅消息。整体上来说该客户端非常的稳定,性能优良。使用过程中比较难缠的就是它的配置,比较不直观。它基于librdkafka(C/C++)实现,配置 Config 类中显式配置比较少,大多数是通过字典配置的,比如:
var config = new Config();config["auto.offset.reset"] = "earliest";//配置首次消息偏移位置为最早
这对于新手来说并不是很友好,很难想到去这样配置。当然如果有 librdkafka 的使用经验会好很多。大多数配置在 librdkafka 项目的CONFIGURATION。
还有一个需要注意的是 Broker 的版本支持Broker version support: >=0.8,也在 librdkafka 项目中可以找到。
2.2 confluent-kafka-dotnet
confluent-kafka-dotnet 是 rdkafka-dotnet(好几年没有维护了)的官方后续版本。推荐使用 confluent-kafka-dotnet,因为配置相对友好,更加全面。比如:
var conf = new ConsumerConfig{AutoOffsetReset = AutoOffsetReset.Earliest//显式强类型赋值配置};
对于 EventConsumer 怎么办呢?在项目变更记录中已经明确提出移除了 OnMessage 多播委托,而 EventConsumer,也就不存在了。但这不难,我们可以参照基项目写一个:
public class EventConsumer<TKey, TValue> : IDisposable{private Task _consumerTask;private CancellationTokenSource _consumerCts;public IConsumer<TKey, TValue> Consumer { get; }public ConsumerBuilder<TKey, TValue> Builder { get; set; }public EventConsumer(IEnumerable<KeyValuePair<string, string>> config){Builder = new ConsumerBuilder<TKey, TValue>(config);Consumer = Builder.Build();}public event EventHandler<ConsumeResult<TKey, TValue>> OnConsumeResult;public event EventHandler<ConsumeException> OnConsumeException;public void Start(){if (Consumer.Subscription?.Any() != true){throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function");}if (_consumerTask != null){return;}_consumerCts = new CancellationTokenSource();var ct = _consumerCts.Token;_consumerTask = Task.Factory.StartNew(() =>{while (!ct.IsCancellationRequested){try{var cr = Consumer.Consume(TimeSpan.FromSeconds(1));if (cr == null) continue;OnConsumeResult?.Invoke(this, cr);}catch (ConsumeException e){OnConsumeException?.Invoke(this, e);}}}, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);}public async Task Stop(){if (_consumerCts == null || _consumerTask == null) return;_consumerCts.Cancel();try{await _consumerTask;}finally{_consumerTask = null;_consumerCts = null;}}public void Dispose(){if (_consumerTask != null){Stop().Wait();}Consumer?.Dispose();}}
使用测试:
static async Task Main(string[] args){Console.WriteLine("Hello World!");var conf = new ConsumerConfig{GroupId = "test-consumer-group",BootstrapServers = "localhost:9092",AutoOffsetReset = AutoOffsetReset.Earliest,};var eventConsumer = new EventConsumer<Ignore, string>(conf);eventConsumer.Consumer.Subscribe(new[] {"test"});eventConsumer.OnConsumeResult += (sen, cr) =>{Console.WriteLine($"Receive \'{cr.Message.Value}\' from \'{cr.TopicPartitionOffset}\'");};do{var line = Console.ReadLine();switch (line){case "stop":eventConsumer.Stop();break;case "start":eventConsumer.Start();break;}} while (true);}
3.功能扩展
!!!以下讨论都是对confluent-kafka-dotnet。
由于用户终端也使用了 kafka 客户端订阅消息。如果终端长时间没有上线,并且消息过期时间也较长,服务端会存有大量消息。终端一上线就会读取到大量的堆积消息,很容易就把内存耗尽了。考虑到客户端不是长期在线的场景,无需不间断的处理所有消息,服务端才适合这个角色(:。所以客户端只需每次从登录时的最新点开始读取就可以了,历史性统计就交给服务器去做。
最便捷的方法是每次客户端连接都使用新的groupid,用时间或者guid撒盐。但这样会使服务端记录大量的group信息(如果终端很多m个,并且终端断开连接重连的次数也会很多随机n次,那么也是m*n个group信息),势必对服务端性能造成影响。
另一种方法是在保持groupid不变的情况下,修改消费偏移。那如何去设置位置偏移为最新点呢?
3.1 错误思路 AutoOffsetReset
在配置中存在一个让新手容易产生误解的配置项AutoOffsetReset.Latest自动偏移到最新位置。当你兴冲冲的准备大干一番时发现只有首次创建GroupId时会起作用,当 groupid 已经存在 kafka 记录中时它就不管用了。
3.2 提交偏移 Commit
我们能够在
IConsumer<TKey, TValue>
中找到该 commit 方法,它有三个重载:
1. 无参函数。就是提交当前客户端`IConsumer<TKey, TValue>.Assignment`记录的偏移。2. 参数ConsumeResult<TKey, TValue>。一次仅提交一个偏移。当然配置中默认设置为自动提交(`conf.EnableAutoCommit = true;`),无需手动提交。3. 参数IEnumerable<TopicPartitionOffset> offsets。直接提交到某一个位置。TopicPartitionOffset有三个决定性属性:话题topic、分区:partition、偏移offset。
第三个函数就是我们想要的,我们只需得到对应参数TopicPartitionOffset的值就可以。
3.2.1.TopicPartition的获取
topic 是我们唯一可以确定的。在
IConsumer<TKey, TValue>.Assignment
中可以得到 topic 和 partition。但遗憾的是它只有不会立即有值。我们只能主动去服务端获取,在
IAdminClient
中找到了可获取该信息的方法,所以我们做一扩展:
public static IEnumerable<TopicPartition> GetTopicPartitions(ConsumerConfig config, string topic, TimeSpan timeout){using var adv = new AdminClientBuilder(config).Build();var topPns = adv.GetTopicPartition(topic, timeout);return topPns;}public static IEnumerable<TopicPartition> GetTopicPartition(this AdminClient client, string topic, TimeSpan timeout){var mta = client.GetMetadata(timeout);var topicPartitions = mta.Topics.Where(t => topic == t.Topic).SelectMany(t => t.Partitions.Select(tt => new TopicPartition(t.Topic, tt.PartitionId))).ToList();return topicPartitions;}
3.2.2. TopicPartitionOffset获取
我们还差 offset 的值,通过
IConsumer<TKey, TValue>.QueryWatermarkOffsets
方法可以查到当前水位,而其中 High 水位就是最新偏移。
现在我们可以完成我们的任务了吗?问题再次出现,虽然客户端表现得从最新点消费了,但是在此之前的卡顿和类似与内存溢出让人不得心安。Commit 还是消费了所有消息:(,只不过暗搓搓的进行。在所有消息消费期间读取所有未消费,然后拼命提交。客户端哪有这么大的内存和性能呢。最终,找到一个和第三个 commit 方法一样接受参数的方法
Assign
,一试果然灵验。
public static void AssignOffsetToHighWatermark<TKey, TValue>(this IConsumer<TKey, TValue> consumer, TopicPartition partition, TimeSpan timeout){var water = consumer.QueryWatermarkOffsets(partition, timeout);if (water == null || water.High == 0) return;var offset = new TopicPartitionOffset(partition.Topic, partition.Partition, water.High);consumer.Assign(offset);}
3.2.3.实际使用
最终的使用示例:
//...var topicPartitions = ConsumerEx.GetTopicPartitions(conf, "test", TimeSpan.FromSeconds(5));topicPartitions?.ToList().ForEach(t =>{eventConsumer.Consumer.AssignOffsetToHighWatermark(t, TimeSpan.FromSeconds(5));});eventConsumer.Start();//在消费事件开始之前就可以进行偏移设置//...
请注意,如果您关闭了自动提交功能,并且不主动提交任何偏移信息,那么服务端对该 group 的偏移记录将一直不变,Assign 函数并不会改变任何服务的偏移记录。
4.总结
这一圈下来整个 kafka 的基本消费流程也就搞清楚了。kafka 消费者需要对消费的消息进行提交。事实上,每个消息体里都有偏移信息。不提交对于服务端来说就是客户端没有处理过该消息,将不会更改已消费偏移。以此来保证消息消费的可靠性。这和 tcp 中三次握手有异曲同工之妙。
服务端保存着每一个 groupid 对应的已经提交偏移
Committed Offset
。当然客户端不提交它是不会变更的(不考虑直接操作服务端的形式)。
客户端保存自己的当前偏移
Current Offset
,可以通过
Assign
和
Commit
进行更改,二者区别是
Commit
将连同提交到服务端对应的偏移中进行更改,而
Assign
仅改变客户端偏移,这一更改记录在
IConsumer<TKey, TValue>.Assignment
中,首次启动时客户端异步向服务端请求
Committed Offset
来对其赋值。这就是在 3.2 节中我们没有立即得到该值的的原因,该值将在可能在几秒中后被赋值,所以写了一个主动获取的方法
GetTopicPartition
。客户端下一次消费将根据
IConsumer<TKey, TValue>.Assignment
进行。
使用
AdminClientBuilder.GetMetadata
函数可以得到对应话题的元数据,包括:topic、partition、Brokers 等。
使用
IConsumer<TKey, TValue>.QueryWatermarkOffsets
函数可以得到当前服务端的水位,low 为最早的偏移(可能不是 0,考虑消息过期被删除的情况),high 为最新的偏移。