Vertica 集成 Apache Flink¶
连接指南¶
关于 Vertica 连接指南¶
Vertica 连接指南提供将第三方合作伙伴产品连接到 Vertica 的基本说明。连接指南基于我们针对特定版本的 Vertica 和合作伙伴产品的测试。
Vertica 和 Apache Flink:测试版本¶
本文档基于使用以下版本的测试:
| 软件 | 版本 |
|---|---|
| Apache Flink | 1.13 |
| JDK | 1.8 |
| Scala | 2.11.0 |
| IntelliJ IDEA | IntelliJ Community Edition 2021.1.1 |
| 桌面平台 | Windows Server 2016 |
| Vertica 客户端 | Vertica JDBC Driver 10.1 |
| Vertica 服务器 | Vertica 10.1 |
Apache Flink 概述¶
Apache Flink 是 Apache 软件基金会开发的开源框架和分布式处理引擎。它用于流处理和批处理。该工具使用 Java 和 Scala 开发。您可以使用 Java、Scala 或 Python 在 Apache Flink 中编程。
下载 Vertica 客户端驱动¶
Apache Flink 使用 Vertica JDBC 驱动连接到 Vertica。
- 访问 Vertica Client Drivers 页面。
- 下载适用于您的 Vertica 版本的 JDBC 驱动包。
注意:有关客户端和服务器兼容性的更多信息,请参阅 Vertica 文档中的 Client Driver and Server Version Compatibility。
将 Vertica 连接到 Apache Flink¶
要使用 Apache Flink 从 Vertica 作为源读取数据并写入 Vertica 作为目标,请按以下步骤操作:
1. 创建 Scala 项目并添加依赖¶
- 打开 IntelliJ IDEA,创建一个新的 Scala 项目,构建类型选择 sbt。
- 创建项目后,在项目的
build.sbt文件中添加以下 SBT 依赖。等待依赖加载到您的环境中。
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.13.0"
libraryDependencies += "org.apache.flink" % "flink-core" % "1.13.0"
libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.13.0"
libraryDependencies += "org.apache.flink" %% "flink-jdbc" % "1.10.3" % "compile"
libraryDependencies += "org.apache.flink" %% "flink-table-planner" % "1.13.0" % "compile"
2. 添加 Vertica JDBC 驱动 Jar 文件¶
- 从菜单点击 File,然后点击 Project Structure。
- 点击 Module SDK 下方的加号,然后点击 JARs or Directories。
- 选择从 Vertica 网站下载的 Vertica JDBC 驱动 jar 文件,点击 OK。
- 在 Project Structure 窗口中,点击 OK。
- Vertica JDBC 驱动将显示在左侧窗格的 Project > External Libraries 中。
3. 创建 Scala 类文件¶
默认情况下选中 Class。选择 Object 并输入类名。
4. 导入必要的包¶
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.{RowTypeInfo}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.types.Row
import org.apache.flink.api.scala._
5. 创建 Vertica 作为源数据库的连接并映射数据类型¶
val dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.vertica.jdbc.Driver")
.setDBUrl("jdbc:vertica://<IP_Address>:5433/<DB Name>?user=<username>&password=<password>")
.setQuery("Query for reading the source table")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
.finish())
6. 创建临时视图并使用视图创建数据集¶
tableEnv.createTemporaryView("<Temp View Name>", dataSource)
val query = tableEnv.sqlQuery("<Query on source view>")
dataSource.print()
val result = tableEnv.toDataSet[Row](query)
7. 创建 Vertica 作为目标数据库的连接并映射数据类型¶
result.output(JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.vertica.jdbc.Driver")
.setDBUrl("jdbc:vertica://<Target_DB_IP>:5433/<DB Name>?user=<username>&password=<password>")
.setQuery("insert into <SchemaName>.<TableName> values (?,?,?)")
.setSqlTypes(Array(java.sql.Types.VARCHAR, java.sql.Types.VARCHAR, java.sql.Types.VARCHAR))
.finish())
注意:传递给此查询的参数应与源表中的列数匹配。
8. 执行程序¶
9. 验证 Vertica 数据库中的数据¶
检查源数据库和目标数据库中的数据是否正确传输。
示例代码¶
以下是使用 Flink 将 Vertica 作为源和目标的完整示例代码:
package org.vertica.flink
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.{RowTypeInfo}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.types.Row
import org.apache.flink.api.scala._
object FlinkWithVerticaDemo {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)
// Create Source connection and map row datatype
val dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.vertica.jdbc.Driver")
.setDBUrl("jdbc:vertica://<IP_Address>:5433/<DB Name>?user=<username>&password=<password>")
.setQuery("Query for reading the source table")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
.finish())
tableEnv.createTemporaryView("<Temp View Name>", dataSource)
val query = tableEnv.sqlQuery("<Query on source view>")
dataSource.print()
val result = tableEnv.toDataSet[Row](query)
result.output(JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.vertica.jdbc.Driver")
.setDBUrl("jdbc:vertica://<Target_DB_IP>:5433/<DB Name>?user=<username>&password=<password>")
.setQuery("insert into <SchemaName>.<TableName> values (?,?,?)")
.setSqlTypes(Array(java.sql.Types.VARCHAR, java.sql.Types.VARCHAR, java.sql.Types.VARCHAR))
.finish())
env.execute("flink-test")
}
}
已知限制¶
读取模式¶
- 对于 TIME、TIMETZ 和 TIMESTAMPTZ 数据类型,毫秒被截断。
- 对于 BINARY、VARBINARY 和 LONG VARBINARY 数据类型,预览数据时值显示不正确。
写入模式¶
- INTERVAL 和 UUID 数据类型不受支持。要使用这些数据类型,必须将它们转换为 VARCHAR。
- 对于 TIME 数据类型,毫秒四舍五入为 3 位。
- 对于 TIMETZ 数据类型,不支持 Null 和毫秒。
- 对于 TIMESTAMPTZ,不支持毫秒。
更多信息¶
原文来源:https://www.vertica.com/kb/Apache_Flink_CG/Content/Partner/Apache_Flink_CG.htm