使用 插入 更新 EMR Apache Amazon 和 删除 上的数据 Hudi S3 (更新用数据或wⅰf怎么选)

使用 插入 更新 EMR Apache Amazon 和 删除 上的数据 Hudi S3 (更新用数据或wⅰf怎么选)

将数据存储在中会在扩展、可靠性和成本效益方面提供很多优势。 除此之外,您可以使用Apache Spark、和等开源工具来利用处理和分析您的数据。 尽管这些工具很强大,但若要处理需要您进行增量数据处理和记录级插入、更新和删除的使用案例,仍然具有挑战性。

我们与客户交谈发现,有些使用案例需要处理对个别记录的增量更改,例如:

从今天开始,EMR 版本 5.28.0 包含Apache Hudi(孵化),因此,您不再需要构建自定义解决方案来执行记录级插入、更新和删除操作。Hudi 开发于 2016 年开始于 Uber,用于解决提取和管道间的效率低下。 近几个月来,EMR 团队与 Apache Hudi 社区密切合作,贡献了很多修补程序,包括将 Hudi 更新为 Spark 2.4.4 ()、支持 Spark Avro ()、增加对 AWS Glue target="_blank">HUDI-306) 的支持以及多个漏洞修复。

使用 Hudi,您可以在 S3 上执行记录级插入、更新和删除,以便能够符合数据隐私法、使用实时流和变更数据捕获、恢复延迟到达的数据及以开放的、与供应商无关的格式追踪历史记录和回滚。您将创建数据集和表,Hudi 则管理基础数据格式。 Hudi 使用Apache Parquet和Apache Avro进行数据存储,并且包括与 Spark、Hive 和 Presto 的内置集成,以便您能够使用与当前使用相同的工具来查询 Hudi 数据集,并能近乎实时地访问全新数据。

启动 EMR 集群时,适用于 Hudi 的库和工具会在至少选择了以下组件之一:Hive、Spark 或 Presto 的任何时候自动安装和配置。 您可以使用 Spark 创建新的 Hudi 数据集,并插入、更新和删除数据。每个 Hudi 数据集均在您的集群的配置元存储(包括AWS Glue>

Hudi 支持两种存储类型,这两种类型定义写入、索引和从 S3 中读取数据的方式:

我们来快速概览如何在 EMR 集群中设置和使用 Hudi 数据集。

**将 Apache Hudi 与 Amazon EMR 结合使用

**我开始从EMR 控制台中创建集群。在高级选项中,我选择 EMR 版本 5.28.0(包含 Hudi 的第一个版本)和以下应用程序:Spark、Hive 和。在硬件选项中,我添加了 3 个任务节点,以确保拥有足够的容量来同时运行 Spark 和 Hive。

当集群准备就绪时,我使用我在安全选项中选择的密钥对来 SSH 到主节点并访问 Spark Shell。我使用下面的命令来启动 Spark Shell 以将其用于 Hudi:

$ spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"--conf "spark.sql.hive.convertMetastoreParquet=false"--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
复制代码

在这里,我使用下面的代码使用“写入时复制”存储类型将一些示例日志导入 Hudi 数据集中:

import org.apache.spark.sql.SaveModeimport org.apache.spark.sql.functions._import org.apache.hudi.DataSourceWriteOptionsimport org.apache.hudi.config.HoodieWriteConfigimport org.apache.hudi.hive.MultiPartKeysValueExtractor//将各种输入值设置为变量val inputDataPath = "s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/"val hudiTableName = "elb_logs_hudi_cow"val hudiTablePath = "s3://MY-BUCKET/PATH/" + hudiTableName//设置我们的 Hudi 数据源选项val hudiOptions = Map[String,String](DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "request_ip",DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "request_verb",HoodieWriteConfig.TABLE_NAME -> hudiTableName,DataSourceWriteOptions.OPERATION_OPT_KEY ->DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "request_timestamp",DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "request_verb",DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false",DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->classOf[MultiPartKeysValueExtractor].getName)//从 S3 中读取数据并使用分区和记录密钥创建>val inputDF = spark.read.format("parquet").load(inputDataPath)//将数据写入 Hudi 数据集中inputDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)
复制代码

在 Spark Shell 中,我现在可以对 Hudi 数据集中的记录进行计数:

scala> inputDF2.count()

res1: Long = 10491958

在选项中,我使用了与为集群配置的 Hive 元存储的集成,以便在 默认 数据库中创建表。用此方式,我可以使用 Hive 查询 Hudi 数据集中的数据:

hive> use default;

hive> select count(*) from elb_logs_hudi_cow;

现在,我可以更新或删除数据集中的单个记录。在 Spark Shell 中,我准备了一些变量来查找我想要更新的记录,并准备了一个 SQL 语句来选择我想要更改的列值:

val requestIpToUpdate = "243.80.62.181"val sqlStatement = s"SELECT elb_name FROM elb_logs_hudi_cow WHERE request_ip = '$requestIpToUpdate'"
复制代码

我执行 SQL 语句来查看列的当前值:

scala> spark.sql(sqlStatement).show()

| elb_name|

|elb_demo_003|

然后,我选择并更新记录:

//使用单个记录创建>val updateDF = inputDF.filter(col("request_ip") === requestIpToUpdate).withColumn("elb_name", lit("elb_demo_001"))
复制代码

现在,我使用与我用于创建数据集的语法相似的语法来更新 Hudi 数据集。但此时,我编写的>

//将>updateDF.write.format("org.apache.hudi").options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)
复制代码

在 Spark Shell 中,我检查了更新的结果:

scala> spark.sql(sqlStatement).show()

| elb_name|

|elb_demo_001|

现在,我想要删除相同的记录。要删除记录,我将 EmptyHoodieRecordPayload 有效负载传递到写入选项中:

//使用 EmptyHoodieRecordPayload 编写>updateDF.write.format("org.apache.hudi").options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,"org.apache.hudi.EmptyHoodieRecordPayload").mode(SaveMode.Append).save(hudiTablePath)
复制代码

在 Spark Shell 中,我发现记录不再可用:

scala> spark.sql(sqlStatement).show()

|elb_name|

Hudi 如何管理所有这些更新和删除操作? 我们来使用 Hudi 命令行界面 (CLI) 连接到数据集,然后发现这些更改被解读为提交内容:

此数据集为“写入时复制”数据集,这意味着,每当对记录进行更新时,包含该记录的文件将被重新编写,以包含更新的值。您可以看到每个提交编写了多少记录。表格的底行描述了数据集的初始创建,其上面是单个记录的更新,顶行是单个记录的删除。

使用 Hudi,您可以回滚到每个提交。例如,我可以使用以下命令回滚删除操作:

hudi:elb_logs_hudi_cow->commit rollback --commit 20191104121031

在 Spark Shell 中,记录现在已返回原来的位置,刚好在更新之后:

scala> spark.sql(sqlStatement).show()

| elb_name|

|elb_demo_001|

“写入时复制”为默认的存储类型。我可以重复上述步骤,通过将“读取时合并”数据集类型添加到我们的 hudiOptions 中来创建和更新该数据集类型:

DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ"

如果您更新“读取时合并”数据集并使用 Hudi CLI 查看提交内容,您可以看到“读取时合并”与“写入时复制”相比有何不同。使用“读取时合并”,您只会写入更新的行,而不是像“写入时复制”一样写入整个文件。这就是“读取时合并”有助于需要更多写入或更新/删除密集型工作负载、具有较少读取次数的使用案例的原因。Delta 提交内容以 Avro 记录(基于行的存储)形式写入磁盘中,压缩的数据以 Parquet 文件(分列存储)的形式写入。为了避免创建太多 delta 文件,Hudi 将自动压缩您的数据集,以使您的读取尽可能具有高性能。

创建“读取时合并”数据集时,会创建两个 Hive 表:

查询时,第一个表会返回已压缩的数据,并且将不会显示最新的 delta 提交内容。使用此表会提供最佳性能,但会忽略最新的数据。查询实时表会将压缩的数据与读取时的 delta 提交内容合并,因此,此数据集被称为“读取时合并”。此操作将产生可用的最新数据,但会产生性能开销,且不会像查询压缩数据一样具有高性能。用此方式,数据工程师和分析师可以在性能与数据新鲜度之间灵活地进行选择。

**现已推出

**此新功能现已在具有 EMR 5.28.0 的所有区域推出。 将 Hudi 与 EMR 结合使用不会产生额外费用。您可以在 EMR 文档中了解有关 Hudi 的更多信息。这款新工具可以简化您处理、更新和删除 S3 中的数据的方式。请让我知道您打算将它用于哪些使用案例!

原文链接:

声明:本文来自用户分享和网络收集,仅供学习与参考,测试请备份。