拓展Spark源码实现高性能Join 第四范式OpenMLDB (拓展speed)

拓展Spark源码实现高性能Join 第四范式OpenMLDB (拓展speed)

通过对前面 JoinType 和三种 Join 物理节点的修改,用户就可以像其他内置 join type 一样,使用 SQL 或者 top="9487"> LastJoin 实现性能对比

那么既然实现的新的 Join 算法,我们就对比前面两种方案的性能吧,前面直接基于最新的 Spark 3.0 开源版,不修改 Spark 优化器的情况下对于小数据会使用 broadcast join 进行性能优化,后者直接使用修改 Spark 源码编译后的版本,在小数据下 Spark 也会优化成 broadcast join 实现。

首先是测试 join condiction 能拼接多行的情况,对于 LeftOuterJoin 由于能拼接多行,因此第一个阶段使用 LeftOuterJoin 输出的表会大很多,第二阶段 dropDuplication 也会更耗时,而 LastJoin 因为在 shuffle 时拼接到单行就返回了,因此不会因为拼接多行导致性能下降。

从结果上看性能差异也很明显,由于右表数据量都比较小,因此这三组数据 Spark 都会优化成 broadcast join 的实现,由于 LeftOuterJoin 会拼接多行,因此性能就比新的 LastJoin 慢很多,当数据量增大时 LeftOuterJoin 拼接的结果表数据量更加爆炸,性能成指数级下降,与 LastJoin 有数十倍到数百倍的差异,最后还可能因为 OOM 导致失败,而 LastJoin 不会因为数据量增大有明显的性能下降。

右表能拼接多行对 LeftOuterJoin + dropDupilicated 方案多少有些不公平,因此我们新增一个测试场景,拼接时保证左表只可能与右表的一行拼接成功,这样无论是 LeftOuterJoin 还是 LastJoin 结果都是一模一样的,这种场景下性能对比更有意义。

从结果上看性能差异已经没有那么明显了,但 LastJoin 还是会比前者方案快接近一倍,前面两组右表数据量比较小被 Spark 优化成 broadcast join 实现,最后一组没有优化会使用 sorge merge join 实现。从 BroadcastHashJoin 和 SortMergeJoin 最终生成的代码可以看到,如果右表只有一行拼接成功的话,LeftOuterJoin 和 LastJoin 的实现逻辑基本是一模一样的,那么性能差异主要在于前者方案还需要进行一次 dropDuplicated 计算,这个 stage 虽然计算复杂度不高但在小数据规模下耗时占比还是比较大,无论是哪种测试方案在这种特殊的拼表场景下修改 Spark 源码还是性能最优的实现方案。

技术总结

最后简单总结下,OpenMLDB 项目通过理解和修改 Spark 源码,可以根据业务场景来实现新的拼表算法逻辑,从性能上看比使用原生 Spark 接口实现性能可以有巨大的提升。Spark 源码涉及 SQL 语法解析、Catalyst 逻辑计划优化、JIT 代码动态编译等,拥有这些基础后可以对 Spark 功能和性能进行更底层的拓展。

声明:本文来自用户分享和网络收集,仅供学习与参考,测试请备份。