相对Producer来说,Consumer使用和设计类似,但更为复杂。因此将Consumer相关知识总结一番。顾名思义,consumer就是读取kafka集群中某些topic消息的应用程序。consumer有两个版本,老版本用Scala语言编写,其api包名为kafka.consumer.*, 分别提供high-level和low-level两种API,其缺点是用户必须自行实现错误处理和故障转移等功能,必须依赖zk记录消费的offset;新版本用Java语言编写,其api包名为org.apache.kafka.clients.consumer.*。本文主要讨论新版consumer的特性,同时对老版本相关特性进行一定讨论。首先,先贴出一段简单的kafka consumer代码。
public static void main(String[] args){ String topicName = "test-topic"; String groupId = "test-group"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); //必须指定 props.put("group.id", groupId); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer (props); consumer.subscribe(Arrays.asList(topicName)); try { while (true) { ConsumerRecords records = consumer.poll(1000); for(ConsumerRecord record : records) { System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(),record.key(),record.value()); } } } finally { consumer.close(); } }
一、消费者组
新版消费者有消费者组的概念,根据kafka官网的定义,若干个个消费者使用一个消费者组名(group.id)进行标记,topic的每条消息支付发送到每个订阅它的消费者组的一个消费者实例上。这句话展现了消费者组3个特点:
- 一个consumer group 包含一个到多个consumer实例
- topic的每条消息只能被发送到同一个group下的一个消费者实例上
- topic的消息可以发送到多个group中
consumer group会将从topic中消费的数据公平的分配到组内的每一个consumer上,且具有实现高伸缩性,高容错性的consumer机制,一旦某个consumer挂掉,consumer group会立即将已崩溃的consumer负责的分区转交给其它consumer负责,从而保证group继续正常工作,不会丢失数据——这个过程就是consumer group的rebalance机制。
二、消息订阅原理
consumer group订阅topic列表很简单,下面两个语句都可以实现:
consumer.subscribe(Arrays.asList("topic-1", "topic-2"));//基于正则表达式订阅//如果用户不是自动提交offset,则需要实现ConsumerRebalanceListener指定分区分配方案变更时的逻辑//自动提交则使用下面默认无操实现即可consumer.subscribe(Pattern.compile("kafka-.*"), new NoOpConsumerRebalanceListener());
为了同时消费多个topic多个分区的消息,旧版本会为每个topic分区创建一个专门的线程去消费,新版本则采用类似于Linux I/O模型的poll或select机制,使用一个线程同时管理多个Socket连接。新版本Consumer从实现上来说是一个双线程的Java进程——用户主线程和后台心跳线程,且consumer不是线程安全的,因此每个consumer实例需要运行在专属线程中,以及显式的同步锁保护。最后要关闭consumer以清除各种Socket资源,并通知coordinator主动离组从而更快地开启新一轮rebalance。poll方法具有以下特性:
- poll方法会去订阅的分区上获取一组消息,只有在获取了足够多的可用数据,或等待时间大于超时时间才会返回。设置超时时间是为了让consumer有机会"醒来"去做一些其它事情,比如定时写日志,如果没有这方面的需求可将超时时间设置为Long.MAX_VALUE。
- 首次调用poll方法,消费者组会根据设置的位移策略设定消费者组的位移,一旦consumer开始提交位移,每个后续的rebalance完成后都会将位置设置成上次已提交的位移。
- 使用第二种消费方式,需要在另一个线程中调用consumer.wakeup()方法中断poll方法,wakeup()方法是consumer中唯一个可以在多线程中使用的方法。poll方法不会立即响应wakeup方法,并在下次poll调用时会抛出WakeupException
//定时返回的消费方式ConsumerRecordsrecords = consumer.poll(1000); //获取足够数据返回的消费方式try{ ConsumerRecords records = consumer.poll(Long.MAX_VALUE);} catch (WakeupException e) { //处理异常}
三、位移管理
- 位移提交
consumer每次消费数据需要知道最新消费消息的位置,该位置称为位移(offset)。consumer需要定期向Kafka提交自己的位置信息,根据提交的策略不同会有3种不同的消息交付语义保证:
- 最多一次:consumer在消息消费成功前就提交位移,如consumer重启会从最新的offset消费,消费失败的消息就丢失了。
- 最少一次:consumer在消息消费 成功后提交位移,但如消费成功后提交失败,会导致重新消费已消费过的消息。
- 精确一次:Kafka社区0.11.0.0支持事务以及精确一次处理的语义。详细原理可参考Kafka 0.11.0.0 是如何实现 Exactly-once 语义的
消息的位置信息包括消费者组中每个消费者上次提交位移、当前位置、水位等信息。老版consumer位置信息由zookeeper保存,新版consumer则是增加__consumeroffsets topic,将offset信息写入这个topic,摆脱存储位移对zookeeper的依赖。__consumer_offsets中的消息保存了每个consumer group某一时刻提交的offset信息,并且配置了compact策略,使得它总是能够保存最新的位移信息,既控制了该topic总体的日志容量,也能实现保存最新offset的目的其存储结构如下图。
如前所述,位移的提交策略对与提供消息交付语义至关重要。新版consumer默认情况下自动提交,间隔5秒(老版60秒),其缺点是不能细粒度的提交位移,特别是有较强的精确一次处理语义时。使用手动提交需要设置enable.auto.commit=false,并调用commitSync或commitAsync方法。下面是一段典型的手动按照分区级别进行位移提交的示例代码:
try { while(running) { ConsumerRecordsrecords = consumer.poll(1000); for(TopicPartition partition :records.partitions()) { List > partitionRecords = records.records(partition); for(ConsumerRecord record : partitionRecords) { //消息处理 } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } }} finally { consumer.close();}
- 设置消费位移
新版consumer提供了三个设置消费API,分别为:
这里要注意的是,在seek前要使用subscribe()订阅topic或使用assign()分配topic的指定分区,否则就会出现 "java.lang.IllegalStateException: No current assignment for partition xxxxxx "的报错。另外由于subscribe()和assign()是“lazy”的,在seek前需要先"虚调用(dummy call)"poll()。对于subscribe()来说,下面的代码是一种更为有效的方式。
/** 开始消费 **/ Collectiontopics = Arrays.asList(topic); consumer.subscribe(topics, new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection collection) { } @Override public void onPartitionsAssigned(Collection collection) { consumer.seekToEnd(collection); } });
四、重平衡(rebalance)
consumer group 触发rebalance的条件有3个:
- 组成员发生变化:consumer加入或离开
- 组订阅topic数发生变化:如基于正则表达式订阅topic
- 组订阅topic的分区数发生变化:使用命令行脚本改变分区数
鉴于目前一次rebalance操作开销比较大,生产环境中要避免不必要的rebalance出现,一定要结合自身业务特点调优request.timeout.ms、max.poll.records、max.pll.interval.ms等参数。
新版本consumer提供了3种分区分配策略:range策略、round-robin策略和sticky策略。range策略将单个topic所有分区按照顺序排列,然后根据consumer个数,将其划分成固定大小的分区段后依次分配给每个consumer;round-robin则把所有topic的所有分区按顺序排列,然后轮询式的分配给每个consumer,默认分配策略是range。
为了隔离每次rebalance上的数据,新版consumer设计了rebalance generation用于标识某次rebalance,主要目的是为了保护consumer group,防止老的generation提交无效的offset。
新版Kafakrebalance流程是通过consumer与coordinator之间的一系列"交流"完成的,它提供了5个处理rebalance相关协议:
- JoinGroup请求:consumer向coordinator请求加入组
- SyncGroup请求:group leader把分配方案同步更新到组内所有成员
- HeartBeat请求:consumer定期向coordinator汇报存活心跳
- LeaveGroup请求:consumer通知coordinator即将离组
- DescribeGroup请求:查看租的所有消息,包括成员信息、协议信息、分配方案以及订阅信息,该协议不触发rebalance
rebalance流程主要分成两步:加入组和同步更新方案:
- 加入组:group组内所有consumer向coordinator发送JoinGroup请求。当收集到所有请求后,coordinator从中选择一个consumer作为group的leader,并把所有成员信息和它们的订阅信息发给leader。注意leader最终负责定制分区分配方案,而coordinator只是负责协调和同步。
- 同步更新分配方案:leader会将定制好的分配方案封装到SyncGroup请求中并发送给coordinator,另外所有consumer都会发送SyncGroup请求给coordinator,而只有leader中的请求包含了分配方案。coordinator接收到分配方案后会把属于每个consumer的方案单独抽取出来作为SyncGroup请求的response返还给各自的consumer。
转载自:https://blog.csdn.net/soaring0121/article/details/81330266