博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
新版本 Kafka Consumer 的设计原理(转)
阅读量:4958 次
发布时间:2019-06-12

本文共 6584 字,大约阅读时间需要 21 分钟。

  相对Producer来说,Consumer使用和设计类似,但更为复杂。因此将Consumer相关知识总结一番。顾名思义,consumer就是读取kafka集群中某些topic消息的应用程序。consumer有两个版本,老版本用Scala语言编写,其api包名为kafka.consumer.*, 分别提供high-levellow-level两种API,其缺点是用户必须自行实现错误处理和故障转移等功能,必须依赖zk记录消费的offset;新版本用Java语言编写,其api包名为org.apache.kafka.clients.consumer.*。本文主要讨论新版consumer的特性,同时对老版本相关特性进行一定讨论。首先,先贴出一段简单的kafka consumer代码。

 

public static void main(String[] args){        String topicName = "test-topic";        String groupId = "test-group";         Properties props = new Properties();        props.put("bootstrap.servers", "localhost:9092"); //必须指定        props.put("group.id", groupId);        props.put("enable.auto.commit", "true");        props.put("auto.commit.interval.ms", "1000");        props.put("auto.offset.reset", "earliest");        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        KafkaConsumer
consumer = new KafkaConsumer
(props); consumer.subscribe(Arrays.asList(topicName)); try { while (true) { ConsumerRecords
records = consumer.poll(1000); for(ConsumerRecord
record : records) { System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(),record.key(),record.value()); } } } finally { consumer.close(); } }

 

一、消费者组

  新版消费者有消费者组的概念,根据kafka官网的定义,若干个个消费者使用一个消费者组名(group.id)进行标记,topic的每条消息支付发送到每个订阅它的消费者组的一个消费者实例上。这句话展现了消费者组3个特点:

  • 一个consumer group 包含一个到多个consumer实例
  • topic的每条消息只能被发送到同一个group下的一个消费者实例上
  • topic的消息可以发送到多个group中

  consumer group会将从topic中消费的数据公平的分配到组内的每一个consumer上,且具有实现高伸缩性,高容错性的consumer机制,一旦某个consumer挂掉,consumer group会立即将已崩溃的consumer负责的分区转交给其它consumer负责,从而保证group继续正常工作,不会丢失数据——这个过程就是consumer group的rebalance机制。

 

二、消息订阅原理

  consumer group订阅topic列表很简单,下面两个语句都可以实现:

consumer.subscribe(Arrays.asList("topic-1", "topic-2"));//基于正则表达式订阅//如果用户不是自动提交offset,则需要实现ConsumerRebalanceListener指定分区分配方案变更时的逻辑//自动提交则使用下面默认无操实现即可consumer.subscribe(Pattern.compile("kafka-.*"), new NoOpConsumerRebalanceListener());

  为了同时消费多个topic多个分区的消息,旧版本会为每个topic分区创建一个专门的线程去消费,新版本则采用类似于Linux I/O模型的poll或select机制,使用一个线程同时管理多个Socket连接。新版本Consumer从实现上来说是一个双线程的Java进程——用户主线程和后台心跳线程,且consumer不是线程安全的,因此每个consumer实例需要运行在专属线程中,以及显式的同步锁保护。最后要关闭consumer以清除各种Socket资源,并通知coordinator主动离组从而更快地开启新一轮rebalance。poll方法具有以下特性:

  1. poll方法会去订阅的分区上获取一组消息,只有在获取了足够多的可用数据,或等待时间大于超时时间才会返回。设置超时时间是为了让consumer有机会"醒来"去做一些其它事情,比如定时写日志,如果没有这方面的需求可将超时时间设置为Long.MAX_VALUE。
  2. 首次调用poll方法,消费者组会根据设置的位移策略设定消费者组的位移,一旦consumer开始提交位移,每个后续的rebalance完成后都会将位置设置成上次已提交的位移。
  3. 使用第二种消费方式,需要在另一个线程中调用consumer.wakeup()方法中断poll方法,wakeup()方法是consumer中唯一个可以在多线程中使用的方法。poll方法不会立即响应wakeup方法,并在下次poll调用时会抛出WakeupException
//定时返回的消费方式ConsumerRecords
records = consumer.poll(1000); //获取足够数据返回的消费方式try{ ConsumerRecords
records = consumer.poll(Long.MAX_VALUE);} catch (WakeupException e) { //处理异常}

 

三、位移管理

  • 位移提交      

       consumer每次消费数据需要知道最新消费消息的位置,该位置称为位移(offset)。consumer需要定期向Kafka提交自己的位置信息,根据提交的策略不同会有3种不同的消息交付语义保证:

  1. 最多一次:consumer在消息消费成功前就提交位移,如consumer重启会从最新的offset消费,消费失败的消息就丢失了。
  2. 最少一次:consumer在消息消费 成功后提交位移,但如消费成功后提交失败,会导致重新消费已消费过的消息。
  3. 精确一次:Kafka社区0.11.0.0支持事务以及精确一次处理的语义。详细原理可参考Kafka 0.11.0.0 是如何实现 Exactly-once 语义的

        消息的位置信息包括消费者组中每个消费者上次提交位移、当前位置、水位等信息。老版consumer位置信息由zookeeper保存,新版consumer则是增加__consumeroffsets topic,将offset信息写入这个topic,摆脱存储位移对zookeeper的依赖。__consumer_offsets中的消息保存了每个consumer group某一时刻提交的offset信息,并且配置了compact策略,使得它总是能够保存最新的位移信息,既控制了该topic总体的日志容量,也能实现保存最新offset的目的其存储结构如下图。

 

 

 

 

 

  如前所述,位移的提交策略对与提供消息交付语义至关重要。新版consumer默认情况下自动提交,间隔5秒(老版60秒),其缺点是不能细粒度的提交位移,特别是有较强的精确一次处理语义时。使用手动提交需要设置enable.auto.commit=false,并调用commitSync或commitAsync方法。下面是一段典型的手动按照分区级别进行位移提交的示例代码:

try {    while(running) {        ConsumerRecords 
records = consumer.poll(1000); for(TopicPartition partition :records.partitions()) { List
> partitionRecords = records.records(partition); for(ConsumerRecord
record : partitionRecords) { //消息处理 } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } }} finally { consumer.close();}

 

  • 设置消费位移

新版consumer提供了三个设置消费API,分别为:

这里要注意的是,在seek前要使用subscribe()订阅topic或使用assign()分配topic的指定分区,否则就会出现 "java.lang.IllegalStateException: No current assignment for partition xxxxxx "的报错。另外由于subscribe()和assign()是“lazy”的,在seek前需要先"虚调用(dummy call)"poll()。对于subscribe()来说,下面的代码是一种更为有效的方式。

/** 开始消费 **/        Collection
topics = Arrays.asList(topic); consumer.subscribe(topics, new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection
collection) { } @Override public void onPartitionsAssigned(Collection
collection) { consumer.seekToEnd(collection); } });

 

四、重平衡(rebalance)

   consumer group 触发rebalance的条件有3个:

  • 组成员发生变化:consumer加入或离开
  • 组订阅topic数发生变化:如基于正则表达式订阅topic
  • 组订阅topic的分区数发生变化:使用命令行脚本改变分区数

      鉴于目前一次rebalance操作开销比较大,生产环境中要避免不必要的rebalance出现,一定要结合自身业务特点调优request.timeout.ms、max.poll.records、max.pll.interval.ms等参数。

      新版本consumer提供了3种分区分配策略:range策略、round-robin策略和sticky策略。range策略将单个topic所有分区按照顺序排列,然后根据consumer个数,将其划分成固定大小的分区段后依次分配给每个consumer;round-robin则把所有topic的所有分区按顺序排列,然后轮询式的分配给每个consumer,默认分配策略是range。

        为了隔离每次rebalance上的数据,新版consumer设计了rebalance generation用于标识某次rebalance,主要目的是为了保护consumer group,防止老的generation提交无效的offset。

        新版Kafakrebalance流程是通过consumer与coordinator之间的一系列"交流"完成的,它提供了5个处理rebalance相关协议:

  • JoinGroup请求:consumer向coordinator请求加入组
  • SyncGroup请求:group leader把分配方案同步更新到组内所有成员
  • HeartBeat请求:consumer定期向coordinator汇报存活心跳
  • LeaveGroup请求:consumer通知coordinator即将离组
  • DescribeGroup请求:查看租的所有消息,包括成员信息、协议信息、分配方案以及订阅信息,该协议不触发rebalance

         rebalance流程主要分成两步:加入组和同步更新方案:

  1. 加入组:group组内所有consumer向coordinator发送JoinGroup请求。当收集到所有请求后,coordinator从中选择一个consumer作为group的leader,并把所有成员信息和它们的订阅信息发给leader。注意leader最终负责定制分区分配方案,而coordinator只是负责协调和同步。
  2. 同步更新分配方案:leader会将定制好的分配方案封装到SyncGroup请求中并发送给coordinator,另外所有consumer都会发送SyncGroup请求给coordinator,而只有leader中的请求包含了分配方案。coordinator接收到分配方案后会把属于每个consumer的方案单独抽取出来作为SyncGroup请求的response返还给各自的consumer。

 

  转载自:https://blog.csdn.net/soaring0121/article/details/81330266

转载于:https://www.cnblogs.com/jackspan/p/11179787.html

你可能感兴趣的文章
virtualbox测试k8s要注意的情况
查看>>
JAVA WEB新进展
查看>>
对象Object合并
查看>>
哇,今天做到一个十分666的题目,最后居然化成了背包,而其中的证明真是太妙了!!!...
查看>>
经典算法学习之动态规划
查看>>
结对第二次-文章摘要热词统计
查看>>
spark安装和登陆配置
查看>>
Linux环境下Django App部署到XAMPP上
查看>>
SRM 594 DIV 2 - 2
查看>>
算法导论7.1-4习题解答(快速排序)
查看>>
[算法] 可并堆
查看>>
变量和算术运算之算术运算(二)
查看>>
Fatal error: Maximum execution time of 30 seconds exceeded in
查看>>
定义两个方法,一个用来求最小公倍数,一个用来求最大公约数
查看>>
php的数组
查看>>
nmap脚本nse的使用
查看>>
20 个值得一试的JavaScript 框架
查看>>
selenium_采集药品数据2_采集所有表格
查看>>
三维网格去噪算法(L0 Minimization)
查看>>
java连接mysql数据库例子代码
查看>>