实时数据采集工具 基于 Oracle LogMiner 构建可用于生产实践的 和 Debezium (实时数据采集软件)

实时数据采集工具 基于 Oracle LogMiner 构建可用于生产实践的 和 Debezium (实时数据采集软件)

实时数据采集(又称为“变化数据捕获”Change>

为此,笔者通过深入研究 LogMiner 原理和 Debezium 源码,在其基础上进行改进,使其真正可用于生产实践。

说明:笔者的实验环境为 Oracle 11g 和 Debezium 1.5.4。但通过核对 Oracle 最新官方文档(21c)和 Debezium 最新源码(截至 2022 年 6 月 21 日最新版 1.9.4),本文所涉及的方案对 11g-21c 的 Oracle,以及 Debezium1.5.4 以上版本均适用(文中涉及到的 Debezium 源码均为 1.5.4 版本,非 1.5.4 版本的具体代码细节需根据文中原理自行定位)。限于篇幅本文对 Debezium 基础使用不做介绍,读者可参考官方文档了解其基本配置和用法。

一、LogMiner 抽数的基本过程

限于篇幅仅简要介绍其五个基本步骤,详细说明可参阅Oracle官方文档:

1)构造字典文件:EXEC DBMS_LOGMNR_D.BUILD(OPTIONS=> DBMS_LOGMNR_D.STORE_IN_REDO_LOGS);

2)添加 Redo Logs:EXEC DBMS_LOGMNR.ADD_LOGFILE(LOGFILENAME => '/oracle/logs/log2.f', OPTIONS=> DBMS_LOGMNR.ADDFILE);

3)开启 LogMiner:EXEC DBMS_LOGMNR.START_LOGMNR(startScn => '..', endScn => '..');

4)查询 V$LOGMNR_CONTENTS 视图获取 Redo 信息并处理

5)查询当前的活跃 Online Redo Log 是否有变化,如果无变化则跳转到第 3)步,如果有变化则关闭 LogMiner(EXEC SYS.DBMS_LOGMNR.END_LOGMNR)并跳转回第 1)步。

二、ADG 备库抽数原理

为了介绍从 ADG 备库 CDC 方案,在此先对Oracle对Redo Logs的管理和ADG的原理简介如下:

Online Redo Logs 记录 Oracle 中的一切变化信息,由至少 2 个(组)预分配的文件构成。LGWR(LoG WriteR)进程循环写入这些文件,每次仅往其中一个(组)Online Redo Log 写入变化信息,每一条信息对应一个单调递增的 SCN(System Change Number)。当该 Online Redo Log 写满,或者被显式调用 ALTER SYSTEM SWITCH LOGFILE(强制切换 Online Redo Log),或者写入时间超过 ARCHIVE_LAG_TARGET 参数值,则 LGWR 切换到下一个(组)Online Redo Log 写入变化信息。同时,上一个(组)Online Redo Log 会由归档进程 ARCn 将其内容写入新的 Archived Redo Log。

图源:Oracle 官方文档

图源:Oracle 官方文档

ADG 为了保证同步效率,选择了仅仅同步恢复 Oracle 事务所需的信息(即 Redo 记录)。为了免受磁盘 I/O 状态影响,Redo 记录在尚未落地至主库 Online Redo Log 文件时,即直接从内存中实时传送至备库,备库收到这些 Redo 信息后便将其实际应用于物理库中。备库中 RFS(Remote File Server)进程负责将收到的 Redo 信息同步写入 Standby Redo Logs。当主库发生 Online Redo Log 切换时,RFS 进程同步进行 Standby Redo Log 切换,将新的 Redo 信息写入下一个(组)Standby Redo Log,并由归档进程 ARCn 将上一个(组)Standby Redo Log 内容写入 Archived Redo Log。

图源:《Oracle Active>

由上可知:

三、ADG 备库抽数实践

根据上述原理,我们可对 LogMiner 抽数步骤进行调整如下:

1. 备库新建单独 Oracle 实例

使用 DBCA 工具(推荐)或者 CREATE top="3026">2. 创建平面(Flat)字典文件

在>EXECUTE DBMS_LOGMNR_D.BUILD(DICTIONARY_LOCATION=>'/home/oracle/logmnr', DICTIONARY_FILENAME=>'dictionary.ora',OPTIONS=> DBMS_LOGMNR_D.STORE_IN_FLAT_FILE);

注意自 Oracle 12c 起往后的版本,dictionary_location 需替换为目录对象,具体参见官方文档说明。

3. 筛选 Standby Redo Logs 和 Archived Redo Logs。

在 target="_blank">V$STANDBY_LOG的字段说明理解下述逻辑):

SELECT MIN(F.MEMBER) AS FILE_NAME, L.LAST_CHANGE# AS LAST_CHANGE, F.GROUP#, L.FIRST_CHANGE# AS FIRST_CHANGE, 'CURRENT' as STATUS FROM V$STANDBY_LOG L, V$LOGFILE F WHERE F.GROUP# = L.GROUP# AND (L.ARCHIVED='NO' or L.STATUS='ACTIVE') GROUP BY F.GROUP#, L.LAST_CHANGE#, L.FIRST_CHANGE#, L.STATUS ORDER BY 3;

4. DBMS_LOGMNR 添加 Redo Logs

在 logmnr 实例上,使用 DBMS_LOGMNR.ADD_LOGFILE 添加第 3)步所筛选出的 Redo Logs。(注意:我们第 1 步中选择在 ADG 备库所在的物理机上创建新的 Oracle 实例,就是为了能使该实例能正常读取这些 Redo Logs)

5. DBMS_LOGMNR 开启 LogMiner

在 logmnr 实例上,使用 DBMS_LOGMNR.START_LOGMNR 开启 LogMiner,并查询 V$LOGMNR_CONTENTS 视图获取 Redo 信息并处理。

6. 判断跳转步骤

在 top="4067">四、ADG 备库抽数相关问题

1. 为什么需要新建单独的 Oracle 实例 logmnr 进行抽数?

笔者尝试过直接在>为此,笔者按照如下步骤开启 Oracle 10046 Trace(SQL_TRACE):

alter session set tracefile_identifier='10046';

alter session set timed_statistics = true;

alter session set statistics_level=all;

alter session set max_dump_file_size = unlimited;

alter session set events '10046 trace name context forever,level 12';

并重新将所有步骤在),10046 文件中会持续输出以下内容:

*** 2022-06-27 15:06:04.528

WAIT #139654617260408: nam='enq: WL - contention' ela= 30000095 name|mode=1464598531 log # / thread id #=2001421497 sequence #=972043434 obj#=-1 tim=1656313564528578

WAIT #139654617260408: nam='ARCH wait for archivelog lock' ela= 140 p1=0 p2=0 p3=0 obj#=-1 tim=1656313564528712

*** 2022-06-27 15:06:34.537

WAIT #139654617260408: nam='enq: WL - contention' ela= 30008500 name|mode=1464598531 log # / thread id #=2001421497 sequence #=972043434 obj#=-1 tim=1656313594537263

WAIT #139654617260408: nam='ARCH wait for archivelog lock' ela= 149 p1=0 p2=0 p3=0 obj#=-1 tim=1656313594537402

由于 ADG 备库中的相关进程对这些 Redo Logs 文件加锁处理,使得 LogMiner 进程无法同时对其进行操作。但由于这种加锁仅限于这些进程所在的 Oracle 实例(即 top="5051">2. 字典文件是做什么用的?为什么需要采用 Flat File?

字典文件用于将表名、字段名等元信息的物理表示转换为逻辑表示。如果在 DBMS_LOGMNR.START_LOGMNR 步骤中不指定字典文件,则得到的 Redo SQL 将会类似于下面这样:

update "UNKNOWN"."OBJ# 1456235" set "COL 64" = HEXTORAW('3330383339') where "COL 1" = HEXTORAW('313330323230313038333835383436') and ...... and ROWID = 'AAFjhrAA6AANcSoAAF';

Oracle 提供了三种访问字典文件的方式:直接使用 online catalog,将字典文件导出至 Redo Log,将字典文件导出至 Flat File(即单独的字典文件)。由于我们选择在单独的 Oracle 实例中抽数,无法获取 online catalog,故只能选择第二种或第三种方式。尽管 Oracle 官方不推荐第三种方式(所以 Debezium 中并未内置支持),但该方式的效率实测比第二种高出不少(经笔者实测,采用第二种方法会将数据延迟由几秒钟放大至 3-5 分钟),且在我们的生产实践中并未观察到 Oracle 官方声称的不一致现象(因为生产系统鲜少有对存量表和字段的删除、插入等 DDL 操作),故推荐将字典文件存储于 Flat File。

五、提升 Debezium 框架抽数速度

按照上文的原理和基本步骤,笔者对 Debezium 框架进行了修改,成功实现了对 Oracle ADG 备库的数据抽取。但又出现了新的问题:

形成该问题的原因有二:

l LogMiner 的设计初衷为对 Redo logs 的诊断工具而非专门的 CDC 工具,故对抽取效率并未做优化。

l Debezium 框架仅考虑到功能的正确实现,并未考虑到对抽取效率的调优。

据此,笔者通过排查程序运行堵点,采用分发器模式对源代码进行重构,并对部分逻辑进行优化,最终使大事务时的解析速度从原先的 60 条/秒提升至 3000 条/秒以上。

1. Debezium 解析 V$LOGMNR_CONTENTS 的过程

V$LOGMNR_CONTENTS 中记录了按照 SCN 顺序排列的变化信息。需要注意的是,对于 DML 类操作,变化信息是基于行的(譬如一个 UPDATE 语句更新了 100 条记录,那么 LGWR 进程会实际上往 Redo Log 中写入 100 条 UPDATE 语句,每条语句仅更新一行,V$LOGMNR_CONTENTS 中也会实际上存储 100 条 DML 记录)。

Debezium 在调用完毕 DBMS_LOGMNR.START_LOGMNR 后,依次进行如下操作:(详见 io.debezium.connector.oracle.logminer 包下 LogMinerStreamingChangeEventSource 和 LogMinerQueryResultProcessor 类):

1)读取 V$LOGMNR_CONTENTS 并将结果记于 ResultSet。

2)若 ResultSet 中仍有内容,读取新的记录并判断类型。

3)如果是 DML 操作,那么将操作内容暂存于 Transaction 中;如果是 COMMIT 操作,那么将 Transaction 暂存内容组装为 Kafka Connect 支持的格式并发送;如果是 ROLLBACK 操作,那么将 Transaction 暂存内容丢弃。

4)跳回第 2)步继续执行,直到 ResultSet 全部内容读取结束。

需要注意的是,在 Debezium 中,以上过程全部为主线程串行执行。这种方式具有如下问题:

2. 架构优化

根据上节分析,笔者采用分发器模式,对 Debezium 整体框架进行了优化,下图为架构设计:

对该架构说明如下:

1)Oracle 对 LogMiner 做了非常严格的资源限制,每个 Logminer 进程资源消耗都不能超过 1 个 CPU 核心。因而当我们开启多个 LogMiner 进程同时挖掘时,可以观察到挖掘速度会有明显提升。但为了保证开启多进程后不影响 Redo 信息的处理顺序,我们需要维护一个 SCN 队列,代表每个 START_LOGMNR 操作的起始 SCN。Redo 信息分发器需要按照该顺序进行 Redo 信息分发。

每个 LogMiner 进程需要将查询 V$LOGMNR_CONTENTS 结果进行存储的原因,一是查询该视图的时间开销较大,不能直接让 Redo 信息分发器查询;二是暂存后 LogMiner 进程可以继续向后挖掘,不需要等待 Redo 信息分发器的后续处理。

2)Redo 信息分发器按照 SCN 顺序,对每一条 Redo 信息,拿到 Transaction ID 后直接将其派发到 Transaction 处理线程池的对应 Transaction 线程。特别地,如果这条 Redo 信息操作类型为 COMMIT,那么将对应 Transaction 线程的 commitSCN 置为该条信息的 SCN,并且将该 SCN 加入 CommitSCN 队列;如果这条 Redo 信息操作类型为 ROLLBACK,那么将对应 Transaction 线程的 rollbackSignal 置为 1。

每个 Transaction 线程在处理信息时,如果 rollbackSignal 为 1,那么直接结束处理过程;如果 CommitSCN 不为空且等于 CommitSCN 队头元素,则将当前解析完毕的信息直接发送至 Kafka Connect,否则通过对象流方式暂存,稍后再实际发送。

3. 需要关注的问题(一)——通过对象流暂存解析后 Redo 信息

Debezium 截至目前最新版本,均在内存中暂存操作类型为 DML 的 Redo 解析信息(具体可见 Io.debezium.connector.oracle.logminer.TransactionalBuffer 类中的内部类 Transaction,其使用 List<DmlEvent>直接存储解析后 DML 信息)。对于尚未提交的大事务,直接将 DML 信息存储于内存中极易发生内存溢出(笔者实测当一个 update 语句更新 30 万条记录时即可让开辟 2G 内存的 Debezium 进程因内存溢出而崩溃)。

解决方式是使用对象流方式,将待提交信息序列化存储于文件中。需要注意:

4. 需要关注的问题(二)——修正 Debezium 中过于繁冗的解析

Io.debezium.relational.RelationalChangeRecordEmitter 类中的 emitCreateRecord/emitReadRecord/emitUpdateRecord/emitDeleteRecord 中,均执行了 tableSchema.valueFromColumnData 方法。在笔者的排查中,该方法是除 LogMiner 本身执行效率外,实际发送至 Kafka 的速度过慢的最大性能瓶颈。

该方法所做的是将 LogMiner 解析后的记录转换为 Kafka Connect 能够识别的记录。通过阅读 LogMiner 的解析器 io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser 源码可知,该解析器事实上将所有字段都转换成了 String 类型。由于 String 类型一定可被 Kafka Connect 正确识别,笔者选择了在 RelationalChangeRecordEmitter 的子类 io.debezium.connector.oracle.logminer.LogMinerChangeRecordEmitter 中,对相关的方法进行重写,省去调用 tableSchema.valueFromColumnData 的过程,直接将已解析的字段传递给 Kafka Connect。

六、结束语

本文阐述了针对 Debezium 做适应性优化和改造的方法,使其能够对主库无影响的情况下针对 Oracle 备库进行抽数,并将抽取速度提升至生产实际可用的程度。对于一般应用系统可作为替代 Oracle OGG 的“免费”且“好用”的方案选择。

作者介绍:

丁杨,现任职于中国农业银行研发中心,在大数据领域具有多年研发经验,现主要负责流批一体化计算在银行业务领域的应用研究。

声明:本文来自用户分享和网络收集,仅供学习与参考,测试请备份。