跳转至

Vertica 集成 Apache Flink

连接指南

关于 Vertica 连接指南

Vertica 连接指南提供将第三方合作伙伴产品连接到 Vertica 的基本说明。连接指南基于我们针对特定版本的 Vertica 和合作伙伴产品的测试。

本文档基于使用以下版本的测试:

软件 版本
Apache Flink 1.13
JDK 1.8
Scala 2.11.0
IntelliJ IDEA IntelliJ Community Edition 2021.1.1
桌面平台 Windows Server 2016
Vertica 客户端 Vertica JDBC Driver 10.1
Vertica 服务器 Vertica 10.1

Apache Flink 是 Apache 软件基金会开发的开源框架和分布式处理引擎。它用于流处理和批处理。该工具使用 Java 和 Scala 开发。您可以使用 Java、Scala 或 Python 在 Apache Flink 中编程。

下载 Vertica 客户端驱动

Apache Flink 使用 Vertica JDBC 驱动连接到 Vertica。

  1. 访问 Vertica Client Drivers 页面。
  2. 下载适用于您的 Vertica 版本的 JDBC 驱动包。

    注意:有关客户端和服务器兼容性的更多信息,请参阅 Vertica 文档中的 Client Driver and Server Version Compatibility

要使用 Apache Flink 从 Vertica 作为源读取数据并写入 Vertica 作为目标,请按以下步骤操作:

1. 创建 Scala 项目并添加依赖

  1. 打开 IntelliJ IDEA,创建一个新的 Scala 项目,构建类型选择 sbt。
  2. 创建项目后,在项目的 build.sbt 文件中添加以下 SBT 依赖。等待依赖加载到您的环境中。
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.13.0"
libraryDependencies += "org.apache.flink" % "flink-core" % "1.13.0"
libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.13.0"
libraryDependencies += "org.apache.flink" %% "flink-jdbc" % "1.10.3" % "compile"
libraryDependencies += "org.apache.flink" %% "flink-table-planner" % "1.13.0" % "compile"

2. 添加 Vertica JDBC 驱动 Jar 文件

  1. 从菜单点击 File,然后点击 Project Structure
  2. 点击 Module SDK 下方的加号,然后点击 JARs or Directories
  3. 选择从 Vertica 网站下载的 Vertica JDBC 驱动 jar 文件,点击 OK
  4. 在 Project Structure 窗口中,点击 OK
  5. Vertica JDBC 驱动将显示在左侧窗格的 Project > External Libraries 中。

3. 创建 Scala 类文件

默认情况下选中 Class。选择 Object 并输入类名。

4. 导入必要的包

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.{RowTypeInfo}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.types.Row
import org.apache.flink.api.scala._

5. 创建 Vertica 作为源数据库的连接并映射数据类型

val dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
  .setDrivername("com.vertica.jdbc.Driver")
  .setDBUrl("jdbc:vertica://<IP_Address>:5433/<DB Name>?user=<username>&password=<password>")
  .setQuery("Query for reading the source table")
  .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
  .finish())

6. 创建临时视图并使用视图创建数据集

tableEnv.createTemporaryView("<Temp View Name>", dataSource)
val query = tableEnv.sqlQuery("<Query on source view>")
dataSource.print()
val result = tableEnv.toDataSet[Row](query)

7. 创建 Vertica 作为目标数据库的连接并映射数据类型

result.output(JDBCOutputFormat.buildJDBCOutputFormat()
  .setDrivername("com.vertica.jdbc.Driver")
  .setDBUrl("jdbc:vertica://<Target_DB_IP>:5433/<DB Name>?user=<username>&password=<password>")
  .setQuery("insert into <SchemaName>.<TableName> values (?,?,?)")
  .setSqlTypes(Array(java.sql.Types.VARCHAR, java.sql.Types.VARCHAR, java.sql.Types.VARCHAR))
  .finish())

注意:传递给此查询的参数应与源表中的列数匹配。

8. 执行程序

env.execute("flink-test")

9. 验证 Vertica 数据库中的数据

检查源数据库和目标数据库中的数据是否正确传输。

示例代码

以下是使用 Flink 将 Vertica 作为源和目标的完整示例代码:

package org.vertica.flink

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.{RowTypeInfo}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.types.Row
import org.apache.flink.api.scala._

object FlinkWithVerticaDemo {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)

    // Create Source connection and map row datatype
    val dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername("com.vertica.jdbc.Driver")
      .setDBUrl("jdbc:vertica://<IP_Address>:5433/<DB Name>?user=<username>&password=<password>")
      .setQuery("Query for reading the source table")
      .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
      .finish())

    tableEnv.createTemporaryView("<Temp View Name>", dataSource)
    val query = tableEnv.sqlQuery("<Query on source view>")
    dataSource.print()
    val result = tableEnv.toDataSet[Row](query)

    result.output(JDBCOutputFormat.buildJDBCOutputFormat()
      .setDrivername("com.vertica.jdbc.Driver")
      .setDBUrl("jdbc:vertica://<Target_DB_IP>:5433/<DB Name>?user=<username>&password=<password>")
      .setQuery("insert into <SchemaName>.<TableName> values (?,?,?)")
      .setSqlTypes(Array(java.sql.Types.VARCHAR, java.sql.Types.VARCHAR, java.sql.Types.VARCHAR))
      .finish())

    env.execute("flink-test")
  }
}

已知限制

读取模式

  • 对于 TIME、TIMETZ 和 TIMESTAMPTZ 数据类型,毫秒被截断。
  • 对于 BINARY、VARBINARY 和 LONG VARBINARY 数据类型,预览数据时值显示不正确。

写入模式

  • INTERVAL 和 UUID 数据类型不受支持。要使用这些数据类型,必须将它们转换为 VARCHAR。
  • 对于 TIME 数据类型,毫秒四舍五入为 3 位。
  • 对于 TIMETZ 数据类型,不支持 Null 和毫秒。
  • 对于 TIMESTAMPTZ,不支持毫秒。

更多信息


原文来源:https://www.vertica.com/kb/Apache_Flink_CG/Content/Partner/Apache_Flink_CG.htm