跳转至

Vertica 集成 Apache Airflow

关于 Vertica 连接指南

Vertica 连接指南提供将第三方合作伙伴产品连接到 Vertica 的基本说明。连接指南基于我们在特定版本的 Vertica 和合作伙伴产品上的测试。

最新测试版本

本文档基于以下版本的测试:

组件 版本
Apache Airflow 2.2.2
桌面平台 Red Hat Enterprise Linux Server 8.3
Vertica 客户端 Vertica Python Driver 1.0.2
Vertica 服务器 Vertica 11.0.1

我们使用 Vertica Python Driver 测试了 Apache Airflow 与 Vertica 的集成。要使用 Airflow 连接到 Vertica,您需要使用自己的 Python 脚本创建有向无环图(DAG)。我们创建了一个示例 Python 脚本,用于从 Vertica 读取数据并写入 Vertica。该示例脚本在本指南后面提供供参考。测试结果发现,由于 Python 脚本的原因,存在一些数据类型限制,列在本指南末尾。

Apache Airflow 概述

Apache Airflow 是一个开源的工作流引擎和调度系统。它作为有向无环图(DAG)执行您的工作流。Airflow 允许您调度这些 DAG 并设置这些任务的依赖关系。其简单的用户界面使监控和管理任务、检查状态和进度以及排查问题变得容易。

安装 Apache Airflow

安装 Apache Airflow,请按照 此处 的先决条件和说明进行操作。

安装 Vertica 客户端驱动程序

Apache Airflow 使用 Vertica Python 客户端连接到 Vertica。您还需要安装 sqlalchemy 和用于 Vertica 的 Apache Airflow 提供商包。

注意: 确保您拥有最新版本的 Python 和 pip。

将 Vertica 连接到 Apache Airflow

设置 Airflow 后:

  1. 启动数据库:

    airflow db init
    

  2. 在 Airflow 中创建用户:

    airflow users create -r Admin -u admin -e admin@example.com -f admin -l user -p test
    

    参数说明:-r 是角色,-u 是用户名,-e 是邮箱,-f 是名字,-l 是姓氏,-p 是密码。

  3. 启动 Web 服务器(默认端口 8080):

    airflow webserver --port 8080
    

  4. 在浏览器中打开: http://localhost:8080/home

  5. 配置 Vertica 连接:

  6. 选择 Admin,点击 Connections
  7. List Connection 中,选择 vertica_default,点击编辑记录按钮。
  8. 输入连接详情:
    • Conn Id:默认为 vertica_default,可以更改
    • Schema:数据库名称
  9. 点击 Save

注意: 您在此处指定的连接变量将存储在 apache airflow providers vertica 文件中。

使用 Vertica 连接创建有向无环图(DAG)

在此示例中,我们将创建一个 DAG,从 Vertica 读取数据并返回表中的行数。

1、使用命令行界面,在 airflow 文件夹中创建 dags 文件夹。

2、在 dags 文件夹中创建示例 Python 程序。

3、在 DAG 中导入 VerticaHook 库以连接到 Vertica:

from airflow.contrib.hooks.vertica_hook import VerticaHook
4、定义函数时添加以下变量:
variable = VerticaHook('Connection_id').get_cursor()
5、要更新 Apache Airflow 中的 DAG 列表,需要运行调度器。在一个新的命令行中运行:
airflow scheduler
6、我们现在已经创建了以下示例 python 程序,并将其保存在 dag 文件夹中。

截图

7、在 Apache Airflow 中,单击“触发 DAG”按钮并检查状态。

如果状态显示成功,请查看日志文件夹中的日志以获取结果。日志文件夹位于 airflow 文件夹中。

截图

示例 DAG 脚本

from datetime import datetime, timedelta
import os
import logging
from airflow import DAG
from airflow.contrib.hooks.vertica_hook import VerticaHook

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.today(),
    'email': ['@airflow'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG("vertica_monitoring_1",
          default_args=default_args,
          schedule_interval=None)

def get_vertica_status(**kwargs):
    cur = VerticaHook('vertica_default').get_cursor()
    sql = "select count(*) from r_test.sample_decimal "
    cur.execute(sql)
    result = cur.fetchall()
    logging.info(result)
    logging.info('above is the result value')
    return result[0][0]

result = get_vertica_status()
logging.info(result)
logging.info('The above is the result value')

执行 DAG

  1. 在 Apache Airflow 中,点击 Trigger DAG 按钮并检查状态。
  2. 如果状态显示成功,在 log 文件夹中验证日志以查看结果。logs 文件夹位于 airflow 文件夹中。

已知限制

注意: Apache Airflow 使用 Python 执行任务。以下限制是由于使用 Python 程序导致的结果。

通用限制

  • 对于所有数据类型,Null 显示为 None。

读取模式

  • BINARY、VARBINARY、LONG VARBINARY:值无法正确显示。
  • INTERVAL 和 TIMETZ:每个值会附加 b
  • DECIMAL、TIME、TIMESTAMP、TIMESTAMTZ 和 UUID:对应的函数名称会附加到值后面。

写入模式

  • DECIMAL、TIME、TIMESTAMP、TIMESTAMTZ 和 UUID:由于函数名称被附加到值后面,这些值无法写入 Vertica。
  • BINARY、VARBINARY、LONG VARBINARY 和 TIMETZ:不支持。
  • CHAR、VARCHAR 和 LONG VARCHAR:将 Null 替换为 EMPTY 以将值写入 Vertica。

原文来源:https://www.vertica.com/kb/Apache_Airflow_CG/Content/Partner/Apache_Airflow_CG.htm