Elasticsearch 朴朴 双活自研实践与思考 实时双向同步 (elasticsearch)

Elasticsearch 朴朴 双活自研实践与思考 实时双向同步 (elasticsearch)

背景

随着朴朴业务的快速增长,业务中断给公司的品牌、经济以及客户带来影响越来越大,因此业务对容灾的需求越来越迫切,要求在发生灾难时,业务能够快速恢复。显然当前公司采用的数据冷备的方式无法满足这个需求。出于公司长远规划考虑,核心业务需要具备地域级灾难的故障逃逸能力,确保在地域灾难发生时能够在分钟级内快速恢复业务,因此朴朴的双活建设被提上了议程。

双活建设包含应用双活建设和数据双活建设。数据双活是指两个数据中心都有完整数据,并且同时承担读写业务。这两个数据中心互为备份且实时同步数据,确保一个数据中心故障后,另一个数据中心可以快速接管业务。目前朴朴需要实现双活的业务数据散布在 MySql、Elasticsearch、Redis、Kafka 等等,要实现双活就要求两个数据中心中的各类数据都能够实时双向同步。不同数据双向同步的实现原理相近,本文主要以 Elasticsearch(之后简称 ES) 双向同步为例进行介绍。

方案调研

当前业内实现 ES 数据同步的方案主要有两种:

对于 ES 官方提供的 CCR 能够实现单个索引的单向同步,但不能实现单个索引的双向同步,因为其无法解决数据回环以及数据冲突的问题,并且 CCR 是铂金会员才能享有的,是需要收费的。

双写是目前比较通用的数据同步方式,双写可以是业务同步双写,也可以是通过 MQ 实现异步双写。业务同步双写是指业务写 ES 时需要同时往两个 ES 集群写入数据,两者都成功了才算写入成功。这种实现方式对业务的侵入性比较高,而且无法保障数据一致性。通过 MQ 实现异步双写的方式是业务不将数据写入 ES,而是将数据直接写入 MQ,然后由 MQ 消费者实现 ES 数据的写入。这种方案能够保障数据一致性,但是引入了 MQ 增加了系统复杂度,并且数据的延时变高。

通过以上分析可以看出常规方案都有明显的缺陷,不能满足预期的能够实现双向同步、业务侵入小且实时性又高的要求。那 ES 是否存在像 MySql 的 binlog 那般通过订阅就能够实时拉取到变更日志,然后在另一边进行回放从而达到数据同步效果的机制呢?

这当然也是有的,ES 有 translog 能够记录下变更信息,理论上通过监听消费 translog 的变更就能够实现数据同步。这个方案强依赖于 translog,translog 是会被删除的,如果某些数据还未被同步而 translog 文件已被删除,这就会造成数据丢失,从而导致两个集群数据不一致。在 ES 中,当 flush 操作执行完成之后, translog 就会被直接删除,而这个 flush 操作触发的影响因素较多,是不可控的,这意味着 translog 随时都可能被清除,如果用这种方案两个集群的数据一致性就没法保证,因此 translog 的方案也不太合适。

除了 translog,ES 从 6.7 版本开始,还提供了软删除机制(应该说是 Lucene 提供,为便于理解之后统一用 ES 描述)。软删除使得更新和删除操作在 merge 时不会被清除,它支持将 ES 的操作记录日志(operation)按照顺序排放,并以递增的 SeqNo 标记每个操作记录日志所处的位置。另一个集群只要监听并实时拉取该操作记录后再回放就能够实现数据实时同步了,同时通过记录 SeqNo 的点位,确保故障后能够断点续传,避免数据丢失。这些更新和删除操作会占用额外的存储空间,如果都不删除会导致资源浪费。这可以通过软删除 + 租约的方式,实现主动控制历史操作记录存放的时间,保障数据同步的同时也能及时清理已同步的历史记录,避免资源浪费。该方式虽然需要进行较大的源码改造,但是整体效果是能够满足预期的实时性高、业务侵入小并且能够实现双向同步的要求。

方案设计

实现思路

由于在数据双活建设过程中有多种数据类型需要实现双向同步,因此在设计之初就计划实现一个通用的数据传输服务,该服务通过实时拉取源的操作变更记录,然后到目标上进行回放从而实现数据同步,以 ES 数据同步为例,其流程如下图所示。

ES 跨集群复制简图

数据传输服务利用 ES 的软删除 + 租约的机制,定期从源集群中拉取操作变更记录。但 ES 的软删除机制是从 6.7 版本开始提供的,在 ES 6.x 版本软删除是默认关闭的,另外获取操作变更记录的接口在开源版本中也是没有的,这些都需要通过改动 ES 源码进行开放。

软删除开启后,每个索引的每个分片的操作变更记录(包含更新和删除)都会被按顺序保存下来,并且通过一个递增的 SeqNo 来标记各个变更记录对应的点位。如下图所示,数据传输服务中任务 1 负责拉取 index1 的 分片 1 数据然后再另一个集群进行回放。当前已经拉取到 SeqNo 为 13 的位置,也就是说数据传输服务上已经有 SeqNo 小于 14 的操作记录,只要将这些数据写到集群 2 进行回放就实现数据同步了。同理再起一个任务负责拉取集群 2 中该分片的操作变更记录,然后同步到集群 1 进行回放,这样就实现了分片的双向同步。

跨集群复制细节

整体框架

数据同步平台系统架构

上图是数据同步平台的系统框架图,主要分为管控层、通用层、数据仓库层以及存储层。其中管控层和通用层主要是负责配置、监控以及任务调度等。数据仓库层主要负责变更数据的抓取、转换、过滤和写入,也就是 ETL 流程,它整个数据同步的核心流程。数据同步平台的具体实现见下图,从图中可以看出数据同步平台在实现时主要分为两个部分:管控服务以及数据传输服务。管控服务实现管控层和通用层的功能,主要服务对象是管理员,为其提供操作以及查看相关的功能。而数据传输服务实现数据仓库层的功能,负责具体的数据同步操作。

前文也提到过不同数据源的同步原理基本都是一样的,都是拉取变更日志然后再另一端回放,只不过是具体的通信协议以及报文格式等有差异而已。所以数据传输服务采用的是微内核 + 插件化的架构,核心流程是共用的,要实现不同数据源的同步只需要实现对应插件即可。

应用架构图

核心设计点

数据完整性

数据完整性主要指在故障恢复后不会丢数据,也不需要重新进行全量同步。这要求在设计过程中要考虑故障后的断点续传功能。ES 的数据同步主要是拉取历史操作记录然后进行回放来实现,历史操作记录是包含 SeqNo 的,这是一个递增的游标。为了保证数据完整性,数据传输服务在回放成功之后会及时将 SeqNo 记录到 MQ 中。当出现故障导致数据传输服务器意外退出时,数据传输服务器在启动后会自动从 MQ 中读取上次记录的最新点位,然后继续往后消费,从而保证故障后数据不丢失。

数据冲突

在进行双向同步时,由于两边数据源都可能对同一份数据进行改动,如果这个改动在同一时刻发生,就出现数据冲突。这里同一时刻的定义并非要求真正意义上时间点一致,而是指在 A 集群修改数据后在同步到 B 集群之前的这段时间内,B 集群如果也有对这条数据做改动,那么就算同一时刻。

数据冲突有可能会导致两边数据不一致,因为在数据冲突的时候,无法确定到底应该以哪条记录为准。目前处理数据数据冲突的方案有以下几种,可以根据场景选择某一种或者组合:

方案 1 中采用的是 Last write wins 的方式,通过这种方式,同一条记录的两个 update 语句不论执行顺序如何变化、不论执行多少次,他们执行后的结果是固定的,这个是满足 CRDT 语义的,能够保障数据最终一致性。但是方案 1 强依赖于更新时间,要求不同机器实例时间一致,并且每个同步的索引都需要具备更新时间字段,要求业务在更新数据时也同步更新该时间字段,这会加大业务的改造工作量。

方案 2 也能保障数据最终一致性,它是依赖于版本号,需要业务维护版本号,并且保证在分布式场景下这个版本号是递增的。这样增加了业务改造开发工作量以及增加业务复杂度,代价非常大。而方案 3 到 方案 5 虽然实现上比较简单,但是都有可能造成数据较大面积的错误或者是业务阻塞,数据的一致性无法保证。

通过以上方案对比,方案 3 、4、5 存在的问题对业务影响大,不予考虑。方案 1 和 方案 2 都能保证数据最终一致性,但相比于方案 2 ,方案 1 的实现难度更低,所以我们选用的方案 1 来解决数据冲突。针对方案 1 强依赖于更新时间的问题,通过 NTP 可以确保机器实例之间的时间误差在一定精度范围内。另外为了避免业务接入过程都需要改造,更新时间字段通过修改 ES 源码将时间字段内置并且主动更新,且该字段对业务无感,这样既能满足方案要求,也不影响业务。

数据回环

数据回环是指业务将数据写入到 DB1 之后会产生操作记录日志,数据传输服务通过拉取该操作记录日志将数据同步到 DB2。而数据被写入到 DB2 之后也会产生操作记录日志,该日志同样会被数据通过服务拉取并同步到 DB1,如此就形成回环,如下图所示。

数据回环示意图

避免数据回环的方案就是要找到能够打断回环的点。如下图所示(图中 DTS 就是数据传输服务),我们采用的方案是数据传输服务在同步时为每条操作记录日志打上来源标识,数据传输服务在同步数据时通过判断该标识是否与目标数据源一致,如果一致就表示该记录是回环数据,应该直接丢弃,这样就避免回环问题。

避免回环示意图

由数据回环示意图可以看出这个避免回环的来源标识需要落到 DB2 进行存储,这样数据传输服务才能从操作日志记录中拉取到该标识的信息。对于 ES 而已,这个打标可以考虑在索引的 _source 上增加回环标识字段,这样标识就跟着数据走了。但是删除操作时是没有 _source 字段的,因此删除操作需要独立处理,比如利用 ES 现有的元字段信息进行携带或者直接增加元字段。

一致性校验

尽管已经有对数据冲突进行处理,最终还是可能出现两个数据源的数据不一致的问题。因此还需要有额外的手段进行数据校验,当发现有数据不一致时,给出告警,然后人为介入处理。常用的数据校验方式可以有以下几种,可根据实际情况选择合适的方式进行实现:

上线问题与处理

ES 源集群 CPU 大幅上升

ES 数据同步功能上线后,某个业务集群开启数据同步时,发现源集群的 CPU 有大幅提升。下面两张图分别是未开启数据同步的 CPU 使用率,以及开启数据同步的 CPU 使用率, 从图中可以看出 CPU 从 25% 直接被提升到 50%。

未配置数据同步时源 cpu 使用率 配置了数据同步时源 cpu 使用率

CPU 升高的主要原因是在每次拉取增量数据时,ES 都要将增量数据打成快照并同步给数据传输服务,这个过程是比较耗 cpu 的,高频度数据拉取会造成源集群 cpu 大幅上升。解决方案是通过限流降低拉取频率,同时增加每次拉取的数据量,虽然延迟上有毫秒级的升高,但 cpu 能够大幅降低,CPU 上升幅度控制在 10% 以内。

大批量更新的场景下会出现更新阻塞

线上某个业务集群在升级了 ES 版本为自研版本时,部分更新操作出现超时,从 prometheus 指标上看,索引写入时长出现突刺,最高写入延迟达到了 20s,指标如下图所示。

写入耗时指标

通过查看指标发现该索引的 refresh 时间也出现大幅上升,如下图所示,并且 write 队列堆积比较严重。另外将版本降回开源版本后并继续以此压力写入,不会出现写入延迟问题,所以这个是自研版本中引入的变更引起的。

refresh 耗时指标

当前我们使用的 ES 版本是 6.7,在开源版本中默认是不开启软删除的,而在自研版本中软删除被默认开启。我们怀疑是软删除引起写入延迟,便将自研版本的软删除关闭,发现问题确实就不出现了,表现与开源版本无异。之后在社区中也看到软删除开启后,在高频的 update 场景下会导致 refresh 耗时变长的问题。refresh 慢会导致 indexing buffer 的内存来不及 refresh 到磁盘中,当 indexWriter 大于 index_buffer_size 配置的阈值,Elasticsearch 会降低分片的写入速度。

这个问题应该算是 lucene 的 bug,lucene 在 8.5 版本中对其做了优化,并且其后的版本中也在持续优化。在不升级 ES 版本的情况下,通过合并 lucene 的优化改造能够缓解这个问题。

总结与展望

目前 ES 的双向同步建设已经取得了阶段性的进展,并且在部分业务的双活场景上成功落地。但是当前双向同步还只实现了增量同步,业务接入过程还需要 DBA 配合处理存量数据同步,接入双活的流程不够顺滑。因此接下来首先要实现存量数据同步,此外还会在接入流程以及配套辅助工具的建设上投入较大精力,从而降低业务接入和运维成本,助力朴朴双活建设顺利推进。

声明:本文来自用户分享和网络收集,仅供学习与参考,测试请备份。