跳转至

Vertica 集成 Apache Hudi

关于本文档

本文档探索了使用外部表集成 Vertica 和 Apache Hudi 的方案。在此探索中,我们使用基于 Spark 的 Apache Hudi 将数据写入 S3,并通过 Vertica 外部表在不同时间线访问这些数据。

Apache Hudi 概述

Apache Hudi 是一个变更数据捕获 (CDC) 工具,用于在不同时间线记录表中的事务。Hudi 代表 Hadoop Upserts Deletes Incrementals,是一个开源框架。Hudi 提供 ACID 事务、可扩展的元数据处理,并统一了流式和批式数据处理。

以下流程图展示了该过程的高级视图。数据通过安装在 Apache Spark 上的 Hudi 处理到 S3,S3 中的数据变更通过 Vertica 外部表读取。

Flow2

前提条件

  • Apache Spark 环境:我们测试使用了 4 节点集群(1 个 Master 和 3 个 Worker)。请按照设置 Apache Spark 多节点集群的说明安装多节点 Spark 环境。启动 Spark 多节点集群。
  • Vertica 分析数据库:我们测试使用了 Vertica Enterprise 11.0.0。
  • AWS S3 或兼容 S3 的对象存储:我们测试使用了 MinIO 作为 S3 存储桶。
  • 以下 jar 文件(必需):可以将 jar 放在 Spark 机器上的任何所需位置。我们将这些 jar 文件放在 /opt/spark/jars 目录下。
  • Hadoop:hadoop-aws-2.7.3.jar
  • AWS:aws-java-sdk-1.7.4.jar
  • 在 Vertica 数据库中运行以下命令,设置访问 S3 存储桶的参数:
    SELECT SET_CONFIG_PARAMETER('AWSAuth', 'accesskey:secretkey');
    SELECT SET_CONFIG_PARAMETER('AWSRegion','us-east-1');
    SELECT SET_CONFIG_PARAMETER('AWSEndpoint', '<S3_IP>:9000');
    SELECT SET_CONFIG_PARAMETER('AWSEnableHttps','0');
    

    注意:您的 endpoint 值可能因您为 S3 存储桶位置选择的 S3 对象存储而异。

Vertica 与 Apache Hudi 集成

要集成 Vertica 与 Apache Hudi,首先需要将 Apache Spark 与 Apache Hudi 集成,配置 jar 文件和访问 AWS S3 的连接。其次,将 Vertica 连接到 Apache Hudi。然后,在 S3 存储桶上执行 Insert、Append、Update 等操作。

按照以下各节的步骤将数据写入 Vertica:

  1. 在 Apache Spark 上配置 Apache Hudi 和 AWS S3
  2. 配置 Vertica 和 Apache Hudi 集成

在 Apache Spark 上配置 Apache Hudi 和 AWS S3

在 Apache Spark 机器上运行以下命令。这将下载 Apache Hudi 包,配置 jar 文件和 AWS S3(即 MinIO):

/opt/spark/bin/spark-shell \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1

导入 Hudi 所需的读、写和其他包:

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

配置 MinIO 的访问密钥、密钥、Endpoint 以及其他 S3A 算法和路径样式:

spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "*****")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "*****")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "http://XXXX.9000")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
sc.hadoopConfiguration.set("fs.s3a.signing-algorithm","S3SignerType")

创建变量存储表名和 MinIO 的 S3 路径:

val tableName = "Trips"
val basePath = "s3a://apachehudi/vertica/"

使用 Scala 在 Apache Spark 中准备示例数据:

val df = Seq(
("aaa","r1","d1",10,"US","20211001"),
("bbb","r2","d2",20,"Europe","20211002"),
("ccc","r3","d3",30,"India","20211003"),
("ddd","r4","d4",40,"Europe","20211004"),
("eee","r5","d5",50,"India","20211005"),
).toDF("uuid", "rider", "driver","fare","partitionpath","ts")

将数据写入 AWS S3 并验证:

df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)

运行以下 Scala 命令验证数据是否正确从 S3 存储桶读取:

spark.read.format("hudi").load(basePath).createOrReplaceTempView("dta")
spark.sql("select _hoodie_commit_time, uuid, rider, driver, fare,ts, partitionpath from dta order by uuid").show()

截图

配置 Vertica 和 Apache Hudi 集成

在 Vertica 中创建一个外部表,包含来自 S3 上 Hudi 表的数据。我们创建了 "Trips" 表:

CREATE EXTERNAL TABLE Trips
(
_hoodie_commit_time TimestampTz,
uuid varchar,
rider varchar,
driver varchar,
fare int,
ts varchar,
partitionpath varchar
)
AS COPY FROM
's3a://apachehudi/parquet/vertica/*/*.parquet' PARQUET;

运行以下命令验证外部表是否可被读取:

截图

如何让 Vertica 看到变更的数据

以下各节包含我们执行的一些操作示例,以在 Vertica 中查看变更的数据。

追加数据

在此示例中,我们在 Apache Spark 中使用 Scala 运行以下命令,追加了一些数据:

val df2 = Seq(
("fff","r6","d6",50,"India","20211005")
).toDF("uuid", "rider", "driver","fare","partitionpath","ts")

运行以下命令将此数据追加到 S3 上的 Hudi 表:

df2.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)

更新数据

在此示例中,我们更新了 Hudi 表的一条记录。要更新记录,首先需要将数据导入 Spark 并更新数据:

val df3 = Seq(
("aaa","r1","d1",100,"US","20211001"),
("eee","r5","d5",500,"India","20211001")
).toDF("uuid", "rider", "driver","fare","partitionpath","ts")

运行以下命令将更新数据写入 S3 上的 HUDI 表:

df3.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)

以下是 Spark SQL 的输出:

截图

以下是 Vertica 的输出:

截图

创建和查看数据的历史快照

执行以下 Spark 命令,指向特定的时间戳:

val dd = spark.read
.format("hudi")
.option("as.of.instant", "20211007092600")
.load(basePath)

使用以下命令将数据写入 S3 上的 parquet:

dd.write.parquet("s3a://apachehudi/parquet/p2")

在此示例中,我们读取的是截至 20211007092600 的 Hudi 表快照。

dd.show

截图

从 Vertica 执行命令,在 parquet 文件上创建外部表。

更多信息

关于... 参见...
Apache Hudi 网站 http://hudi.apache.org/
Apache Hudi 文档 http://hudi.apache.org/docs/quick-start-guide
Vertica Community Edition https://www.vertica.com/register/
Vertica User Community https://www.vertica.com/big-data-analytics-community-content/
Vertica 文档 http://www.vertica.com/docs/latest/HTML/index.htm

原文来源:https://www.vertica.com/kb/Apache_Hudi_TE/Content/Partner/Apache_Hudi_TE.htm