kafka集群扩容后的数据迁移

Kafka SenLin 4年前 (2020-04-11) 442次浏览 已收录 0个评论

最近我们生产环境的kafka集群有增加节点的需求,然而kafka在新增节点后并不会像elasticsearch那样感知到新节点加入后自动将数据reblance到新集群中,因此这个过程需要我们手动分配。一番折腾之后,实现了增加kafka集群节点并将原有数据均匀分配到扩容后的集群。下面结合一个例子谈一下整个过程。

一、环境说明

1.集群状况

假定当前的cluster中只有(101,102,103)三个kafka节点,有一个名为think_tank的topic,该topic有2个replica,均匀分布在三个节点上.

2.目的

我们要做的是在cluster中新增两个节点(记为104,105)后,将的数据均匀分到新集群中的5个节点上。

二、操作步骤

新增kafka节点的部署不是本文重点,就不在此赘述。

其实官方文档的这一小节关于集群扩容讲解很详细:Expanding your cluster ,整个过程需要分为三个步骤:获取kafka给出的建议分配方案、按照给出的分配方案执行分配、查看分配的进度以及状态。这三个步骤对应了kafka脚本提供的三个partition reassigment工具。

–generate: In this mode, given a list of topics and a list of brokers, the tool generates a candidate reassignment to move all partitions of the specified topics to the new brokers. This option merely provides a convenient way to generate a partition reassignment plan given a list of topics and target brokers.

–execute: In this mode, the tool kicks off the reassignment of partitions based on the user provided reassignment plan. (using the –reassignment-json-file option). This can either be a custom reassignment plan hand crafted by the admin or provided by using the –generate option

–verify: In this mode, the tool verifies the status of the reassignment for all partitions listed during the last –execute. The status can be either of successfully completed, failed or in progress

结合例子具体说明:

1、生成重新分配topic的方案

脚本的参数是以json文件的形式传入的,首先要新建一个json文件并设置需要分配哪些topic,think_tank-to-move.json:

{
    "topics":[
        {
            "topic":"think_tank"
        },
        {
            "topic":"这里可以同时指定多个..."
        }
    ],
    "version":1
}

使用/bin目录中提供的kafka-reassign-partitions.sh的脚本请求获取生成分配方案:

./bin/kafka-reassign-partitions.sh --zookeeper your_zk_address:2181 --topics-to-move-json-file think_tank-to-move.json --broker-list "101,102,103,104,105" --generate

–broker-lsit 的参数 “101,102,103,104,105”是指集群中每个broker的id,由于我们是需要将所有topic均匀分配到扩完结点的5台机器上,所以要指定。同理,当业务改变为将原来的所有数据从旧节点(01,102,103)迁移到新节点(104,105)实现数据平滑迁移,这时的参数应”104,105″.

脚本执行后返回的结果如下:

Current partition replica assignment
{"version":1,"partitions":[{"topic":"think_tank","partition":2,"replicas":[101,102]},{"topic":"think_tank","partition":4,"replicas":[103,102]},{"topic":"think_tank","partition":3,"replicas":[102,101]},{"topic":"think_tank","partition":0,"replicas":[102,103]},{"topic":"think_tank","partition":1,"replicas":[103,101]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"think_tank","partition":2,"replicas":[103,101]},{"topic":"think_tank","partition":4,"replicas":[105,103]},{"topic":"think_tank","partition":3,"replicas":[104,102]},{"topic":"think_tank","partition":0,"replicas":[101,104]},{"topic":"think_tank","partition":1,"replicas":[102,105]}]}

可以看出当前正在运行的方案中,think_tank的replica都是分布在101,102,103这3个节点,新给出的建议方案中replica均匀分布在扩容后的5个节点中。

2.执行分配方案

将上一个步骤中生成的建议方案复制到新建的think_tank_reassignment.json中:

{"version":1,"partitions":[{"topic":"think_tank","partition":2,"replicas":[103,101]},{"topic":"think_tank","partition":4,"replicas":[105,103]},{"topic":"think_tank","partition":3,"replicas":[104,102]},{"topic":"think_tank","partition":0,"replicas":[101,104]},{"topic":"think_tank","partition":1,"replicas":[102,105]}]}

使用脚本执行:

 ./bin/kafka-reassign-partitions.sh --zookeeper your_zk_address:2181 --reassignment-json-file think_tank_reassignment.json --execute

脚本执行,返回内容:

Current partition replica assignment

{"version":1,"partitions":[{"topic":"think_tank","partition":2,"replicas":[101,102]},{"topic":"think_tank","partition":4,"replicas":[103,102]},{"topic":"think_tank","partition":3,"replicas":[102,101]},{"topic":"think_tank","partition":0,"replicas":[102,103]},{"topic":"think_tank","partition":1,"replicas":[103,101]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

如上,成功开始执行分配数据,同时提示你如果有需要将之前的分配方案备份便于回滚到原方案。

3.查看配过程进

查看脚本的方法如下,注意这次的json文件要和执行步骤中的json是同一个文件:

 ./bin/kafka-reassign-partitions.sh --zookeeper your_zk_address:2181 --reassignment-json-file think_tank_reassignment.json --verify

返回结果:

Reassignment of partition [think_tank,2] completed successfully
Reassignment of partition [think_tank,1] completed successfully
Reassignment of partition [think_tank,3] is still in progress
Reassignment of partition [think_tank,4] completed successfully
Reassignment of partition [think_tank,0] is still in progress

is still in progress表示还在处理中,全部迁移成功后每个partition都会显示 completed successfully.注意如果topic数据量大,这个过程可能会时间长一些,不要轻易重启节点!可能会导致数据不一致!!!

三、其它

这个partion reassignment工具同样可以按需手动地将某个特定的topic指定到特定的broker上,所要做的就是按照步骤一给定的格式关联partition到borker即可,如,将think_tank的partition0指定到101、102两节点上:

{
    "version":1,
    "partitions":[
        {
            "topic":"think_tank",
            "partition":0,
            "replicas":[
                101,
                105
            ]
        }
    ]
}

另外,如果有增加replica的个数的需求,同样可以使用这个脚本,可以翻一下官网文档。

One more thing

一点儿感触,在确定问题所在后,官方的文档应该作为我们优先考虑的一个重要资料源,网上的资料由于时间较早、版本不同的原因,解决方式可能需要细微的改动才能达到目的,这些坑在官方的一手资料上其实是可以规避的。

欢迎拍砖,欢迎交流~

注:转载请注明出处


top8488大数据 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:kafka集群扩容后的数据迁移
喜欢 (0)
[]
分享 (0)

您必须 登录 才能发表评论!