建网站不做广告怎么赚钱,如何做网站站长,岳阳网站定制开发设计,深圳网站建设大概多少钱文章目录 概述回顾历史老版本获取消费者变更老版本存在的问题 消费者协调器和组协调器新版如何解决老版本问题再均衡过程**第一阶段CFIND COORDINATOR****第二阶段#xff08;JOINGROUP#xff09;**选举消费组的lcader选举分区分配策略 第三阶段#xff08;SYNC GROUP… 文章目录 概述回顾历史老版本获取消费者变更老版本存在的问题 消费者协调器和组协调器新版如何解决老版本问题再均衡过程**第一阶段CFIND COORDINATOR****第二阶段JOINGROUP**选举消费组的lcader选举分区分配策略 第三阶段SYNC GROUP消费组元数据信息 第四阶段HEARTBEAT 消费者协调器和控制器关系分区分配与消费者组管理选举与状态同步心跳检测与故障处理 概述
上一章我们讲解kafka消费端的分区分配策略时留下了两个问题这章我们通过消费者协调器和组协调器继续详细回答那两个问题。其中我们会回归历史说明消费者协调器和组协调器出现的原因最后会说下消费者协调器和控制器在工作上有何关联。
回顾历史
老版本获取消费者变更
消费者协调器和组协调器的概念是针对新版的消费者客户端而言的Kafka建立之初并没有它们。旧版的消费者客户端是使用ZooKeeper的监听器Watcher来实现这些功能的。每个消费组王在ZooKeeper中都维护了一个/consumers//ids路径在此路径下使用临时节点记录录属于此消费组的消费者的唯一标识CconsumerIdString consumerIdString由消费者启动时创建。消费者的唯一标识由aconsumer.id主机名时间截UUID的部分信息构成其中consumerid是旧版消费者客户端中的配置相当于新版客户端中的client.id。
每个broker、主题和分区在ZooKeeper中也都对应一个路径/brokers/ids/记录了 host、port及分配在此broker上的主题分区表/brokers/topics/记录了每个分区的leader副本、ISR集合等信息。/brokers/topics//partitions//state记录了当前leader副本、leaderepoch等信息。如下图
每个消费者在启动时都会在/consumers//ids和/brokers/ids路径上注册一个监听器。当/consumers//ids路径下的子节点发生变化时表示消费组中的消费者发生了变化当/brokers/ids路径下的子节点发生变化时表示broker出现了增减。这样通过ZooKeeper所提供的Watcher每个消费者就可以监听消费组和Kafka集群的状态了。
老版本存在的问题
这种方式下每个消费者对ZooKeeper E的相关路径分别进行监听当触发再均衡操作时一个消费组下的所有消费者会同时进行再均衡操作而消费者之间并不知道彼此操作的结果这样可能导致Kafka工作在一个不正确的状态。与此同时这种严重依赖于ZooKeeper集群的做法还有两个比较严重的问题。
1羊群效应HerdEffect所谓的羊群效应是指ZooKeeper中一个被监听的节点变化大量的Watcher通知被发送到客户端导致在通知期间的其他操作延迟也有可能发生类似死锁的情况。2脑裂问题SplitBrain消费者进行再均衡操作时每个消费者都与ZooKeeper进行通信以判断消费者或broker变化的情况由于ZooKeeper本身的特性可能导致在同一时刻各个消费者获取的状态不一致这样会导致异常问题发生。
消费者协调器和组协调器
新版如何解决老版本问题
新版的消费者客户端对此进行了重新设计将全部消费组分成多个子集每个消费组的子集在服务端对应一个GroupCoordinator对其进行管理GroupCoordinator是Kafka服务端中用于管理消费组的组件。而消费者客户端中的ConsumerCoordinator组件负责与GroupCoordinator进行交互也就是发送心跳加入group请求、提交位移等请求。
再均衡过程
ConsumerCoordinatorr与GroupCoordinator之间最重要的职责就是负责执行消费者再均衡的操作包括前面提及的分区分配的工作也是在再均衡期间完成的。就目前而言一共有如下几种情形会触发再均衡的操作
有新的消费者加入消费组。有消费者容机下线。消费者并不一定需要真正下线例如遇到长时间的GC、网络延退导致消费者长时间未向GroupCoordinator发送心跳等情况时GroupCoordinator会认为消费者已经下线。有消费者主动退出消费组发送LeaveGroupRequest请求。比如客户端调用了 unsubscribleO方法取消对某些主题的订阅。消费组所对应的GroupCoorinator节点发生了变更。消费组内所订阅的任一主题或者主题的分区数量发生变化。
当有消费者加入消费组时消费者、消费组及组协调器之间会经历一下几个阶段。
第一阶段CFIND COORDINATOR
消费者需要确定它所属的消费组对应的GroupCoordinator所在的broker并创建与该broker相互通信的网络连接。如果消费者已经保存了与消费组对应的GroupCoordinator节点的信息并且与它之间的网络连接是正常的那么就可以进入第二阶段。否则就需要向集群中的某个节点发送FindCoordinatorRequest请求来查找对应的GroupCoordinator这里的“某个节点”并非是集群中的任意节点而是负载最小的节点。
Kafka 在收到 FindCoordinatorRequest请求之后会根据coordinator_key也就是groupId查找对应的GroupCoordinator节点如果找到对应的GroupCoordinator则会返回其相对应的nodeid、host和port信息。具体查找GroupCoordinator的方式是先根据消费组groupId的哈希值计算_consumer_offsets中的分区编号。找到对应的_consumer_offsets中的分区之后再寻找此分区leader副本所在的broker节点该broker节点即为这个groupId所对应的GroupCoordinator节点。消费者groupId最终的分区分配方案及组内消费者所提交的消费位移信息都会发送给此分区leader副本所在的broker节点让此broker节点既扮演GroupCoordinator的角色又扮演保存分区分配方案和组内消费者位移的角色这样可以省去很多不必要的中间轮转所带来的开销。
第二阶段JOINGROUP
在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求并处理响应。
JoinGroupRequest中的group_protocols域为数组类型其中可以囊括多个分区分配策略这个主要取决于消费者客户端参数partition.assignment.strategy的配置。如果配置了多种策略那么JoinGroupRequest中就会包含多个protocol name和protocol metadata。
如果是原有的消费者重新加入消费组那么在真正发送JoinGroupRequest请求之前还要执行一些准备工作 1如果消费端参数enable.auto.commit设置为tue默认值也为tue即开启自动提交位移功能那么在请求加入消费组之前需要向GroupCoordinator提交消费位移。这个过程是阻塞执行的要么成功提交消费位移要么超时。 2如果消费者添加了自定义的再均衡监听器ConsumerRebalanceListener那么此时会调用onPartitionsRevokedO方法在重新加入消费组之前实施自定义的规则逻辑比如清除一些状态或者提交消费位移等。 3因为是重新加入消费组之前与GroupCoordinator节点之间的心跳检测也就不需要了所以在成功地重新加入消费组之前需要禁止心跳检测的运作。
消费者在发送JoinGroupRequest请求之后会阻塞等待Kafka服务端的响应。服务端在收到JoinGroupRequest请求后会交由GroupCoordinator来进行处理。GroupCoordinator首先会对JoinGroupRequest请求做合法性校验比如group_id是否为空、当前broker节点是否是请求的消费者组所对应的组协调器、rebalance_timeout的值是否在合理的范围之内。如果消费者是第一次请求加入消费组那么JoinGroupRequest请求中的memberid值为null即没有它自身的唯一标志此时组协调器负责为此消费者生成一个memberid。这个生成的算法很 简单具体如以下伪代码所示。
String memberId clientId -UUID.randomuuID.toString
其中clientld为消费者客户端的clientd对应请求头中的clientid。由此可见消费者的memberid由clientId和UUID用“”字符拼接而成。
选举消费组的lcader
GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader这个选举的算法也很简单分两种情况分析。如果消费组内还没有leader那么第一个加入消费组的消费者即为消费组的leader。如果某一时刻leader消费者由于某些原因退出了消费组那么会重新选举一个新的leader这个重新选举leader的过程又更“随意”了。在GroupCoordinator中消费者的信息是以HashMap的形式存储的其中key为消费者的memberidvalue是消费者相关的元数据信息。leaderId表示leader 消费者的memberid它的取值为HashMap中的第一个键值对的key这种选举的方式基本上和随机无异。总体上来说消费组的leader选举过程是很随意的。
选举分区分配策略
每个消费者都可以设置自己的分区分配策略对消费组而言需要从各个消费者呈报上来的各个分配策略中选举一个彼此都“信服”的策略来进行整体上的分区分配。这个分区分配的选举并非由leader消费者决定而是根据消费组内的各个消费者投票来决定的。这里所说的“根据组内的各个消费者投票来决定”不是指GroupCoordinator还要再与各个消费者进行进一步交互而是根据各个消费者呈报的分配策略来实施。最终选举的分配策略基本上可以看作被各个 消费者支持的最多的策略具体的选举过程如下 1收集各个消费者支持的所有分配策略组成候选集candidates。 24每个消费者从候选集candidates中找出第一个自身支持的策略为这个策略投上一票。 3计算候选集中各个策略的选票数选票数最多的策略即为当前消费组的分配策略。
如果有消费者并不支持选出的分配策略那么就会报出异常IllegalArgumentExceptionMemberdoesnotsupportprotocol。所以请不要为同一个消费组的不同消费者设置不同的分配策略以防出现问题。需要注意的是这里所说的“消费者所支持的分配策略”是指partition.assignment.strategy参数配置的策略如果这个参数值只配置了RangeAssignor那么这个消费者客户端只支持RangeAssignor分配策略而不是消费者客户端代码中实现的3种分配策略及可能的自定义分配策略。
在此之后Kafka服务端就要发送JoinGroupResponse响应给各个消费者leader消费者和其他普通消费者收到的响应内容并不相同。leader消费者会收到最终的分配策略以及消费者成员信息而普通消费者只能收到最终的分配策略。由此可见Kafka把分区分配的具体分配交还给客户端自身并不参与具体的分配细节这样即使以后分区分配的策略发生了变更也只需要重启消费端的应用即可而不需要重启服务端。该过程可见如下图 第三阶段SYNC GROUP
leader消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配在此之后需要将分配的方案同步给各个消费者此时leader消费者并不是直接和其余的普通消费者同步分配方案而是通过GroupCoordinator这个“中间人”来负责转发同步分配方案的。在第三阶段也就是同步阶段各个消费者会向GroupCoordinator发送SyncGroupRequest请求来同步分配方案如下图所示
服务端在收到消费者发送的SyncGroupRequest请求之后会交由GroupCoordinator来负责具体的逻辑处理。GroupCoordinator同样会先对SyncGroupRequest请求做合法性校验在此之后会将从leader消费者发送过来的分配方案提取出来连同整个消费组的元数据信息一起存入Kafka的consumer_offsets主题中最后发送响应给各个消费者以提供给各个消费者各自所属的分配方案。
消费者在获得消费分区后会连接broker进行消费并定期发送心跳给消费者协调器表明自己活着。
消费组元数据信息
我们知道消费者客户端提交的消费位移会保存在Kafka的consumer_offsets主题中这里也一样只不过保存的是消费组的元数据信息GroupMetadata。具体来说每个消费组的元数据信息都是一条消息不过这类消息并不依赖于具体版本的消息格式因为它只定义了消息中的key和value字段的具体内容所以消费组元数据信息的保存可以做到与具体的消息格式无关。
第四阶段HEARTBEAT
进入这个阶段之后消费组中的所有消费者就会处于正常工作状态。在正式消费之前消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交到了GroupCoordinator并且GroupCoordinator将其保存到了Kafka内部的consumeroffsets主题中此时消费者可以通过OffsetFetchRequest请求获取上次提交的消费位移并从此处继续消费。
消费者通过向GroupCoordinator发送心跳来维持它们与消费组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳就被认为是活跃的说明它还在读取分区中的消息。心跳线程是一个独立的线程可以在轮询消息的空档发送心跳。如果消费者停止发送心跳的时间足够长则整个会话就被判定为过期GroupCoordinator也会认为这个消费者已经死亡就会触发一次再均衡行为。消费者的心跳间隔时间由参数heartbeat.interval.ms指定默认值为3000即3秒这个参数必须比session.timeout.ms参数设定的值要小一般情况下heartbeat.interval.ms的配置值不能超过session.timeout.ms配置值的1/3。这个参数可以调整得更低以控制正常重新平衡的预期时间。
如果一个消费者发生崩溃并停止读取消息那么GroupCoordinator会等待一小段时间确认这个消费者死亡之后才会触发再均衡。在这一小段时间内死掉的消费者并不会读取分区里的消息。这个一小段时间由session.timeout.ms参数控制该参数的配置值必须在broker端参数group.min.session.timeout.ms默认值为6000即6秒和group.max.session.timeout.ms默认值为300000即5分钟允许的范围内。
还有一个参数max.po11.interval.ms它用来指定使用消费者组管理时poll0方法调用之间的最大延退也就是消费者在获取更多消息之前可以空闲的时间量的上限。如果此超时时间期满之前polio没有调用则消费者被视为失败并且分组将重新平衡以便将分区重新分配给别的成员。
除了被动退出消费组还可以使用LeaveGroupRequest请求主动退出消费组。
消费者协调器和控制器关系
在Kafka中控制器Controller与组协调器Group Coordinator在工作上存在一定的交互具体体现在以下几个方面
分区分配与消费者组管理 控制器负责分区管理控制器负责管理Kafka集群中的分区状态如分区的创建、删除以及副本的分配等。当一个新的分区被创建时控制器会决定该分区的副本分布在哪些Broker上。 组协调器依赖分区信息组协调器在进行消费者组的分区分配时需要依赖控制器所管理的分区元数据信息。例如组协调器要根据分区的数量、副本分布以及消费者组内消费者的数量和位置等信息来为消费者分配合适的分区以实现负载均衡和高效的数据消费。
选举与状态同步 控制器主导Broker选举在Kafka集群中当Broker出现故障或新的Broker加入时控制器会负责选举新的Leader Broker以及进行相关的状态变更。 组协调器获取选举结果组协调器需要与控制器进行交互以获取最新的Broker选举结果和集群状态信息。这有助于组协调器了解哪些Broker是活跃的哪些是不可用的从而更好地管理消费者组的状态确保消费者能够正确地连接到合适的Broker进行数据消费。
心跳检测与故障处理 组协调器检测消费者心跳组协调器通过心跳机制来检测消费者的存活状态。如果消费者长时间没有发送心跳组协调器会认为消费者可能出现了故障并进行相应的处理如重新分配分区。 控制器协助故障判断在这个过程中组协调器可能会与控制器进行交互以获取更全面的集群状态信息来确定消费者的故障是否是由于Broker故障等原因引起的。控制器可以提供关于Broker状态、分区状态等方面的信息帮助组协调器更准确地判断故障情况并采取合适的措施如触发重新平衡操作。