实时计算平台,是贝壳内部统一承接实时需求和管理实时任务的平台。目前支持了公司埋点、商机、交易、商业化等若干部门的业务,目前总共运行了 570 多个实时计算任务,日志流量单日吞吐 1041 亿。
随着实时数仓等业务的推进,越来越多的任务接入到我们平台,使我们开发和运维实时任务的成本急剧升高。于是我们迫切希望能有一种快速开发和容易维护的方法,最终我们把希望的目光投向了 SQL。
我们都知道,SQL 作为一种通用的描述数据处理的语言,有着统一的标准和广泛的使用。它的学习成本低,开发效率高,行业内有着完整的生态,成熟的优化规则。
但是,SQL 其实更多的是在线上系统的、面向关系数据库的 OLTP 场景和离线数仓的 OLAP 场景中使用。那么能否将 SQL 应用到实时计算的场景来提升实时任务的开发效率呢?
带着这个问题,我们开始了 SQL on Streaming 的挑战。
1. SQL on Streaming
1.1 问题
我们都知道,SQL 是作用在有限数据集上的,查询引擎读取完整可用的数据集,生成固定大小的结果。
相比之下,数据流持续地提供新的记录,数据随着时间到达,因此流查询必须持续的处理到达的数据。所以问题变成了: 如何在流上定义一个能够使 SQL 作用在其上的视图 。
1.2 动态表
动态表概念的提出,用来支持对结果更新的查询,保留流和批输入的统一语义。动态表是一个不断更新的表,可以像常规的表一样进行查询。
但是,对动态表的查询是连续运行的,而且根据输入表的不断更新,查询出来的结果表也是不断更新的,因此结果表也是一个动态表。
因此,我们可以认为: 流和动态表其实是同一种东西,只是在不同角度看待问题而产生的不同概念罢了,二者其实是可以相互转换的 。我们可以将流转换为动态表,也可以将动态表转换为流。
下图显示了可以在流上处理关系查询的概念模型:
那么我们如何在流上定义一个动态表?
有两种模式:追加和更新。
追加模式下 ,流中的每条新增记录都会插入到动态表中,不会对动态表中的历史数据做修改。如下图所示:
而 更新模式 下,流中的每条记录都会根据其唯一的健值或新增,或对历史的数据做更新操作。如下图所示:
1.3 持续查询
由于动态表随时间而变,我们必须定义查询动态表的意义。
假设我们有一个动态表 A,那么我们可以定义:
它在时间 t 时刻的快照为 A[t],这时就可以使用任意的 SQL 来对其进行查询,生成一个结果表。
定义查询的操作为 q,将查询作用到 A[t]上,我们表示为 q(A[t]),生成的结果表为 R[t]。
于是我们有:
R[t] = q(A[t])
复制代码
在动态表上的持续查询,表示为在 t 的每个时间点,都会执行一个 t 时刻在动态表 A 上的批量查询,而所生成的结果表也同样是一个持续更新的动态表。它可能会不断更新先前的行,也可能添加新行。
好了,理论部分说完了,那么我们如何将 SQL on Streaming 有效的落地呢?
在当时,平台上的所有任务都还是在使用 spark 技术栈,而且当时 Spark 新推出了 Structured Streaming 模块。因此,我们自然而然的将该模块作为了我们将 SQL on Streaming 落地的第一阶段的解决方案。
我们当时调研的版本是 2.3.1,Structured Streaming 刚刚推出不久,其虽然支持一些比较复杂的窗口聚合操作,但是限制也比较多。其仅在 api 层面上提供较为完善的语义,在 SQL 层面并没有提供较好的支持。
当时就我们平台上的任务来说,也没有在流维关联、窗口聚合这方面有比较强烈的需求,我们最初对于产品的定位也仅仅是做一些简单的 ETL 工作。
于是,最初的设计如下:
2.1 产品展示
下图是我们使用 SQL 1.0 配置出的一个对业务后端日志做简单 ETL 的任务。可以看到这里面有五个节点:
上游 source_kafka_1 是读取外部 kafka 中的后端日志;
中间有三个节点,是处理过程;
最后绿色的 sink_kafka_1 是将处理后的数据输出到下游 kafka 中的 topic。
下图是对输入端 kafka 的配置:
里面包含了要连接的 kafka 集群名称,订阅的 topic,读取数据的格式、字段和类型。对于 kafka 中的数据格式,我们提供了 4 种定义 schema 和解析的方式:
此外,在定义中间若干处理节点时,除要配置 SQL 语句外,还要指定上游依赖的节点:
最后,在配置 sink 组件时,我们要指定上游的数据依赖,输出到下游的 kafka 集群名称,topic,和输出数据格式,包括想要的字段名称,字段格式,提供了将 string 转换成 json object/array 的能力。
2.2 产品设计
下面是设计的组件类图:
目前平台上运行着的 SQL 1.0 的实时任务有 60 个,而且数量还在增加。
2.3 问题
2.3.1 checkpoint
我们知道,在定义 Structured Streaming 任务的时候必须指定 checkpoint 路径,而且 Structured Streaming 底层在做 checkpoint 的时候是同步的,也就是说会阻塞整体的处理进度。
当凌晨有大量离线任务运行导致 hdfs 繁忙的时候,会导致 checkpoint 阻塞,从而导致数据的积压和延迟,这对下游一些对实时性要求较高的系统来说是不可接受的。
2.3.2 offset
Structured Streaming 的消费进度是保存在 checkpoint 里的,而且每次任务重启都会直接从 checkpoint 里读取 offset,并且会在 driver 端屏蔽掉自动提交 offset 功能。
因此会有以下几个问题:
首先,我们对 SQL 2.0 的产品定位为提供一种以纯 SQL 的方式来定义流式作业。那么围绕这一宗旨,SQL 2.0 应该具备哪些能力呢?
在调研了多款开源产品之后,我们选择了 Flink 这款目前最火热的开源流处理框架。它对 SQL 的支持是目前所有实时处理框架中最好的。
所以,我们最终决定在 Flink-SQL 上做扩展,来实现我们的功能。
考虑到在引擎应该只提供能力,而和管理着元数据系统打通应该属于应用层的能力,因此我们初步设计是:希望能用一种 DDL 的方式来打通应用层和引擎层,同时用来定义外部数据源,而 Flink SQL 中暂还并未提供这种能力。
因此,我们基于 Antlr4 工具,对标准 SQL 中的 DDL 语法做了一些扩展,使其不但能够描述外部数据的格式,而且能够提供访问外部数据的一些连接信息,以及提取特定字段的能力。
示例建表语句如下:
--定义数据源表
create source table kafka_source_tb (
system_type string extraction '$.system_type',
client_os_type string extraction '$.client_os_type',
ucid string extraction '$.ucid',
ts bigint extraction '$.timestamp',
watermark key ts delay for 5s
type = 'kafka',
dataType = 'json',
brokers = 'off03-bigdata.mars.ljnode.com:9092',
topics = 'data-pipeline-common-dev',
groupId = 'test-group'
--定义维表
create dim table redis_dim_tb (
ucid string,
first_login_time string,
device_id string,
primary key ucid
type = 'redis',
server = '127.0.0.1',
port = '6379',
cache = 'all'
--定义输出表
create sink table console_sink_tb (
ucid string,
first_login_time string,
device_id string,
client_os_type string,
system_type string,
type = 'console',
dataType = 'json'
复制代码
其中:
3.2 维表关联
到目前为止,我们已经在 SQL 2.0 中提供了 1.0 中的全部能力,理论上已经能够提供给用户做简单的 ETL 使用了。
但是作为一个有理想的青年,我们并不满足于此。而且,为了后续提供构建实时数仓和实时指标的能力,简单的 ETL 是不够的,我们还要和很多的外部数据做关联。
为了满足这方面的需求,我们基于 calcite 的 SQL 解析功能和阿里为社区贡献的 Async I/O 功能,实现了维表关联的能力。
下面对这部分功能的设计做详细介绍。
3.2.1 设计
首先,思路就是先解析 SQL,如果在 join 语句后面出现了用户定义的维表,就触发维表关联的 SQL 改写逻辑,改写逻辑如下:
这样理解起来可能有些困难,我们在上面三个 DDL 的基础上举个例子,使用如下 DML 来定义一个作业:
insert into console_sink_tb (
ucid, first_login_time, device_id,
client_os_type, system_type)
ucid, first_login_time, device_id,
client_os_type, system_type
from kafka_source_tb a
join redis_dim_tb b
on a.ucid = b.ucid
复制代码
第一步 :由于本例中不存在嵌套查询,第一步可以跳过。
第二步 :合并流维关联节点,提取关联条件。
上述 SQL 经过 SQL 解析后,会生成如下语法树:
该语法树经过转换后,变为:
翻译成 SQL 后就是:
a_J_b.ucid,
a_J_b.first_login_time,
a_J_b.device_id,
a_J_b.client_os_type,
a_J_b.system_type
复制代码
第三步 :对流表的>
根据我们前面讲的概念,流和动态表是可以相互转换的,因此,我们可以先将流表转换成>
将关联后的数据流注册成中间表,表名即流维表节点合并的名字(a_J_b),这样就可以将转换后的 SQL 语句作用到该表上了。
第四步 :需要对原 SQL 语句的 where 子句或 group by 子句以 a.client_os_type 的方式中引用到字段的所属表名做替换,将 a 替换成 a_J_b。
因为当我们将流表关联维表合并为一个节点后,原来的 a 已经变成了一个不可识别的标识符了。
3.2.2 缓存优化
在生产环境中,绝大多数的流都是很快的,如果每条数据都要访问一次外部存储,那么,除了整体的性能会差以外,对外部存储也会造成很大压力,甚至会把外部系统压垮。
另一方面,考虑到其实很多维表变更并不频繁,而且数量也不会很大,那么,我们完全可以将维表的数据缓存在内存中,设定好过期策略做到同步更新。
对于开启了缓存的维表,内存中的缓存在任务刚启动时是空的,这会有一个预热的阶段。另外可对缓存设定两种过期策略,一种是缓存大小,一种是过期时间。
缓存大小策略是根据 LRU 算法进行数据淘汰,过期时间是根据最后读取数据的时间,当数据被缓存淘汰,程序会重新查询外存,并更新到缓存中,以此来实现缓存中数据的更新。
另一方面当数据量比较大时,单节点不足以将绝大多数维度数据缓存,可以预先根据与参与关联的维表主键对应的流表字段做预分区。
即根据某一个字段,保证该字段下同值的记录总被分配到同一个下游节点上,这样每个下游节点只缓存本节点能用到的数据,且能保证该部分的值域仅占总量的很小一部分。
3.2.3 关联条件支持表达式
可以看到上述方案虽然解决了流表和维表关联的问题,但是是有很多限制的。
比如说我们拿 hbase 作为维表来举例,就要求关联条件中必须包含 hbase 的 rowkey,而且 rowkey 必须作为关联条件的一部分,其值是必须能够直接从流表中取到的,也就是要求关联条件中只能是 EQUAL 类型的表达式,而且等号两边必须只能是对列的简单引用。
但是很多时候,hbase 的 rowkey 可能并不是一个单一含义的值,它也可能是一个业务逻辑上的联合主键,需要将多列拼接起来才能构成一个完整的 rowkey,这个时候关联条件的限制就成为了一个使用上的痛点。
用户当然可以通过定义临时视图的形式绕过在这个限制,但是这样又增加了用户的使用成本,所以我们也集中精力解决了这个问题。
我们看下面这个例子:
select a.col1, a.col2, b.coll
on a.col1 + a.col3 = b.col
复制代码
其实思路很简单,我们在关联维表前,需要将流表 a 转换成>
a 中仅包含 col1,col2,col3 字段,但是关联条件中需要 col1 + col3,所以我们将表 a 转换成临时视图__function_tmp_view__0,它表达的逻辑如下 SQL:
a.col1 + a.col3 AS __function_tmp_field__0
复制代码
这样关联条件就变成了:
__function_tmp_field__0 = b.col
相当于我们将表达式的值的计算过程移动到了我们自动添加的临时视图中,将计算过程提前了。这样我们在关联的时候就能够直接获取关联条件中所需要的值了。
3.3 维表的接入
实际上,业务方在有维表关联的需求时,很多都是希望能够直接关联业务库。
但是受限于集群访问业务库的安全和稳定性问题,我提供了一种额外的方式能够将用户的业务库数据通过 binlog 的方式来实时同步到 hbase 和 redis 中,而 SQL 引擎只需要去和 HBase 或 Redis 中的数据做关联就可以了。
目前我们已经支持了 HBase,Redis,Http,Phoenix 等方式的维表关联。
3.4 新的问题
以上就是我们在构建 Streaming SQL 2.0 的过程中遇到的一些问题和解决办法。
在 SQL 1.0 中存在的 checkpoint 和 offset 问题在 2.0 中被框架自身所消化,但也随之给我们带来了新的挑战:
这些问题都是需要我们去解决的。
4. 平台化建设
SQL 2.0 在 2019 年 8 月份在我们平台上上线,目前已经有 200 多个任务正在运行,覆盖了实时数仓,实时交易,商业化,租赁等业务部门;涉及了 ETL,维表关联,数据落地 clickhouse 等场景,而且任务数量增长很快。
4.1 产品展示
实时计算平台目前已经集成了数据源管理功能,用于管理实时领域的结构化数据元信息,用户可以预先配置和共享数据源,数据源可以自动拉取样例数据,生成 schema 信息。
在 SQL 2.0 任务配置过程中,用户可以选择已经存在的数据源,后端会自动生成自定义的 DDL。这样用户只需编写 DML 定义处理逻辑就可以了,无需编写复杂的 DDL,提升了用户的开发效率。最右侧是对任务中涉及的数据源的管理功能。
另外,我们也提供了 SQL 的语法校验功能,使用 antlr4 自定义语法文件解析和校验 DDL 语句,使用 calcite 解析和校验 DML 语句,能够做到在线验证 SQL 语法。
此外,我们也提供了查看执行计划,和在线 Debug 等功能,能够大大提升用户的开发效率和 debug 效率。
4.2 任务运维
目前贝壳的所有实时任务是统一托管在 Hermes 实时计算平台上的,而 SQL 2.0 在平台上只是一个特殊的场景任务。
整体的任务运维和监控报警由平台统一管控,能提供任务指标上报和监控,任务心跳监控,任务失败自动拉起等功能。
5. 挑战 && 规划
Streaming SQL 在实时计算领域有着广泛的应用场景,在数据分析,数据清洗,实时数仓和实时指标等方面都提供了非常重要的技术支撑。Streaming SQL 为下游提供了高效的开发手段和可靠、低延迟的实时处理能力。
在实践的过程中,我们经历了在 spark-structure-streaming 上构建的 SQL 1.0 版本,和之后基于 Flink-SQL 之上构建的 SQL 2.0 版本。这其中遇到过一些问题,包括 DDL 的设计和实现,维表关联,平台化和任务监控等。
最后,是我们对未来工作的一些思考和规划:
后续我们将为大家汇报贝壳 Streaming SQL 的新进展和新成果。敬请期待~
原文链接 :