本文主要介绍360商业化在跨IDC kafka热备方面的实践, 接下来会按以下顺序介绍各个议题:
- MM2简介
- 跨IDC kafka热备多活方案
- 产品化
- 需要注意的风险
在介绍MM2之前先谈一下MM1, 这个是kafka很早之前就有的组件,本质来说就是实现了consumer + producer, 从集群A将数据同步到集群B, 使用的是kafka client的high level api。如果网络不稳定,会有频繁rebalance问题。一个MM1实例是解决部分topic 从集群A—>B的同步问题。所以如果我们有N个kafka集群, 那么为了满足所有同步需求,理论上我们要维护N(N-1) 个实例, 如果算上每个实例只处理部分topic同步的话,那么需要维护的实例会更多。
MM2是MM1的升级替代品,用于kafka集群间的数据同步,解决了MM1的很多问题,同时部署维护成本大大降低, 以N个kafka集群为例,只需要部署N个MM2集群即可满足需求。基于更高抽象的Kafka Connect framework解决容错和水平扩展问题。内部基于low level(assign)来实现topic订阅, 没有频繁rebalance的问题, 对同步有更精准的控制。
MM2具有如下特性:
- 基于Kafka Connect framework和生态
- 自动探测新topic, partition
- 自动同步topic配置,自动同步topic acl
- 支持active-active集群对,以及任意数量的active集群
- 支持跨IDC同步, aggregation和其他复杂拓扑
- consumer offset等meta信息的同步和翻译
- no rebalance,减少同步波动
- 提供广泛的指标,例如跨多个数据中心/集群的端到端复制延迟
- 容错和水平可扩展
MM2架构如下:
MM2基于kafka connect framework 实现集群部署,通过Connect Worker来同步数据, 容错与水平扩展等由Connect框架保障。
MM2主要有两种运行模式:Connector cluster 和 Driver, 我们采用的是Driver模式, 因为这种模式封装得更好,配置简洁,也是官方推荐的模式。
MM2对于同步topic有前缀命名约定,默认情况下是source clusterName, 当然也可以自定义,如下图所有,所有同步topic都包含有source clusterName前缀,这样做有两个好处: 一是可以清楚知晓数据来源, 二是可以避免循环同步的问题。
下面介绍几个比较重要的内部topic,可以说MM2的主要特性都依赖于这几个topic:
- checkpoint topic: 同步集群间consumer group state,其中记录了consumer offset的映射关系, 在2.7.0+版本后还支持自动提交到target cluster的__consumer_offsets, 也可通过translateOffsets()接口手动翻译后提交
- offset_sync topic: 同步集群间broker端offset 映射关系, 分为upstreamOffset和downstreamOffset, 其中upstreamOffset是上游topic的offset(source端), downstreamOffset是下游topic的offset(target端)
- heartbeat topic: 用于监控replication flows, 可以动态发现replication topology,便于MM2动态分配同步任务
关于MM2的部署我们采用的方案是:每个kafka集群(作为target cluster)部署一个MM2集群实例,部署在k8s上方便运维管理, 通过jmx_prometheus_javaagent将metrics暴露成exporter供prometheus server抓取, 然后通过grafana展示。
上面简单介绍了下MM2, 主要是为我们后面介绍跨IDC 热备方案补充一点背景知识,MM2比较详细的介绍我们后面会单独写一篇文章来阐述。
在没有跨IDC kafka热备方案之前, 我们只有冷备方案,就是在集群A出现问题的情况下,手动将业务迁移到集群B上, 这种操作的代价是巨大的, 而且短时间内无法恢复,离线业务可能还好,实时业务是完全无法接受的。
- 业务上下游的依赖问题,因为涉及到生产和消费的先后问题,各业务必须协调好顺序
- 沟通上下游各个业务做迁移,沟通成本很高
- 数据安全得不到保障,在生产端没有及时切换的情况下可能会丢数据,下游pipeline是否能处理完整数据也要打个问号
- 数据恢复时间过长,对于实时业务是无法容忍的
所以我们想是不是可以做到跨IDC的热备多活, 在一个IDC集群出问题的情况下, 业务可以快速切换恢复。
在没有MM2之前, 要实现两个集群的实时同步,特别是consumer group state的实时同步是比较困难的, 一个是日志的实时同步稳定性,之前已经说过MM1的稳定性和可维护性是比较差的, 还有就是offset信息(分为broker offset, consumer offset)两个集群是完全独立的, 需要一个offset mapping实时映射。之前uber自己实现过一个多kafka集群热备的方案,基于uReplication和自研组件, 但是实现上有点复杂,需要维护的组件也较多。 在MM2发布之后,我们发现已经比较完美地解决了以上问题, 所以我们觉得基于MM2实现热备是一个很好的方案, 当然在实现的过程中也遇到了很多问题, 下面也会逐一讲解。
- 跨IDC容灾,在一个IDC不可用的情况下,业务可快速恢复
- 业务上下游pipeline可以解耦,各个业务切换完全独立,不需要协调上下游
- 切换代价尽量小,最好能不重启业务情况下切换集群
- 产品流程使用上尽量简单,将复杂度对业务屏蔽掉
这个是我们要实现的一个总体目标,主要是为了解决冷备情况下的痛点问题, 同时我们想通过产品化流程化将这个热备切换的过程对业务使用尽量简单。
为了实现Kafka集群跨IDC容灾,首先要考虑的是如何同步,做单向同步还是双向同步,我们的需求是两个集群的topic数据是一致,且要上下游模块完全解耦, 显然单向同步不能满足我们的需求,因为对于在集群A的producer 写入topicT, 在B集群也必须要有topicT, 否则producer就是无法迁移的, 所以我们只能使用双向同步的方案: 即在集群A,集群B都有topicT,然后双向同步。双向同步后无论producer写入的是哪个集群, 两个集群的topic数据是一致的。
下图以IDC1与IDC2的两个Kafka集群Cluster A与Cluster B之间双向同步topicT为例, consumer消费A集群,其offset信息会通过checkpoint topic被实时同步给B集群, 这样当A集群不可用时就可以切换到B集群的consumer继续消费。对于producer的切换更加简单,只要切换写入B集群topicT即可,两个集群数据是一致的。
以下是我们热备多活方案简单示意图, producer只要写成功一个集群即为成功, consumer因为group state实时同步,所以standby的consumer随时可以切换消费。
在实现以上方案的过程中,我们遇到了以下问题,下面来看下我们是如何解决的。
目前MM2不支持动态修改同步配置,但业务有对topic动态增减的需求,如果每次修改都需要重启MM2来实现的话,那么代价太大了。我们发现可以通过实现TopicFilter,GroupFilter接口来解决这个问题。我们使用mysql来存储MM2的同步meta信息, 那么在接口中动态查询获取mysql的信息即可满足动态加载同步信息的需求。重载configure方法,定期update数据库中同步配置信息,以下是代码片段:
MM2 不支持反向同步consumer group state
如上所示,对于B.topicT 到 topicT的offset mapping是不支持的,也就是mm2的checkpoint同步是单向的。比如topicT从cluster A 同步到 cluster B, consumer c 在cluster A消费topicT. cluster B会通过A.checkpoints.internel topic获取consumer c的state信息,其中有topicT —>A.topicT的映射关系, 但B.topicT –> topicT的反向映射关系是没有的, 但为了做consumer offset实时同步, 这个反向映射也是要支持的,否则就是不完整的。
那么如何获得offset反向映射呢, 我们要利用B集群的mm2-offset-syncs.A.internal 这个topic, 里面记录的是A集群同步到B集群的topic的broker offset mapping信息。 我们可以从B.topicT的offset信息加上两个集群的broker offset的差值delta来计算出B集群中topicT对应的offset信息, 计算方法如下:
现在我们假设B.topicT的offset为offset,upstreamOffset与downstreamOffset是 mm2-offset-syncs.A.internal 这个topic中记录的broker offset映射关系
delta = upstreamOffset - downstreamOffset
reverseOffset = offset + delta
默认情况下,在集群切换的过程中还是需要业务介入的, 需要修改target集群的配置信息,起码bootstrap.servers, topic列表是要变更的, 然后重启恢复业务。 但重启业务对线上服务总是会有影响的,有没有办法可以在不重启业务的情况下完成切换呢?
我们基于librdkafka/kafka clients封装了c++, java的客户端, 可以在客户端框架层面完成对切换动作的封装,这样上层业务就不需要做重启了。具体做法是基于“客户端框架+配置中心”的方式来实现。目前配置中心我们支持apollo, mysql的触发方式。业务在我们ultron平台发起切换流程,审批通过后触发切换集群,业务无需重启业务。
为了让业务容易理解使用热备方案,将双向同步,offset映射等概念封装屏蔽掉, 我们在产品化过程中引入了热备属性的概念。
热备的属性与定义如下:
- 热备topic: topic在两个集群间双向同步
- 热备producer: 只生产热备topic的producer, 且所有topic热备的target集群是需要一致的
- 热备consumer: 只消费热备topic的consumer, 且所有topic热备的target集群是需要一致的, 两个热备consumer group state实时同步
业务申请热备topic, producer,consumer之后,我们ultron平台会完成如下动作:
- 对热备topic配置双向同步
- 两个集群热备的consumer group state实时同步
- 授权每个项目用户对应的topic, group读写权限
对于热备topic, 我们支持业务级别的切换,即producer, consumer级别的切换,因为有时候业务可能是写入部分partition或消费某个group有问题,这个时候只需要某个模块的按需切换即可。 如果是整个kafka集群的不可用或IDC不可用这种情况, 那么各个业务独立完成切换即可。
ultron平台产品化集成展示
如果某个IDC的kafka集群不可用了,那么业务只需要在界面上申请切换集群,审批通过后,当前生效集群及配置会动态改变,由于我们的consumer offset信息是实时同步的, 业务修改参数后重启即可。如果业务基于我们的客户端框架则可实现触发式切换,业务无需重启。
这里主要考虑数据完整性风险。首先依赖于producer端的设置:
- producer端有retries=Long.MAX_VALUE,acks=ALL设置, 在send fail情况下有持久化机制, 在切换集群后可以恢复发送。
- 如果producer没有retry机制,也没有持久化机制,那么producer端就会丢数据,那么数据保障性就无从谈起
其次是MM2日志同步的完整性,以上述集群A(主集群), 集群B(备集群)为例:
- 如果集群A宕机且无法恢复, 那么MM2是会有少量日志丢失的。
- 如果集群A宕机且能恢复那么MM2能接着同步完,这时候MM2支持at least once语义。
参考资料