在 NUVIAD,我们 3 年多以来一直将Amazon Redshift用作我们的主要数据仓库解决方案。
我们存储了大量广告交易数据,我们的用户和合作伙伴对这些数据进行分析,以确定广告活动策略。大规模运行实时竞标 (RTB) 活动时,数据的新鲜度至关重要,因为这样一来,我们的用户就可以对广告效果的变化做出快速反应。我们选择 Amazon Redshift 是因为它的简单性、可扩展性、性能及几乎实时加载新数据的能力。
在过去三年里,我们的客户群大大增加,数据亦是如此。我们发现 Amazon Redshift 集群从 3 个节点增长到 65 个节点。为了平衡成本和分析性能,我们找到了一种方法,以更低的成本存储大量不太频繁分析的数据。然而,我们仍然希望能够立即为用户查询提供数据及满足他们对性能显著提升的期望。我们转向了Amazon Redshift Spectrum。
在此博文中,我们解释了将带 Redshift Spectrum 的 Amazon Redshift 扩展为现代数据仓库的原因。我将介绍我们的数据增长及平衡成本和性能的需求如何促使我们采用 Redshift Spectrum。我还将分享我们的环境中的关键性能指标,并讨论提供可扩展和快速环境的额外 AWS 服务,并提供数据供我们日益增长的用户群进行立即查询。
将 Amazon Redshift 作为基础
能够为客户和合作伙伴提供最新数据一直是我们平台的主要目标。我们发现其他解决方案可提供几个小时前的数据,但这对我们来说还不够好。我们坚持提供最新的数据。对我们而言,这意味着以频繁的微批次加载 Amazon Redshift 并使我们的客户直接查询 Amazon Redshift 将近乎实时的获得结果。
优势是显而易见的。我们的客户可以看到,他们的活动比其他解决方案执行的更快,对不断变化的媒体供应定价和可用性的反应更迅速。他们非常高兴。
然而,这种方法需要 Amazon Redshift 长期存储大量数据,且我们的数据大幅增长。在高峰期时,我们维护了一个运行 65 个 DC1.large 节点的集群。对 Amazon Redshift 集群的影响是显而易见的,而且我们发现 CPU 利用率增长到 90%。
我们为什么将 Amazon Redshift 扩展到 Redshift Spectrum
Redshift Spectrum 使我们能够使用强大的 Amazon Redshift 查询引擎对 Amazon S3 中存储的数据运行 SQL 查询,无需加载数据。利用 Redshift Spectrum,我们可以以我们希望的成本江南数据存储在我们想要的地方。我们可提供数据供用户在需要时进行分析,且数据性能符合用户预期。
无缝可扩展性、高性能和无限并发数
扩展 Redshift Spectrum 是一个简单的过程。首先,它可使我们将 Amazon S3 用作存储引擎并获得近乎无限的数据容量。
其次,如果我们需要更多计算能力,我们可以在数千个节点上利用 Redshift Spectrum 的分布式计算引擎来提供卓越的性能——这对于针对大量数据运行的复杂查询非常适合。
第三,所有的 Redshift Spectrum 集群均可访问相同的数据目录,这样一来,我们不必再担心数据迁移,可以毫不费力进行无缝扩展。
最后,由于 Redshift Spectrum 将查询分布在可能有数千个的节点上,它们不受其他查询的影响,从而能提供更稳定的性能和无限的并发数。
将它保持为 SQL
Redshift Spectrum 使用与 Amazon Redshift 相同的查询引擎。这意味着,我们不需要更改我们的 BI 工具或查询语法,无论我们是在单个表上使用复杂查询还是加入多个表。
最近推出的一项有趣功能是,能够创建同时跨越 Amazon Redshift 和 Redshift Spectrum 外部表的视图。利用此功能,您可以使用一个视图在 Amazon Redshift 集群中查询频繁访问的数据并在 Amazon S3 中查询较少访问的数据。
利用 Parquet 以获得更高性能
Parquet 是一种柱状数据格式,可提供优越的能力并使 Redshift Spectrum(或 Amazon Athena)能够扫描更少的数据。查询使用较少的 I/O 便能更快运行,且每个查询的费用更低。
较低的成本
从成本角度来看,我们为 Amazon S3 中的数据支付标准费率,每个查询只需要少量数据来使用 Redshift Spectrum 分析数据。使用 Parquet 格式,我们可以大大减少扫描的数据量。现在,我们的成本更低,而我们的用户甚至在较大的复杂查询中都能获得快速结果。
我们了解的有关 Amazon Redshift 对 Redshift Spectrum 性能的内容
当我们第一次开始了解 Redshift Spectrum 时,我们希望对其进行测试。我们希望了解它与 Amazon Redshift 相比如何,所以我们考虑了两个关键问题:
在迁移期间,我们将 Amazon Redshift 和 S3 中存储的数据集设置为 CSV/GZIP 和 Parquet 文件格式。我们对三种配置进行了测试:
我们使用一个月的数据对简单和复杂查询执行了基准测试。我们测试了执行查询所需的时间,以及多次运行同一个查询时,结果的一致性如何。我们用于测试的数据已按照日期和小时进行了分区。对数据进行适当分区将大大提高性能并降低查询时间。
简单查询
首先,我们测试了聚合一个月账单数据的简单查询:
count(*) AS impressions,
SUM(billing)::decimal /1000000 AS billing
date >= '2017-08-01' AND
date <= '2017-08-31'
复制代码
对于简单查询,Amazon Redshift 的表现比 Redshift Spectrum 更好,正如我们所想,这是因为数据在 Amazon Redshift 本地。
令人惊讶的是,在 Redshift Spectrum 中使用 Parquet 数据格式明显优于 Amazon Redshift 的“传统”表现。对于我们的查询,使用 Parquet 数据格式与 Redshift Spectrum 产生的平均性能比传统的 Amazon Redshift 高 40%。此外,Redshift Spectrum 在执行时间上表现出很高的一致性,最慢运行和最快运行之间的差异较小。
由于我们只为 Redshift Spectrum 扫描的数据付款,使用 Parquet 显然可以大幅节省成本。
复杂的查询
使用 Parquet 的 Redshift Spectrum 与传统的 Amazon Redshift 相比,将平均查询时间削减了 80%!
底线:对于复杂查询,Redshift Spectrum 与 Amazon Redshift 相比,将性能提高 67%。使用 Parquet 数据格式后,Redshift Spectrum 与 Amazon Redshift 相比,使性能提高 80%。对我们而言,这一提高巨大。
为不同工作负载优化数据结构
由于 S3 的成本相对便宜且我们只需为每个查询扫描的数据付款,我们认为,对不同的工作负载和不同的分析引擎,以不同的格式保存数据是有意义的。需要注意的是,我们可以有任意数量的表指向 S3 上的相同数据。这完全取决于我们如何对数据分区及如何更新表分区。
数据排列
例如,我们有一个一分钟运行一次且为最后一分钟收集的数据生成统计数据的进程。利用 Amazon Redshift,将如下所示通过在表上运行查询来完成此操作:
events_table
ts BETWEEN ‘2017-08-01 14:00:00’ AND ‘2017-08-01 14:00:59’
复制代码
(假设 ‘ts’ 是为每个事件存储时间戳的列。)
利用 Redshift Spectrum 时,我们为每个查询中扫描的数据付费。如果数据按分钟而不是小时分区,则查看一分钟数据的查询费用为成本的 1/60。如果我们使用仅指向最后一分钟数据的临时表,我们将节省不必要的费用。
高效创建 Parquet 数据
平均而言,我们有 800 个实例来处理流量。每个实例发送最终加载到 Amazon Redshift 中的事件。从三年前开始,我们可以将数据从每个服务器卸载到 S3 中,然后再执行从 S3 到 Amazon Redshift 的定期复制命令。
最近,Amazon Kinesis Firehose 将卸载数据的功能直接添加到了 Amazon Redshift 中。虽然现在这是个可行选项,但我们保留了相同的收集过程,这个过程已完美、高效地工作了三年。
但当我们合并 Redshift Spectrum 时,情况发生了变化。利用 Redshift Spectrum,我们需要找到一种方法来:
为完成此操作,我们将数据另存为 CSV 格式,然后将其转换为 Parquet。生成 Parquet 文件的最有效方法是:
在此新过程中,我们必须在将数据发送到 Kinesis Firehose 之前更加注意验证数据,因为分区中的单个损坏记录无法对该分区进行查询。
数据验证
为了将我们的点击数据存储在表中,我们考虑了以下 SQL 创建表命令:
create external TABLE spectrum.blog_clicks (
user_id varchar(50),
campaign_id varchar(50),
os varchar(50),
ua varchar(255),
billing float
按 (date date, hour smallint) 分区
存储为 parquet
位置 's3://nuviad-temp/blog/clicks/';
复制代码
首先,我们需要获取表定义。此操作可以通过运行以下查询来实现:
svv_external_columns
tablename = 'blog_clicks';
复制代码
现在,我们可以使用此数据为我们的数据创建验证模式:
const rtb_request_schema = {
"name": "clicks",
"user_id": {
"type": "string",
"max_length": 100
"campaign_id": {
"type": "string",
"max_length": 50
"type": "string",
"max_length": 50
"type": "string",
"max_length": 255
"type": "integer",
"min_value": 0,
"max_value": 9999999999999
"billing": {
"type": "float",
"min_value": 0,
"max_value": 9999999999999
复制代码
接下来,我们创建一个函数,以使用此模式验证数据:
function valueIsValid(value, item_schema) {
if (schema.type == 'string') {
return (typeof value == 'string' && value.length <= schema.max_length);
else if (schema.type == 'integer') {
return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);
else if (schema.type == 'float' || schema.type == 'double') {
return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);
else if (schema.type == 'boolean') {
return typeof value == 'boolean';
else if (schema.type == 'timestamp') {
return (new Date(value)).getTime() > 0;
return true;
复制代码
使用 Kinesis Firehose 进行近实时的数据加载
在 Kinesis Firehose 上,我们创建了一个新的传输流以按如下所示处理事件:
传输流名称:事件
S3 存储桶:nuviad-events
S3 前缀:rtb/
IAM 角色:firehose_delivery_role_1
数据转换:已禁用
源记录备份:已禁用
S3 缓冲区大小 (MB):100
S3 缓冲区间隔(秒):60
S3 压缩:GZIP
S3 加密:未加密
状态:活动
错误记录:已启用
复制代码
此传输流每分钟聚合一次事件数据,或最多聚合 100 MB,并将数据作为 CSV/GZIP 压缩文件写入 S3 存储桶中。接下来,在我们验证数据后,我们可以将其安全发送至我们的 Kinesis Firehose API:
if (validated) {
let itemString = item.join('|')+'\n'; //Sending csv delimited by pipe and adding new line
let params = {
DeliveryStreamName: 'events',
Data: itemString
firehose.putRecord(params, function(err,>if (err) {
console.error(err, err.stack);
// 继续您的下一步
复制代码
使用 AWS Lambda 自动化数据分布
我们创建了一个由 S3 put 事件触发的简单 Lambda 函数,该事件将文件复制到另一个位置(或多个位置),同时对文件进行重新命名以适应我们的数据结构和处理流程。如前所述,Kinesis Firehose 生成的文件按照预先定义的层次构造结构,如:
S3://your-bucket/your-prefix/2017/08/01/20/events-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz
复制代码
我们需要做的就是解析对象名称,并按照我们认为合适的方式重新构造其结构。在我们的例子中,我们执行了以下操作(事件是 Lambda 函数中接收的对象,该对象的所有数据都写入了 S3):
事件对象中的对象密钥结构:
your-prefix/2017/08/01/20/event-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz
let key_parts = event.Records[0].s3.object.key.split('/');
let event_type = key_parts[0];
let date = key_parts[1] + '-' + key_parts[2] + '-' + key_parts[3];
let hour = key_parts[4];
if (hour.indexOf('0') == 0) {
hour = parseInt(hour, 10) + '';
let parts1 = key_parts[5].split('-');
let minute = parts1[7];
if (minute.indexOf('0') == 0) {
minute = parseInt(minute, 10) + '';
复制代码
现在,我们可以将文件重新分布到我们需要的两个目标——一个用于分钟处理任务,另一个用于小时聚合:
copyObjectToHourlyFolder(event, date, hour, minute)
.then(copyObjectToMinuteFolder.bind(null, event, date, hour, minute))
.then(addPartitionToSpectrum.bind(null, event, date, hour, minute))
.then(deleteOldMinuteObjects.bind(null, event))
.then(deleteStreamObject.bind(null, event))
.then(result => {
callback(null, { message: 'done' });
.catch(err => {
console.error(err);
callback(null, { message: err });
复制代码
Kinesis Firehose 将数据存储在临时文件夹中。我们将对象复制到保存最后一分钟处理数据的另一个文件夹。该文件夹连接到一个小的 Redshift Spectrum 表,数据在该表中进行处理,不需要扫描更大的数据集。我们还将数据复制到保存一整个小时数据的文件夹中,稍后再进行聚合并转换为 Parquet 格式。
ALTER TABLE
spectrum.events
ADD 分区
(date='2017-08-01', hour=0)
LOCATION 's3://nuviad-temp/events/2017-08-01/0/';
复制代码
处理过数据并将其添加到表中之后,我们从 Kinesis Firehose 临时存储和分钟存储文件夹中删除处理后的数据。
使用 AWS Glue 和 Amazon EMR 将 CSV 迁移到 Parquet
我们发现 CSV 数据至 Parquet 转换的小时作业的最简单运行方法是使用 Lambda 和 AWS Glue(感谢强大的 AWS 大数据团队在这方面的帮助)。
创建 AWS Glue 作业
此简单 AWS Glue 脚本执行以下操作:
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value' ])
#day_partition_key = "partition_0"
#hour_partition_key = "partition_1"
#day_partition_value = "2017-08-01"
#hour_partition_value = "0"
day_partition_key = args['day_partition_key']
hour_partition_key = args['hour_partition_key']
day_partition_value = args['day_partition_value']
hour_partition_value = args['hour_partition_value']
print("Running for " + day_partition_value + "/" + hour_partition_value)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df = spark.read.option("delimiter","|").csv("s3://nuviad-temp/events/"+day_partition_value+"/"+hour_partition_value)
df.registerTempTable("data")
df1 = spark.sql("select _c0 as user_id, _c1 as campaign_id, _c2 as os, _c3 as ua, cast(_c4 as bigint) as ts, cast(_c5 as double) as billing from>
df1.repartition(1).write.mode("overwrite").parquet("s3://nuviad-temp/parquet/"+day_partition_value+"/hour="+hour_partition_value)
client = boto3.client('athena', region_name='us-east-1')
response = client.start_query_execution(
QueryString='alter table parquet_events add if not exists partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ')location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
QueryExecutionContext={
'Database': 'spectrumdb'
ResultConfiguration={
'OutputLocation': 's3://nuviad-temp/convertresults'
response = client.start_query_execution(
QueryString='alter table parquet_events partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') set location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
QueryExecutionContext={
'Database': 'spectrumdb'
ResultConfiguration={
'OutputLocation': 's3://nuviad-temp/convertresults'
job.commit()
复制代码
注:由于 Redshift Spectrum 和 Athena 都使用 AWS Glue 数据目录,我们可以使用 Athena 客户端将分区添加到表中。
下面是关于浮点类型、小数类型和双精度型的一些单词。经证明,使用小数类型比我们预期的更具有挑战性,因为 Redshift Spectrum 和 Spark 似乎以不同的方式使用它们。当我们在 Redshift Spectrum 和 Spark 中使用小数类型时,我们不断的收到错误。
我们必须对几个浮点格式进行试验,直到我们发现唯一有效的组合是在 Spark 代码中将列定义为双精度型在 Spectrum 中将其定义为浮点类型。这是账单在 Spectrum 中被定义为浮点类型在 Spark 代码中被定义为双精度型的原因。
创建一个 Lambda 函数以触发转换
接下来,我们创建了一个简单的 Lambda 函数,以使用简单的 Python 代码每小时触发 AWS Glue 脚本:
import boto3
import json
from datetime import datetime, timedelta
client = boto3.client('glue')
def lambda_handler(event, context):
last_hour_date_time = datetime.now() - timedelta(hours = 1)
day_partition_value = last_hour_date_time.strftime("%Y-%m-%d")
hour_partition_value = last_hour_date_time.strftime("%-H")
response = client.start_job_run(
JobName='convertEventsParquetHourly',
Arguments={
'--day_partition_key': 'date',
'--hour_partition_key': 'hour',
'--day_partition_value': day_partition_value,
'--hour_partition_value': hour_partition_value
复制代码
使用 Amazon CloudWatch Events,我们可以按小时触发此函数。此函数将触发名为 ‘convertEventsParquetHourly’ 的 AWS Glue 作业,并运行前一小时的作业,从而将要处理的作业名称和分区值传递到 AWS Glue 中。
Redshift Spectrum 和 Node.js
我们的开发堆栈基于 Node.js,非常适合需要处理大量交易的高速轻服务器。然而,Node.js 环境有一些限制,要求我们创建变通方法并使用其他工具完成此过程。
Node.js 和 Parquet
由于 Node.js 缺乏 Parquet 模块,需要我们执行 AWS Glue/Amazon EMR 过程以将数据从 CSV 有效迁移到 Parquet。我们宁愿直接保存到 Parquet,但我们找不到有效的方法来执行此操作。
一个有趣的项目是通过称为 node-parquet 的 Marc Vertes 开发Parquet NPM。它没有进入生产状态,但我们认为很值得跟进此生产包的进展。
时间戳数据类型
根据 Parquet 文档,时间戳数据以 64 位整数存储在 Parquet 中。然而,JavaScript 不支持 64 位整数,因为本机号码类型为 64 位双精度型,仅提供 53 位的整数范围。
因此,无法使用 Node.js 将时间戳正确存储在 Parquet 中。解决方法是将时间戳存储为字符串,并将类型转换为查询中的时间戳。使用这种方法,我们没有发现任何性能下降。
经验教训
您可以从我们的试错经验中获益。
经验 1:数据验证很关键
如前所述,分区中的一个条目损坏可能会导致对该分区运行的查询失败,尤其是在使用 Parquet 时,这比简单的 CSV 文件更难编辑。确保您验证数据后再使用 Redshift Spectrum 扫描数据。
经验 2:有效构造数据结构和对数据分区
使用 Redshift Spectrum(或者在此情况下使用 Athena)的一项最大益处是,您不需要一直保持节点的正常运行。您只需为您执行的查询及每个查询所扫描的数据付费。
以正确的格式存储数据
随时使用 Parquet。Parquet 的益处非常多。性能更快、扫描的数据更少且柱状格式更高效。然而,它不支持 Kinesis Firehose 的开箱即用,因此,您需要执行自己的 ETL。AWS Glue 是一个非常好的选项。
为频繁的任务创建较小的表
当我们开始使用 Redshift Spectrum 时,我们发现 Amazon Redshift 的成本每天上涨数百美元。然后,我们意识到,我们没必要每分钟都扫描一整天的数据。利用在同一个 S3 存储桶或文件夹上定义多个表的能力,并为频繁查询创建临时表和较小的表。
经验 3:将 Athena 和 Redshift Spectrum 结合,以获得最大性能
迁移到 Redshift Spectrum 还能让我们利用 Athena,因为它们都使用 AWS Glue 数据目录。在利用 Amazon Redshift 高级查询引擎使用 Redshift Spectrum 进行复杂查询时,使用 Athena 运行快速而简单的查询。
运行复杂的查询时,Redshift Spectrum 更为突出。它可以将很多计算密集型任务(如谓词过滤和聚合)下推到 Redshift Spectrum 层,从而减少查询使用的集群处理能力。
经验 4:在分区类对您的 Parquet 数据进行分类
通过使用 sortWithinPartitions(sort_field) 在分区内对数据进行分类,我们实现了又一次性能改进。例如:
df.repartition(1).sortWithinPartitions("campaign_id")…
复制代码
小结
我们非常高兴在过去的三年内将 Amazon Redshift 用作我们的核心数据仓库。但随着我们的客户群和数据量大幅增长,我们对 Amazon Redshift 进行了扩展,从而利用 Redshift Spectrum 的可扩展性、性能和成本。
Redshift Spectrum 让我们可以清楚的扩展到几乎无限的存储、扩展计算,并且可以为我们的用户提供超快的结果。利用 Redshift Spectrum,我们可以将数据以我们想要的成本存储在我们希望的位置,并且可提供数据供用户在需要时进行分析,且数据性能符合用户预期。
原文链接: