Amazon Athena是一种交互式查询服务,可使用标准 SQL 轻松分析存储在 Amazon S3 中的数据。Athena 是一种无服务器服务,因此您无需管理任何基础设施,而且只需为所运行的查询付费。Athena 简单易用。只需指向 Amazon S3 中的数据,定义 schema,然后即可开始使用标准 SQL 语法执行查询。
在本博文中,我们将回顾可以提高查询性能的十大技巧。我们将着重关注对存储在 Amazon S3 上的数据的查询优化。Amazon Athena 使用 Presto 运行 SQL 查询,因此,如果您在 Amazon EMR 上运行 Presto,那么某些建议也行之有效。
这篇文章假定您了解不同的文件格式,例如 Parquet、ORC、Text 文件、Avro、CSV、TSV 和 JSON。
最佳实践:存储
本节讨论如何构造数据,以便您可以最大程度地利用 Athena。如果您的数据存储在 Amazon S3 上,那么同样的这些最佳实践还可以应用于 Amazon EMR 数据处理程序,例如 Spark、Presto 和 Hive。
1.对数据进行分区
分区会将表分为多个部分,并根据日期、国家/地区、区域等列值将相关数据存放在一起。分区充当虚拟列。您可以在创建表时定义分区,它们有助于减少每次查询扫描的数据量,从而提高性能。您可以通过指定基于分区的筛选条件来限制查询所扫描的数据量。有关更多详细信息,请参见数据分区。
Athena 支持 Hive 分区,该分区遵循以下命名约定之一:
a) 分区列名称,后跟等号(“=”),然后再接值。
s3://yourBucket/pathToTable/<PARTITION_COLUMN_NAME>=<VALUE>/<PARTITION_COLUMN_NAME>=<VALUE>/
如果以这种格式对数据集进行分区,则可以运行 MSCK REPAIR table 命令将分区自动添加到表中。
b) 如果数据的“路径”不遵循上述格式,则可以使用ALTER TABLE ADD PARTITION命令为每个分区手动添加分区。例如
s3://yourBucket/pathToTable/YYYY/MM/DD/
Alter Tableadd Partition (PARTITION_COLUMN_NAME = , PARTITION_COLUMN2_NAME = ) LOCATION ‘s3://yourBucket/pathToTable/YYYY/MM/DD/’;
注意:通过上述方法,您可以将任何位置与要用于引用它们的值对应起来。
以下示例展示了在 S3 存储桶中存储的航班表的“year”(年份)列上对数据进行分区。
$ aws s3 ls s3://athena-examples/flight/parquet/
PRE year=1987/
PRE year=1988/
PRE year=1989/
PRE year=1990/
PRE year=1991/
PRE year=1992/
PRE year=1993/
复制代码
您可以在“WHERE”子句中使用列来限制查询中扫描的分区。
SELECT dest, origin FROM flights WHERE year = 1991
复制代码
您还可以将多个列用作分区键。您可以扫描数据以获取特定值等等。
s3://athena-examples/flight/parquet/year=1991/month=1/day=1/
s3://athena-examples/flight/parquet/year=1991/month=1/day=2/
在确定要用于分区的列时,请考虑以下事项:
例如:
下表比较了分区的表和未分区的表之间的查询运行时间。两个表均包含 74GB 的未压缩数据,采用文本格式存储。分区的表按 l_shipdate 列进行分区,总共有 2526 个分区。
col 1|col 2|col 3|col 4|col 5|col 6
复制代码
查询 | 未分区的表 | 成本 | 分区的表 | 成本 | 节省
| 运行时间 | 扫描的数据 || 运行时间 | 扫描的数据 ||
SELECT count(*) FROM lineitem WHERE l_shipdate = ‘1996-09-01’|9.71 秒|74.1 GB| 0.36 USD |2.16 秒| 29.06 MB| 0.0001 USD | 成本低 99%
速度快 77%
SELECT count(*) FROM lineitem WHERE l_shipdate >= ‘1996-09-01’ AND l_shipdate < ‘1996-10-01’|10.41 秒|74.1 GB| 0.36 USD |2.73 秒| 871.39 MB | 0.004 USD| 成本低 98%
速度快 73%
但是,如下面的运行时间所示,分区也有相应的代价。切勿对数据进行过度分区。
col 1|col 2|col 3|col 4|col 5|col 6
复制代码
查询 | 未分区的表 | 成本 | 分区的表 | 成本 | 节省
| 运行时间 | 扫描的数据 || 运行时间 | 扫描的数据 ||
SELECT count(*) FROM lineitem;|8.4 秒|74.1 GB| 0.36 USD | 10.65 秒|74.1 GB| 0.36 USD | 速度慢 27%
2. 分桶
数据分区的另一种方法是用_存储桶_划分单个分区内的数据。使用存储桶做数据划分时,您可以指定一个或多个列,这些列包含要分组的行,然后将这些行存放到多个存储桶中。这样,您可以在指定纳入存储桶的列值时,仅查询需要读取的存储桶,这可以大大减少要读取的数据行数。
选择要用于分桶的列时,建议您选择基数高的列(也就是说,其中具有大量唯一值),而且应该选择通常用于在查询期间筛选所读取的数据的列。用于存储桶数据划分的一个理想的列示例就是主键,例如系统的用户 ID。
在 Athena 中,您可以通过指定 CLUSTERED BY () INTOBUCKETS,在 Create Table 语句中指定要用存储桶划分数据的列。存储桶的数量应保证所得到的文件均为最佳大小。有关更多详细信息,请参见优化文件大小部分。
要在 Athena 中使用分桶表,您必须用 Apache Hive 创建数据文件,因为 Athena 不支持 Apache Spark 分桶格式。有关如何创建分桶表的信息,请参阅 Apache Hive 文档中的LanguageManual DDL BucketedTables。
另外还要注意,Athena 不支持文件数与存储桶数不匹配的表和分区,例如执行了多个 INSERTS INTO 语句之后的情况。
下表显示了使用 c_custkey 列创建 32 个存储桶的客户表中的区别。客户表的大小为 2.29 GB。
查询 | 非分桶表 | 成本 | 使用 c_custkey 作为聚簇列的分桶表 | 成本 | 节省 |
运行时间 | 扫描的数据 | 运行时间 | 扫描的数据 | ||
SELECT count(*) FROM customer where c_custkey = 12677856; | 1.53 秒 | 0.01145 USD | 1.01 秒 |
速度快 34%
3.压缩和分割文件
只要文件具有最佳大小(请参见下一部分)或文件可分割,压缩数据就可以显著加快查询速度。较小的数据大小可减少从 Amazon S3 到 Athena 的网络流量。
可分割文件允许 Athena 中的执行引擎将一个文件的读取操作分割开来,供多个读取器进行读取,从而提高并行性。如果只有一个不可分割的文件,则只有一个读取器可以读取该文件,而所有其他读取器都会处于空闲状态。并非所有压缩算法都是可分割的。下表列出了常见压缩格式及其属性。
col 1|col 2|col 3|col 4
复制代码
算法 | 是否可分割? | 压缩率 | 压缩 + 解压缩速度
Gzip (DEFLATE) |否|高|中
bzip2|是|非常高|慢
LZO|否|低|快
Snappy|否|低|非常快
通常,算法的压缩率越高,压缩和解压缩数据所需的 CPU 资源就越多。
对于 Athena,我们建议使用 Apache Parquet 或 Apache ORC,它们在默认情况下会压缩数据,并且支持分割。如果无法选择,请尝试使用 BZip2 或 Gzip,并且使用最佳文件大小。
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## Read TABLE_NAME from DB_NAME out of the AWS Glue>dataset = glueContext.create_dynamic_frame.from_catalog(database = DB_NAME, table_name = TABLE_NAME, transformation_ctx = "dataset")
## Write>outputdf = glueContext.write_dynamic_frame.from_options( \
frame =>connection_type = "s3",
connection_options = {"path":"s3://bucket/prefix/"},
format = "json",
compression = "gzip",
transformation_ctx = "outputdf")
job.commit()
复制代码
.优化文件大小
在可以并行读取数据并可以顺序读取数据块时,查询运行效率会更高。无论文件有多大,确保您的文件格式可拆分都有助于并行处理。
但是,如果文件太小(通常小于 128 MB),执行引擎可能要耗费额外的时间来打开 Amazon S3 文件、列出目录、获取对象元数据、设置数据传输、读取文件头、读取压缩字典等等。另一方面,如果您的文件不可分割并且文件太大,则查询处理将一直等到单个读取器完成读取整个文件为止。这会造成并行性降低。
解决小文件问题的一种方法是在 Amazon EMR 上使用实用程序。您可以使用它将较小的文件组合为较大的对象。您还可以使用 S3DistCP,以优化的方式将大量数据从 HDFS 移至 Amazon S3、从 Amazon S3 移至 Amazon S3、从 Amazon S3 移至 HDFS。
采用较大文件的部分优势:
示例
下表比较了两个表之间的查询运行时间,其中一个表由一个大文件支持,而另一个表则由 5,000 个小文件支持。两个表都包含 7 GB 的数据,以文本格式存储。
**查询**| **文件数量** | **运行时间**
复制代码
SELECT count() FROM lineitem|1 个文件|2.31 秒
加速效果|| 速度快 72%
您还可以使用 AWS Glue 拆分数据,如以下示例脚本所示。
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## Read TABLE_NAME from DB_NAME out of the AWS Glue>dataset = glueContext.create_dynamic_frame.from_catalog(database = DB_NAME, table_name = TABLE_NAME, transformation_ctx = "dataset")
## Write>outputdf = glueContext.write_dynamic_frame.from_options( \
frame =>connection_type = "s3",
connection_options = {"path":"s3://bucket/prefix/", compression = "gzip"},
format = "json",
transformation_ctx = "outputdf")
job.commit()
复制代码
5.优化列式数据存储的生成
Apache Parquet和是热门的列式数据存储。它们通过采用列压缩、不同的编码、基于数据类型的压缩和谓词下推来提供高效数据存储的功能。它们也具有可分割的特点。通常,更高的压缩率或跳过数据块就意味着,从 Amazon S3 读取的字节数更少,这能提高查询性能。
可以调整的一个参数是_块大小_或_条带大小_。Parquet 中的块大小或 ORC 中的条带大小表示:可装入一个块的最大行数(按字节大小衡量)。块/条带越大,每个块中可以存储的行就越多。默认情况下,Parquet 块大小为 128 MB,ORC 条带大小为 64 MB。如果您的表具有许多列,则建议使用更大的块大小,以确保每个列块的大小都能保证有效的顺序 I/O。
可以调整的另一个参数是数据块上采用的压缩算法。Parquet 默认采用 Snappy,但它还支持不使用压缩,或者使用 GZIP 和基于 LZO 的压缩。ORC 默认采用 ZLIB,但它还支持不使用压缩,或者使用 Snappy 进行压缩。如果您有 10 GB 以上的数据,我们建议首先使用默认的压缩算法,并测试其他压缩算法的效果。
Parquet 和 ORC 文件格式都支持_谓词下推_(也称为_谓词筛选_)。Parquet 和 ORC 都有代表列值的数据块。每个块均包含该块的统计信息,例如最大值/最小值。执行查询时,这些统计信息确定应读取还是跳过该块。
优化要跳过的块数的一种方法是,在写入 ORC 或 Parquet 文件之前,对常用的筛选的列进行标识和排序。这样可以保证该块内最小值与最大值之间的差距在每个块内尽可能小。这就提供了更好的数据修剪机会。
您可以使用 Amazon EMR 上的 Spark 或 Hive 将现有数据转换为 Parquet 或 ORC。有关更多信息,请参阅博文Analyzing>
**查询优化
Athena 在后台使用 Presto。了解 Presto 的工作原理有助于深入了解如何在运行查询时优化查询。
本节详细介绍以下最佳实践:
温馨提示:请注意仅包含您需要的列。
6.优化 ORDER BY
ORDER BY 子句按排序顺序返回查询结果。要进行排序,Presto 必须将所有数据行发送给单个工作进程,然后对其进行排序。这可能会导致 Presto 出现内存压力,从而造成查询需要花费很长时间才能执行完成。更糟糕的是,查询可能会失败。
如果您使用 ORDER BY 子句查看前 个值或后 N 个值,请使用 LIMIT 子句,通过限制排序使用多个单独的工作进程来显著降低排序成本,而不是在一个工作进程内完成排序。
例如:
数据集:7.25 GB 表,未压缩,文本格式,约 6000 万行
**查询**| **运行时间**
复制代码
SELECT * FROM lineitem ORDER BY l_shipdate|528 秒
SELECT * FROM lineitem ORDER BY l_shipdate LIMIT 10000| 11.15 秒
加速| 速度快 98%
7.优化联接
联接两个表时,请在联接的左侧指定较大的表,在联接的右侧指定较小的表。Presto 将右侧的表分配给工作进程节点,然后流式传输左侧的表以进行联接。如果右侧的表较小,则占用的内存量较少,并且查询运行速度更快。
例如:
数据集:总计 74 GB 数据,未压缩,文本格式,约 6.02 亿行
**查询**| **运行时间**
复制代码
SELECT count() FROM part, lineitem WHERE lineitem.l_partkey = part.p_partkey| 10.71 秒
节约/加速| 大约加速 53%
该规则的例外是将多个表联接在一起时,可能会发生交叉连接。Presto 不支持联接重新排序,因此会从左到右执行联接。因此,您应从最大到最小指定表,同时确保未同时指定两个表,否则就会导致交叉联接。
例如 :
数据集:总计 9.1 GB,未压缩,文本格式,总共约 7600 万行
复制代码查询 | 运行时间
SELECT count() FROM lineitem, orders, customer WHERE lineitem.l_orderkey = orders.o_orderkey AND customer.c_custkey = orders.o_custkey|3.71 秒
8.优化 GROUP BY
GROUP BY 运算符将基于 GROUP BY 列的行分配给工作进程节点,这些工作进程节点将 GROUP BY 值保存在内存中。提取行时,系统将在内存中查找 GROUP BY 列,并比较这些值。如果 GROUP BY 列匹配,则这些值会聚合在一起。
在查询中使用 GROUP BY 时,请按基数从高到低的顺序排列(即,唯一值数量最多,平均分布)。
SELECT state, gender, count(*)
FROM census
GROUP BY state, gender;
复制代码
另一种优化方法是在 GROUP BY 子句中尽可能使用数字而不是字符串。与字符串相比,数字需要的内存量更少,而且比较速度也更快。这些数字表示分组的列名在 SELECT 语句中的位置。例如:
SELECT state, gender, count(*)
FROM census
GROUP BY 1, 2;
复制代码
另一种优化方法是限制 SELECT 语句中的列数,以减少所需的内存量,这样可以将行保存在内存中,并为 GROUP BY 子句进行聚合。
9.优化 LIKE 运算符
当您在字符串列上筛选多个值时,通常最好使用正则表达式,而不是多次使用 LIKE 子句。在比较大量值时,这种做法特别有用。
例如:
数据集:74 GB 表,未压缩,文本格式,约 6 亿行
**查询**| **运行时间**
复制代码
SELECT count() FROM lineitem WHERE regexp_like(l_comment, ‘wake|regular|express|sleep|hello’)| 15.87 秒
加速| 速度快 17%
10.使用近似函数
为了浏览大型数据集,一种常见的案例是使用 COUNT(DISTINCT column) 查找特定列中不同唯一值的计数。一个示例就是查看访问网页的唯一身份用户数。
如果并不需要精确的数字,例如,如果您正在寻找有必要进一步探究的网页,那么可以考虑使用 approx_distinct()。此函数尝试通过计算值的唯一哈希值而非整个字符串来最大程度地减少内存使用量。缺点是这种方法的标准误差为 2.3%。
例如:
数据集:7.25 GB 表,未压缩,文本格式,约 6000 万行
**查询**| **运行时间**
复制代码
SELECT count(distinct l_comment) FROM lineitem;| 13.21 秒
SELECT approx_distinct(l_comment) FROM lineitem;| 10.95 秒
加速| 速度快 17%
有关更多信息,请参阅 Presto 文档中的Aggregate Functions(聚簇函数)。
温馨提示:请注意仅包含您需要的列
运行查询时,将最终的 SELECT 语句范围限制在所需的列之内,而非选择所有列。减少列数可以减少整个查询执行管道中需要处理的数据量。在您查询具有大量基于字符串的列的表以及执行多个联接或聚簇时,这特别有帮助。
例如:
数据集:7.25 GB 表,未压缩,文本格式,约 6000 万行
**查询**| **运行时间**
复制代码
SELECT * FROM lineitem, orders, customer WHERE lineitem.l_orderkey = orders.o_orderkey AND customer.c_custkey = orders.o_custkey;|983 秒
SELECT customer.c_name, lineitem.l_quantity, orders.o_totalprice FROM lineitem, orders, customer WHERE lineitem.l_orderkey = orders.o_orderkey AND customer.c_custkey = orders.o_custkey;|6.78 秒
节约/加速| 快 145 倍
小结
这篇文章介绍了我们使用 Presto 引擎在 Amazon Athena 上优化交互式分析的十大技巧。在 Amazon EMR 上使用 Presto 时,您可以将文中介绍的这些做法付诸使用。
如果您有任何问题或建议,请在下方留言。
作者介绍:
Manjeet Chayel 是 AWS 的一名软件架构师
Mert Hocanin 是 AWS EMR 的一名大数据架构师
原文:Top 10 Performance Tuning Tips for Amazon Athena
原文链接: