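Contents
- Background
- Environment
- Tool selection
- Hands-on
  - MM1
  - MM2
    - Running as a dedicated MM2 cluster
    - Running in standalone mode
- Verification
- Appendix
  - MM2 configuration reference
  - Other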
 
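Background
A Kafka cluster in a test environment, with 360+ topics and 2000+ partitions, was deployed on virtual machines. For various reasons it had to be migrated in full into Kubernetes containers, which made it a good opportunity for some hands-on practice. This article records how I actually used MM1 and MM2, the problems I ran into along the way, and how I solved them.
Environment
Source cluster: kafka-2.6.0, 2 brokers, virtual machines
Target cluster: kafka-2.6.0, 3 brokers, k8s
Tools: MM1 (kafka-mirror-maker.sh), MM2 (connect-mirror-maker.sh)
Requirements: topic names must not change; data must be complete
Precondition: the target cluster must have automatic topic creation enabled: auto.create.topics.enable=true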
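Tool selection
At its core, MM1 is a Kafka consumer and producer combined. It moves data from the source cluster to the target cluster effectively, but offers little beyond that.
Years of use have also exposed the following limitations in MM1:
- Static blacklists and whitelists
- Topic metadata is not synchronized: topics are auto-created on the target with the broker's default partition count (num.partitions, 1 by default), so every topic arrives with a single partition
- Loop replication in active-active setups has to be prevented through manual configuration (MM2 addresses this as well, though with a change that makes for a rather poor experience)
- Performance problems caused by rebalances
- Lack of monitoring
- No exactly-once guarantee
- No disaster recovery support
- The topic list cannot be synchronized; only topics that contain data are mirrored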
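One way to work around the single-partition limitation is to pre-create the topics on the target with the source's partition counts before starting MM1. The sketch below is a hypothetical helper, not part of Kafka: `gen_create_cmds` reads `kafka-topics.sh --describe` output on stdin and prints one `--create` command per topic. It assumes the summary-line layout `Topic: <name>  PartitionCount: <n>  ...`, a SASL client config file named `command.properties`, and a default replication factor of 3 (matching the three target brokers). Review the printed commands before running them.

```shell
#!/bin/bash
# Hypothetical helper: turn `kafka-topics.sh --describe` output (stdin)
# into `--create` commands for the target cluster.
#   $1 = target bootstrap server, $2 = replication factor (default 3)
gen_create_cmds() {
  awk -v bs="$1" -v rf="${2:-3}" '
    # Only the per-topic summary line carries "PartitionCount:".
    /PartitionCount:/ {
      topic = ""; parts = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Topic:") topic = $(i + 1)
        if ($i == "PartitionCount:") parts = $(i + 1)
      }
      if (topic != "" && parts != "")
        printf "./kafka-topics.sh --bootstrap-server %s --command-config command.properties --create --topic %s --partitions %s --replication-factor %s\n", bs, topic, parts, rf
    }'
}
```

A possible invocation is `./kafka-topics.sh --bootstrap-server source:9092 --command-config command.properties --describe | gen_create_cmds target:29092 > create-topics.sh`.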
 
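MM2 is built on the Kafka Connect framework and is made up of Connect connectors (MirrorSourceConnector, MirrorCheckpointConnector and MirrorHeartbeatConnector). It can synchronize the following:
- The complete topic list
- Topic configuration
- ACLs (if any)
- Consumer groups and offsets (only from Kafka 2.7.0 onward)
- Other features:
  - Replication cycle detection
  - Custom multi-cluster replication (a single job can cover several flows: A->B, B->C, B->D)
  - Metrics for monitoring
  - Exactly-once delivery through configuration (check that your Kafka version supports it)
  - …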
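To make the multi-cluster item concrete, the flows A->B, B->C and B->D can be declared in a single MM2 configuration file. The sketch below writes such a file from a shell heredoc. The cluster aliases, host names and the default file name `mm2-multi.properties` are placeholders, not taken from the environment in this article.

```shell
#!/bin/bash
# Write a sketch MM2 config that declares three replication flows.
# Aliases (A, B, C, D) and host names are placeholders.
#   $1 = output file (default: mm2-multi.properties)
write_multi_flow_config() {
  cat > "${1:-mm2-multi.properties}" <<'EOF'
# Aliases of all clusters taking part
clusters = A, B, C, D
A.bootstrap.servers = a-host:9092
B.bootstrap.servers = b-host:9092
C.bootstrap.servers = c-host:9092
D.bootstrap.servers = d-host:9092
# Each flow is keyed as "<source>-><target>" and must be enabled explicitly
A->B.enabled = true
A->B.topics = .*
B->C.enabled = true
B->C.topics = .*
B->D.enabled = true
B->D.topics = .*
EOF
}
```

Calling `write_multi_flow_config` with no argument creates `mm2-multi.properties` in the current directory; flows that are not listed stay disabled.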
 
 
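Hands-on
Following the rule of rehearsing before doing it for real, I built a cluster configured identically to the target cluster and used it to verify what each tool does. Only once I was confident did I operate on the real target cluster.
MM1
Run --help to see the available options: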
[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./kafka-mirror-maker.sh --help
This tool helps to continuously copy data between two Kafka clusters.
Option                                   Description
------                                   -----------
--abort.on.send.failure <String: Stop    Configure the mirror maker to exit on
  the entire mirror maker when a send      a failed send. (default: true)
  failure occurs>
--consumer.config <String: config file>  Embedded consumer config for consuming
                                           from the source cluster.
--consumer.rebalance.listener <String:   The consumer rebalance listener to use
  A custom rebalance listener of type      for mirror maker consumer.
  ConsumerRebalanceListener>
--help                                   Print usage information.
--message.handler <String: A custom      Message handler which will process
  message handler of type                  every record in-between consumer and
  MirrorMakerMessageHandler>               producer.
--message.handler.args <String:          Arguments used by custom message
  Arguments passed to message handler      handler for mirror maker.
  constructor.>
--new.consumer                           DEPRECATED Use new consumer in mirror
                                           maker (this is the default so this
                                           option will be removed in a future
                                           version).
--num.streams <Integer: Number of        Number of consumption streams.
  threads>                                 (default: 1)
--offset.commit.interval.ms <Integer:    Offset commit interval in ms.
  offset commit interval in                (default: 60000)
  millisecond>
--producer.config <String: config file>  Embedded producer config.
--rebalance.listener.args <String:       Arguments used by custom rebalance
  Arguments passed to custom rebalance     listener for mirror maker consumer.
  listener constructor as a string.>
--version                                Display Kafka version.
--whitelist <String: Java regex          Whitelist of topics to mirror.
  (String)>
[root@XXGL-T-TJSYZ-REDIS-03 bin]#
 
There are only two core parameters: the consumer and producer configuration files.
consumer.properties (consumes from the source cluster):
bootstrap.servers=source:9092
auto.offset.reset=earliest
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=mm1-consumer
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
 
producer.properties (sends messages to the target cluster):
bootstrap.servers=target:29092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";
acks=-1
linger.ms=10
batch.size=10000
retries=3
 
Run the script:
./kafka-mirror-maker.sh --consumer.config ./consumer.properties --producer.config ./producer.properties --offset.commit.interval.ms 5000 --num.streams 2 --whitelist "projects.*"
 
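MM1 is fairly simple: as long as the two configuration files are right and SASL is configured correctly, it basically just works. It suits simple data synchronization, such as mirroring specific topics.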
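Before starting a long-running mirror it can help to preview which topics a `--whitelist` pattern selects. The sketch below is a hypothetical helper that filters a topic list (one name per line, for example the output of `kafka-topics.sh --list`) with `grep -E -x`. It assumes that MM1 treats commas in the whitelist as alternation and matches whole topic names, and it uses POSIX extended regex rather than Java regex, so treat the result as an approximation that holds for simple patterns such as `projects.*`.

```shell
#!/bin/bash
# Hypothetical helper: print the topics (stdin, one per line) that a
# whitelist pattern ($1) would select. Commas are treated as "|" and
# spaces are dropped (an assumption about how MM1 reads the whitelist).
preview_whitelist() {
  local pattern
  pattern="$(printf '%s' "$1" | tr ',' '|' | tr -d ' ')"
  # -x requires the whole topic name to match; no match is not an error.
  grep -E -x -e "$pattern" || true
}
```

For example, `./kafka-topics.sh --bootstrap-server source:9092 --command-config command.properties --list | preview_whitelist "projects.*"` prints only the topic names that start with `projects`.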
MM2
There are four ways to run MM2:
- As a dedicated MirrorMaker cluster.
- As a Connector in a distributed Connect cluster.
- As a standalone Connect worker.
- In legacy mode using existing MirrorMaker scripts.

This article covers the first and the third: running as a dedicated MirrorMaker cluster and as a standalone Connect worker. The second requires setting up a Connect cluster, which is more involved.
Running as a dedicated MM2 cluster
This is the simplest mode: you only need to provide one configuration file. The file is highly customizable, so configure it to fit your needs.
As before, run --help to see the usage:
[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./connect-mirror-maker.sh --help
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.properties

MirrorMaker 2.0 driver

positional arguments:
  mm2.properties         MM2 configuration file.

optional arguments:
  -h, --help             show this help message and exit
  --clusters CLUSTER [CLUSTER ...]
                         Target cluster to use for this node.
[root@XXGL-T-TJSYZ-REDIS-03 bin]#
 
As you can see, the options are much simpler: the only core parameter is a single configuration file.
mm2.properties:
name = event-center-connector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 2
# Define the cluster aliases
clusters = event-center, event-center-new
# Kafka bootstrap servers of the event-center cluster
event-center.bootstrap.servers = source:9193
event-center.security.protocol=SASL_PLAINTEXT
event-center.sasl.mechanism=PLAIN
event-center.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# Kafka bootstrap servers of the event-center-new cluster
event-center-new.bootstrap.servers = target:29092
event-center-new.security.protocol=SASL_PLAINTEXT
event-center-new.sasl.mechanism=PLAIN
event-center-new.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# Enable replication from event-center to event-center-new
event-center->event-center-new.enabled = true
# Regex of the topics allowed to replicate
event-center->event-center-new.topics = projects.*
event-center->event-center-new.groups = .*
# Replication factor of the topics MM2 uses internally
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
# Custom parameters
# Whether to sync the source topic configuration
sync.topic.configs.enabled=true
# Whether to sync the source ACLs
sync.topic.acls.enabled=true
# Whether to sync consumer group offsets (honored only from Kafka 2.7.0 onward)
sync.group.offsets.enabled=true
# Whether the connector emits heartbeats
emit.heartbeats.enabled=true
# Heartbeat interval
emit.heartbeats.interval.seconds=5
# Whether to emit checkpoints
emit.checkpoints.enabled=true
# Whether to refresh the topic list
refresh.topics.enabled=true
# Refresh interval
refresh.topics.interval.seconds=60
# Whether to refresh the consumer group ids
refresh.groups.enabled=true
# Refresh interval
refresh.groups.interval.seconds=60
# DefaultReplicationPolicy / CustomReplicationPolicy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
# Replication factor of new topics created on the remote cluster
replication.factor=3
 
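Note that replication.policy.class defaults to DefaultReplicationPolicy. This policy prefixes every topic replicated to the target cluster with the source cluster's alias. For example, if the source alias is A and the topic is bi-log, the topic becomes A.bi-log on the target cluster. The reason is to avoid infinite loops in bidirectional replication.
The official documentation explains it as follows:
This is the default behavior in MirrorMaker 2.0 to avoid data being overwritten in complex mirroring topologies. Customizing it requires care in replication flow design and topic management to avoid data loss. It can be done by using a custom replication policy class for "replication.policy.class".
For how to write and use a custom policy, see my other article.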
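As a plain illustration of the naming rule (this is not MM2 code), the two hypothetical shell functions below mimic what the default policy and a name-preserving custom policy do to a topic name. The `.` separator is the one used in the example above; the function names are made up for this sketch.

```shell
#!/bin/bash
# Mimic the default policy: "<source alias>.<topic>".
#   $1 = source cluster alias, $2 = topic name
remote_topic_default() {
  printf '%s.%s\n' "$1" "$2"
}

# Mimic a name-preserving custom policy: the topic name is unchanged.
#   $1 = source cluster alias (ignored), $2 = topic name
remote_topic_identity() {
  printf '%s\n' "$2"
}
```

`remote_topic_default A bi-log` prints `A.bi-log`, while `remote_topic_identity A bi-log` prints `bi-log`. The prefix is what records where a topic came from, so a policy that keeps names unchanged also gives up that hint and should be used only in one-way flows such as this migration.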
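To keep MM2 running in the background, wrap it in a script:
run-mm2.sh:
#!/bin/bash
# Start MM2 in the background; stdout and stderr go to log/mm2.log
exec ./connect-mirror-maker.sh mm2.properties >log/mm2.log 2>&1 &
 
Then simply run the script.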
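Running in standalone mode
This mode is a bit more work: you need to provide a Kafka cluster to act as the worker node that synchronizes the data. The script to use is connect-standalone.sh.
Use --help to see how to run it: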
./connect-standalone.sh --help
[2023-03-09 20:36:33,479] INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:63)
[root@XXGL-T-TJSYZ-REDIS-03 bin]# 
 
Two configuration files are needed: one with the details of the Kafka cluster acting as the worker (worker.properties), the other with the replication settings (connector.properties).
worker.properties:
bootstrap.servers=worker:29092
security.protocol=PLAINTEXT
sasl.mechanism=PLAIN
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
 
connector.properties:
name = MirrorSourceConnector
topics = projects.*
groups = .*
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1
# source
# With the default replication policy this alias is prepended to every replicated topic name; use with care
source.cluster.alias = old
source.cluster.bootstrap.servers = source:9193
source.cluster.security.protocol=SASL_PLAINTEXT
source.cluster.sasl.mechanism=PLAIN
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# target
target.cluster.alias = new
target.cluster.bootstrap.servers = target:29092
target.cluster.security.protocol=SASL_PLAINTEXT
target.cluster.sasl.mechanism=PLAIN
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";
# Whether to sync the source topic configuration
sync.topic.configs.enabled=true
# Whether to sync the source ACLs
sync.topic.acls.enabled=true
# Whether to sync consumer group offsets (honored only from Kafka 2.7.0 onward)
sync.group.offsets.enabled=true
# Whether the connector emits heartbeats
emit.heartbeats.enabled=true
# Heartbeat interval
emit.heartbeats.interval.seconds=5
# Whether to emit checkpoints
emit.checkpoints.enabled=true
# Whether to refresh the topic list
refresh.topics.enabled=true
# Refresh interval
refresh.topics.interval.seconds=30
# Whether to refresh the consumer group ids
refresh.groups.enabled=true
# Refresh interval
refresh.groups.interval.seconds=30
# Size of the connector consumer's read-ahead queue
# readahead.queue.capacity=500
# Use the custom policy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
replication.factor = 3
 
Run it:
./connect-standalone.sh worker.properties connector.properties
 
This mode gets only a brief introduction here. In the end I went with the previous approach, which is simpler and more direct.
Verification
Checks and results:
- Message counts: OK
  Compared by connecting kafka-tool to both clusters.
- Topic counts: OK
  - source:
    ./kafka-topics.sh --bootstrap-server source:9193 --command-config command.properties --list > topics-source.txt
  - sink:
    ./kafka-topics.sh --bootstrap-server sink:29092 --command-config command.properties --list > topics-sink.txt
  - command.properties example:
    security.protocol = SASL_PLAINTEXT
    sasl.mechanism = PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
- New messages replicated: OK
- New topics replicated: OK
- Consumer groups replicated: NO
  ./kafka-consumer-groups.sh --bootstrap-server source:9193 --command-config command.properties --list > consumer-source.txt
  To carry consumers over, use the official utility class RemoteClusterUtils.
- Consumer offsets replicated: NO
- ACLs replicated: OK
  Check with kafka-acls.sh or the kafka-tool client.
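The two topic lists can also be compared mechanically rather than by eye. A minimal sketch, assuming the `topics-source.txt` and `topics-sink.txt` files produced by the commands above; `missing_on_sink` is a hypothetical helper name. It prints the topics that are present on the source but absent from the sink.

```shell
#!/bin/bash
# Hypothetical helper: print topics listed in file $1 (source) that are
# missing from file $2 (sink). Both files hold one topic name per line.
missing_on_sink() {
  local src_sorted sink_sorted
  src_sorted="$(mktemp)"
  sink_sorted="$(mktemp)"
  # comm needs sorted input; -u also drops duplicate lines.
  sort -u "$1" > "$src_sorted"
  sort -u "$2" > "$sink_sorted"
  # comm -23 keeps the lines that appear only in the first (source) file.
  comm -23 "$src_sorted" "$sink_sorted"
  rm -f "$src_sorted" "$sink_sorted"
}
```

Run it as `missing_on_sink topics-source.txt topics-sink.txt`. Empty output means every topic on the source also exists on the sink; names that MM2 itself creates on the source may show up and can be ignored.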
Appendix
MM2 configuration reference
| property | default value | description | 
|---|---|---|
| name | required | name of the connector, e.g. “us-west->us-east” | 
| topics | empty string | regex of topics to replicate, e.g. “topic1\|topic2\|topic3”. Comma-separated lists are also supported. | 
| topics.blacklist | `.*\.internal, .*\.replica, __consumer_offsets` or similar | topics to exclude from replication | 
| groups | empty string | regex of groups to replicate, e.g. “.*” | 
| groups.blacklist | empty string | groups to exclude from replication | 
| source.cluster.alias | required | name of the cluster being replicated | 
| target.cluster.alias | required | name of the downstream Kafka cluster | 
| source.cluster.bootstrap.servers | required | upstream cluster to replicate | 
| target.cluster.bootstrap.servers | required | downstream cluster | 
| sync.topic.configs.enabled | true | whether or not to monitor source cluster for configuration changes | 
| sync.topic.acls.enabled | true | whether to monitor source cluster ACLs for changes | 
| emit.heartbeats.enabled | true | connector should periodically emit heartbeats | 
| emit.heartbeats.interval.seconds | 5 (seconds) | frequency of heartbeats | 
| emit.checkpoints.enabled | true | connector should periodically emit consumer offset information | 
| emit.checkpoints.interval.seconds | 5 (seconds) | frequency of checkpoints | 
| refresh.topics.enabled | true | connector should periodically check for new topics | 
| refresh.topics.interval.seconds | 5 (seconds) | frequency to check source cluster for new topics | 
| refresh.groups.enabled | true | connector should periodically check for new consumer groups | 
| refresh.groups.interval.seconds | 5 (seconds) | frequency to check source cluster for new consumer groups | 
| readahead.queue.capacity | 500 (records) | number of records to let consumer get ahead of producer | 
| replication.policy.class | org.apache.kafka.connect.mirror.DefaultReplicationPolicy | use LegacyReplicationPolicy to mimic legacy MirrorMaker | 
| heartbeats.topic.retention.ms | 1 day | used when creating heartbeat topics for the first time | 
| checkpoints.topic.retention.ms | 1 day | used when creating checkpoint topics for the first time | 
| offset.syncs.topic.retention.ms | max long | used when creating offset sync topic for the first time | 
| replication.factor | 2 | used when creating remote topics | 
Other
References:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
https://www.reddit.com/r/apachekafka/comments/q5s3al/mirrormaker2_is_not_able_to_replicate_groups_in/?sort=new
https://dev.to/keigodasu/transferring-commit-offset-with-mirrormaker-2-3kbf
https://learn.microsoft.com/zh-cn/azure/hdinsight/kafka/kafka-mirrormaker-2-0-guide
