背景
当Kafka 减少Broker节点后,需要把数据分区迁移到其他节点上,以下将介绍我的一次迁移验证过程。
前3步为环境准备,实际数据操作看第4步即可
增加Broker节点,也可以采用步骤4相同的方法进行重新分区
方案思想:使用kafka-reassign-partitions命令,把partition重新分配到指定的Broker上
创建测试topic,具体为3个分区,2个副本
1 | kafka-topics --create --topic test-topic \ |
查看创建的topic
1 | kafka-topics --describe --zookeeper cdh-002/kafka --topic test-topic |
产生若干条测试数据
1 | kafka-console-producer --topic test-topic \ |
使用命令进行重分区
新建文件topic-to-move.json ,比如加入如下内容
1
{"topics": [{"topic":"test-topic"}], "version": 1}
使用–generate生成迁移计划,broker-list根据自己环境设置,我的环境由于broker 75挂掉了,只剩下76和77
1
2
3kafka-reassign-partitions --zookeeper cdh-002/kafka \
--topics-to-move-json-file /opt/lb/topic-to-move.json \
--broker-list "76,77" --generate输出日志:
(从日志可知各个分区副本所在的Broker节点,以及建议的副本分布)
Current partition replica assignment (当前分区副本分布)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20{
"version":1,
"partitions":[
{
"topic":"test-topic",
"partition":0,
"replicas":[76,77]
},
{
"topic":"test-topic",
"partition":2,
"replicas":[75,76]
},
{
"topic":"test-topic",
"partition":1,
"replicas":[77,75]
}
]
}Proposed partition reassignment configuration (建议分区副本分布)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20{
"version":1,
"partitions":[
{
"topic":"test-topic",
"partition":0,
"replicas":[76,77]
},
{
"topic":"test-topic",
"partition":2,
"replicas":[76,77]
},
{
"topic":"test-topic",
"partition":1,
"replicas":[77,76]
}
]
}新建文件kafka-reassign-execute.json,并把建议的分区副本分布配置拷贝到新建文件中。(生产上一般会保留当前分区副本分布,仅更改下线的分区,这样数据移动更少)
使用–execute执行迁移计划 (有数据移动,broker 75上的数据会移到broker 76和77上,如果数据量大,执行的时间会比较久,耐心等待即可)
1
2
3kafka-reassign-partitions --zookeeper cdh-002/kafka \
--reassignment-json-file /opt/lb/kafka-reassign-execute.json \
--execute使用-verify查看迁移进度
1
2
3kafka-reassign-partitions --zookeeper cdh-002/kafka \
--reassignment-json-file /opt/lb/kafka-reassign-execute.json \
--verify
通过消费者验证,可知,并未丢失数据。注意需要加–from-beginning。(此时broker 75和77同时宕机,也不会丢失数据,因为76上有了所有分区的副本)
1
kafka-console-consumer --topic test-topic --from-beginning --zookeeper cdh-002/kafka
<font color="red" size=4>另外一种验证方法是:(生产最佳实践)</font>
另外一种验证方法就是通过查看Kafka存储路径来确认,是否有迁移数据
1
2
3
4
5
[root@cdh-003 ~]# cd /var/local/kafka/data/
[root@cdh-003 data]# ll
rwxr-xr-x 2 kafka kafka 110 Oct 23 14:21 test-topic-0
drwxr-xr-x 2 kafka kafka 110 Oct 23 14:52 test-topic-1
drwxr-xr-x 2 kafka kafka 110 Oct 23 14:21 test-topic-2