Spark在GrowingIO数据无埋点全量采集场景下的实践 (spark在windows下的环境搭建)

Spark在GrowingIO数据无埋点全量采集场景下的实践 (spark在windows下的环境搭建)

大家好,我是来自 GrowingIO 的数据工程师付旗,今天跟大家分享的是我们 GrowingIO 在使用 Spark 中的经验,遇见的一些问题,以及我们修复的方法。在来之前,昨天晚上我在跟我们组同事讨论的时候,我说我发给他们简要写的我是 GrowingIO 的大数据工程师,我们组的几个工程师都非常的不屑,说现在大数据已经烂大街了,所以他们对外自称数据工程师,不叫大数据工程师。

GrowingIO 业务功能背后的考验

事实上,我们是一个刚成立一年的公司,今年 5 月份刚过了一周岁的生日,是一个标准的创业公司,但是到目前为止我们每天处理的数据量已经超过了几百亿条,所以在我看来是一个标准的大数据公司。

每天处理这么多数据的话,我们当然会遇到很多问题,今天主要的内容就是首先介绍我们公司,我们的业务模型和我们遇到的一些问题。第二部分我们会根据这些问题讲一下我们是如何思考和选择搭建我们自己的大数据平台。最后一部分,当我们平台搭建完之后,会遇到一些问题,我们会想办法去优化在 Spark 使用过程中的这些问题。

第一部分就是我们公司介绍,我们是去年刚成立的一家公司,是做数据分析的,我们跟之前的数据公司不一样就是我们提供的是全量采集的数据,不需要埋点,只要接入我们 SDK 之后,你就立刻能够获得你想要的数据分析的结果。我们现在提供 iOS、安卓、Web、H5 的 SDK。

我们采用的是全量采集的方案,所以不需要提前埋点,就是说所有的浏览、访问、点击所有的行为都会被采集到,这个数据量是非常大的,而且我们提供按需筛选数据的功能,如果你想要知道某一个按键的点击量,我们会立刻帮你筛选出来。还有我们支持随时回溯任意一个事件。我们也支持不同的图表功能、不同的维度、相互的拖拽和组合,我们提供 40 多种不同的维度。

提供这么多功能的话对我们平台的压力是非常大的。首先就是我们的数据处理压力非常大,我们每天要处理好几百亿条数据,我们的数据量变化也非常快,有些应用可能周五是高峰期,有些应用是周末的高峰期,所以我们的数据有一个波峰和波谷的概念。我们支持任意元素的实时查询,以及支持多维度的组合,所以需要整个数据平台有很强的伸缩性。我们还要支持海量数据的高速索引,不能让用户等待时间过长,同时我们要支持多维数据的实时查询。

GrowingIO 数据平台搭建

为了支持这么多功能,我们怎么搭建我们的数据平台的呢?

先看一下我们数据处理的主要步骤,首先是我们 SDK 采集数据,采集数据之后,首先把它扔到我们的消息队列里做一个基础的持久化,之后我们会有两部分,一部分是实时统计,一部分是离线统计,这两部分统计完之后会把统计结果存下来,然后提供给我们的查询服务,最后是我们外部展示界面。我们的数据平台主要基于中间的四个绿色的部分。

关于要求,对 消息队列 来说肯定是吞吐量一定要大,要非常好的扩展性,如果有一个消息的波峰的话要随时能够扩展,因为所有的东西都是分布式的,所以要保证节点故障不会影响我们正常的业务。

我们的 实时计算 目前采用的是分钟级别的实时,没有精确到秒级, 离线计算 需要计算速度非常快,这两部分我们当初在考虑的时候就选用了 Spark,因为 Spark 本身既支持实时,又支持离线,而且相对于其他的实时的方案来说,像 Flink 或者是 Storm 和 Samza 来说,我们不需要到秒级的这种实时,我们需要的是吞吐量,所以我们选择 Spark。实时部分用的是 Spark streaming,离线部分用的是 Spark offline 的方案。

查询方案 因为我们要支持多个维度的组合排序,所以我们希望支持 sql,这样的话各种组合排序就可以转化成 sql 的 group 和 order 操作。

消息队列 – Kafka

消息队列我们选择的是 Kafka,因为在我们看来,Kafka 目前是最 成熟 的分布式消息队列方案,而且它的 性能 、扩展性也非常好,而且支持容错方案,你可以通过设置冗余来保证数据的完整性。 Kafka 目前得到了所有主流流式计算 框架的支持 ,像 Spark, Flink, Storm, Samza 等等;另外一个就是我们公司的几个创始人都来自于 LinkedIn,他们之前在 LinkedIn 的时候就已经用过 Kafka,对 Kafka 非常熟,所以我们选择了 Kafka。

消息时序 – HBase

但选定 Kafka 之后我们发现了一个问题就是消息时序的问题。首先我们的数据采集 程中,因为不同的用户网络带宽不一样, 数据可能是有延迟 的,晚到的消息反而可能更早发生,而且 Kafka 不同的 partition 之间是不保证时序的。

但是我们所有的离线统计程序都是需要按时间统计的,所以我们就需要一个支持时序的数据库帮我们把数据排好序,这里我们选了 HBase。我们用消息产生的时间加上我们生成消息的 ID 做成它唯一的 row key,进行排序和索引。

SQL On HBase – Apache Phoenix

对于 sql 的方案来说,我们选择的是 Phoenix。选 Phoenix 是因为我们考虑了目前几个 SQL On HBase 的方案,我们发现 Phoenix 的效率非常好,是因为它充分的利用了 HBase coprocessor 的特性,在 server 端进行了大量的计算,所以大量 减轻了 client 的数据压力还有计算压力

还有就是它 支持 HBase 的 Column Family 概念 ,比如说我们要支持 40 个纬度的时候我们会有一张大宽表,如果我们把所有的列都设置一个列族的话,在查询任意一个列的时候都需要把 40 列的数据都读出来,这样是得不偿失的,所以 Phoenix 支持 Column Family 的话,我们就可以把不同的列根据它们的相关性分成几个列族,查询的时候可能只会命中一个到两个列族,这样大大减少了读取量。

Phoenix 还 支持 Spark 的 alt="">

因为 Spark 最近非常的火,所以它的社区资源非常的多,基本上所有主流的框架,像我们常见的 Phoenix,Cassandra, MongoDB 都有 Spark alt="">

首先我们最下面是三个 SDK,JS、安卓和 iOS,采集完数据之后会发到我们的负载均衡器,我们的负载均衡器用的是 AWS,它会自动把我们这些数据发到我们的 server 端,server 在收集完数据之后会进行一个初步的清洗,把那些不规律的数据给清洗掉,然后再把那些数据发到 Kafka 里,后面就进入到我们的实时和离线过程。

最终我们的数据会统计到 HBase 里面,对外暴露的是一个 sql 的接口,可以通过各种 sql 的组合去查询所需要的统计数据。目前我们用的主要版本,Spark 用的还是 1.5.1,我们自己根据我们自己的业务需求打了一些定制的 patch,Hadoop 用的还是 2.5.2,HBase 是 0.98,Phoenix 是 4.7.0,我们修复了一些小的 bug,以及加了一些自己的特性,打了自己的 patch。

Spark 实践与优化

第三部分讲一下我们在使用这个数据平台的过程中的一些实践和优化的地方,因为搭建完平台之后这种东西不是一蹴而就的,分布式的方案会有很多的问题,现在开源的东西进化的很快,新的东西出来之后可能会有很多 bug,包括我们当时在用 alt="">

从 Kafka 之后,分成两块,一块是秒级的 Spark Streaming,大概在 10 秒 -20 秒的一个 batch,然后把这些数据进行初步的清洗,把一些重要的数据存到 HBase 里面,然后提交 Spark 任务做计算。

还有一部分会把全量的数据存到 HDFS 里,但是存 HDFS 会有一个问题,就是如果你 batch 时间过短的话会产生大量的碎文件,我们的想法就是把 Spark Streaming 的 batch 时间设长,10 分钟一个 batch,这样的话会大量减少我们写入 HDFS 的文件数量。

同时在 Spark Streaming 里面,我们借助了 Redis 和 Postgres 的一些存储的方案。比如说在 Redis 里,我们会进行一些简单的计数,或者存一些相应关联的信息。然后在 Postgres 里,我们存了大量用户自定义的规则和属性,Spark Streaming 处理过程中会去读 Redis 或者是关联对应的 Postgres。

我们离线任务使用我们自己定制的 Spark Server,我们写好对应的业务逻辑,然后提交任务给 Spark Server。Spark Server 执行这些任务,从 HBase 和 Hive 表里通过>优先使用 Spark SQL &>Spark Server 设计

下面讲一下 Spark Server 在设计过程中的一些思考。我们的 Spark Server 设计的首要的目标就是共享一个 Spark Context,从而 共享 Spark 资源

之所以这样是因为如果用 Spark Submit 来提交任务的话,我们会遇到一个问题,就是任务如果执行的时间比较长,它可能会有一两个 task 执行的非常慢,导致整个 资源无法释放 。比如你申请了 100 核,但你可能 98 个任务都执行完了,剩下 2 个任务卡在那个地方会导致 100 个核都无法释放。

还有一个就是我们现在会跑很多临时的小任务 ,每个小任务时间可能只有十几秒,如果单独为这些 小任务去申请资源 的话,可能申请资源都得 30 秒或者 1 分钟。

还有一个就是 没有法正确的预估我们所需要的资源 ,因为我们不同的任务有不同的任务模型,读的数据量是有差距的,所以计算的成本也是不一样的。我们数据量是有波峰波谷的,所以更加难以预估到某一个时间点,每个任务需要多少个核。

所以在这个情况下,我们共享一个 Spark Context 就可以给这个 Spark 申请更多的资源,这样其他的人来共享这个资源。

同时,在使用同一个 Hive Context 的时候会遇到一个问题,像长时间运行任务和一些比较快速的任务,它可能所需要的配置是不一样的,比如说你要读一个 100G 的任务,可能它需要的 shuffle 数量是 1000,但比如说只 1G 的数据,可能 10 个 shuffle 就够了,所以我们支持使用不同的 Hive Context,给不同的 Context 设置不同的参数,这样的话你在运行的时候可以自己指定你需要哪个 Hive Context。

同时我们使用 Fair Scheduler 能够保证不同的任务同时共享资源。

我们给任务设定了一个优先级,每个任务提交的时候指定自己的优先级,我们根据任务的优先级与它创建的时间来选择执行顺序,我们同样支持非常重要的任务临时插队的方案,就是说在创建完之后立刻执行。

为了支持上面这个 优先级和任务插队 的这个方案,我们使用了 Postgres 作为任务的持久化。

它有很多好处,比如说 支持重跑 ,如果任务有问题,最终跑下来的数字是不对的,就可以在 Postgres 里把这个任务重新提交一下。

同时在任务执行的时候,如果遇到了 Exception 我们会把对应的消息和堆栈信息存到 Postgres 表里,这样就可以知道每个任务 失败的原因 ,什么时候失败的,在哪一行失败的。

因为是放在 Postgres 里,所以我们支持 手动修改优先级 ,这样的话遇到一些紧急的情况我们可以优先启动一些比较紧急的任务。我们现在会定时的统计一下我们每个任务创建时间、被调起时间,以及最后完成的时间,来找到每个任务执行平均时间以及不同任务的瓶颈,从而找到优化点。

我们的任务逻辑是同样一个任务,可能是按时间来跑,比如说一个小时统计一次,这样的话同一个任务是有时间概念的。为了支持这种时间概念,我们就需要 支持任务以单例模式运行 ,因为不同的任务不同时间段它的缓存表还有输出路径会相互影响的,以及任务之间可能是有依赖的,比如说九点到十点,可能依赖于八点到九点的结果,这样的话就需要这个任务以单例形式运行,而不是并行的运行。

为了避免任务之间的相互影响,我们为每个 Executor 配一个核,因为我们之前遇到一个问题, Spark 支持每个 Executor 配多核,但是问题是,可能在这一个 Executor 上同时跑了不同的几个任务,这几种之间是相互影响的,如果这个 Executor 在跑一个很长的任务,它跑了一半的时候突然这边加了一个小任务,这个小任务可能会极大的影响、拖慢速度。第二它可能会把整个 Executor 卡死,这种情况在 Spark 中还是很常见的,比如数据有倾斜,内存溢出等等,可能会把整个 Executor 卡死,这就会导致这个 Executor 所有跑的任务都会挂掉,会影响那种长时间运行任务的效率。所以我们目前为每一个 Executor 只配了一个核。

我们还使用了 Spark REST API 来 监控任务 跑的时间,自动杀掉时间过长的任务,这种时间过长的任务有很多原因,比如说是数据倾斜,这是 Spark 任务中一个很常见的情况,还有可能就是代码逻辑有问题,或者说数据量有激增,这些都需要在杀掉之后去分析原因来进行优化。

遇到的问题

我们在使用 Spark 的过程中还遇到一些问题。这些问题可能有一些人也遇到过,我们可以分享一下我们在处理这些问题过程中的一些想法。

比如说我们遇到的 Kafka 重复消费的问题,对于我们来说 Kafka 的消费的目标不是用 Exactly Once。当我们遇到一些问题一些统计逻辑或者一些数据晚到的问题的时候,我们需要从某一个时间点重新回追所有的数据,这样的话 Kafka 就要支持重复消费。

第二个就是当业务逻辑越来越多的时候, Spark Streaming 就很难扩展。

第三、同时运行任务过多的时候,Spark Server 的任务调度就会变慢,因为目前我们一天要跑一万多个任务,Spark Server 同时运行的任务可能在 100-200,当同时运行任务过多的时候,Spark Server 就会变慢。

第四、遇到一个小 bug,Spark 在写 Hive 表的时候它会先把数据写到一个临时目录里,通过挪文件的方式把所有的数据挪回 Hive 表里,但是它会留下大量的临时目录没有清。

还有一个就是 Phoenix 在遇到多表 union 的时候速度变慢,以及逐条写 HBase 的方案会比较慢。

最后就是 count distinct 的数量太大,count distinct 是 sql 的语法,它的问题是当数量过多的时候,就会在 client 端造成很大的压力,比如说对于我们来说,就是 count distinct user,user 就是一个用户,我们需要知道昨天有 10 万人,今天有 10 万人,这两个 10 万人之间可能有 5 万人重复,我们要知道这两个 10 万人加起来一共是多少 distinct 的数量,所以就需要把所有的 20 万人都拿到 client 端进行过滤。假如 10 万级别的话可能还没有问题,但当我们的用户量达到百万、千万级别的时候就会大大的影响查询的性能。

问题 1:Kafka 消息重复消费

首先 Kafka 重复消费的问题。

我们目前使用的是 Spark 的 Kafka Direct API,Direct API 不维护 offset,所以我们选择在 zookeeper 自己维护 ,这样的话如果想重复消费的时候我们只需要找到对应时间点的 offset,然后从这个 offset 重跑就可以了。

目前有很多其他的公司,会定期的维护一个 offset 到时间的映射,然后当他们需要找到某个时间的时候他到这个映射表里去反查。我们的消息虽然不是严格时序的,但它都是接近时序的,可能两个消息之间最多差一二十秒,所以我们可能通过 近似二分法 来从 Kafka 里找到对应的时间点的 offset。

同时我们使用 Kafka 的 partition 加 offset 作为这个消息的 唯一 ID ,这样的话在生成一个消息的时候,我们的 ID 就不会重复,即使你重跑很多次,HBase 会自动把它去重。

问题 2:Streaming 业务逻辑较多,难以维护

第二就是我们的业务逻辑过多,我们把 DStream 全部映射成 alt="">

第二个 Operation 比如说我们需要知道北京的用户是多少,就在这个 Pipeline 里面加一个 Filter,城市等于北京。之后我们可能需要把分析出来的数据给存下来,比如说我们要存到北京的这张用户表里,我们同时要存到中国这张用户表里,所以它的输出可能是多个输出,为了解决这种多个输出问题我们会自动在 Pipeline 分支的地方加上 persist 和 unpersist 的过程,减少重复计算。

同时我们支持把 Pipeline 定义成一个可插拔的配置文件,现在要开发一个新的业务逻辑的话首先就要重写一个 Operation 类,定义你的操作类型,输入和输出,大部分可能是里面写了 udf,之后要把 Opeartion 写到你认为正确的流程定义里面,这样的话 Spark Streaming 就会从正确的流程开始,并且继承之前的逻辑。

问题 3:同时运行任务过多,任务调度变慢

同时运行过多的情况,我们第一个反应就是 增加 driver 内存 ,使用更好性能的机器,但发现这个会有一点点提升,但是提升不大,后来我们发现 Spark 在在查找 cache>问题 4:Spark 写 Hive 表时,临时目录未清理

Spark 写 Hive 表的时候临时目录没有清理,我们就修改了一下,加了两行代码,在结束之后把这种临时目录直接删掉就可以了,这是一个很小的修改。

问题 5:Phoenix 查询遇到多表 union 时速度过慢

还有就是 Phoenix union 的问题,比如说我要查三月份、四月份、五月份数据的时候,我其实就需要这三个 union,但 Phoenix 在 union 的时候有一个问题,就是说它是串行执行的,它需要先去查三月份的数据,结束之后查四月份的数据,结束之后再查五月份的数据。

这样的话就很慢,所以我们改成并行,这样它会同时执行三四五月的数据,然后把所有的数据并行的拿回来。这样会大大加快我们 union 的速度。

问题 6:逐条写 HBase 太慢

下一个问题就是写 HBase 太慢的问题。因为我们一天要写几千万行、上亿上数据,HBase 带宽有很大压力,另外要重复的进行 compact。HBase 提供了 Bulkload 的方法,但它提供的是 MapReduce 的版本,Bulkload 的方法就是把所需要的数据提前整理好,整理成 HFile,然后一次性通过挪文件的方式挪到 HBase 里面,这样就大大减轻了 HBase 的压力。

我们基于 MapReduce 版的方法写了 Spark 版的。归功于 Spark 的优化能力,我们发现 Spark 版的会比 MapReduce 要快五倍,大大减少了 HBase 的压力。

同时我们会把那些需要重复计算的数据临时存在 HDFS 上,比如说今天统计了一点、两点、三点、四点的数据,但可能晚上统计的时候需要一点到二十四点的数据全部统计下来,所以我们不需要反复的去读 HBase,因为 HBase 单条查是很快的,但如果你要查大量的数据的话,压力非常大,所以我们就会把这些需要重复利用的数据存入临时表,这样的话可以直接从临时表里出所需要的统计数据。

问题 7:count distinct 数量太大

还有 count distinct 过多的问题,有两种方案。

一种方案是 BitMap 的方式 。我们可以把每个人进行一个编号,分配一个唯一的 ID,把所有出现的 ID 存成一个 BitMap,这样的话就可以把一个人压成一个 bit。这样 count distinct 操作就能转换成对应的 BitMap 的操作。

比如说昨天和今天的人,其实就是两个 BitMap 或操作,再比如查昨天并且是北京的用户,那就需要把昨天的用户拿出来,把北京的用户拿出来,做一下 BitMap 与操作,这样就出了昨天北京的用户。同时,当出现人特别稀疏的时候,比如说很多人昨天来了,今天没来,今天出现了大量的空位。因为 ID 分配好了,所以我们采用了压缩的方法,就是把这些不需要的零或者一的位置全部压缩一下。

还有一种方案是 ****Hyperloglog 的方案 。它的优点就是说不需要编号,占用空间也小,像 Redis 的话也提供 Hyperloglog 的方法,一个 Hyperloglog 可能就需要十几 K 的存储空间,它的误差跟你所使用的寄存器的数量有关,Redis 中误差率小于 1%。

但它不是一个精确的方案,它还有一个缺点就是说它只能做并集不能做交集,你可以把昨天的用户和今天的用户拿出来,merge 一下就是这两天的总数,但你不能把昨天的用户和北京的用户做一下交集的操作。像 Spark 的话它在 2.0 的时候,它的 window 操作里面的也会启用这种方案。

我们目前离线计算会用 BitMap 的方法,它的优点是很准确,支持各种操作,我们实时计算用的是 Hyperloglog 方案,虽然会有误差,但实时计算误差小于 1% 的话其实是可以接受的。

谢谢大家,这就是我今天分享的内容。


感谢杜小芳对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家通过新浪微博(,@丁晓昀),微信(微信号:InfoQChina)关注我们。

声明:本文来自用户分享和网络收集,仅供学习与参考,测试请备份。