跳转至

Vertica 集成 PipelineDB

适用于 Vertica 7.2.x 及更早版本

关于本文档

Vertica 提供 SQL 和 SQL on Hadoop 功能。PipelineDB 对流数据连续执行 SQL 查询。

本文档说明如何将 PipelineDB 的 SQL 流处理能力与 Vertica 强大的大规模并行处理(MPP)能力相结合。

本文档中的示例展示了如何使用 PipelineDB 对流数据执行 SQL 查询,然后将这些流持久化到 Vertica 中以执行高级分析。

测试版本:Vertica 7.2.x + PipelineDB 0.9.5。

PipelineDB 概述

PipelineDB 是一个开源 SQL 数据库,对数据流执行 SQL 查询。PipelineDB 增量存储查询结果到表中。您可以选择将数据库表的结果增量导出到 Vertica,然后对表中的数据执行复杂的分析。

PipelineDB 和 Kafka

Apache Kafka 是一个开源应用程序,专为流式使用场景(高数据量、低延迟)而设计。PipelineDB 可以生产和消费 Kafka 的消息并将其加载到数据流中。

安装 PipelineDB

  1. 创建 pipelinedb 用户来运行 PipelineDB 服务器:
    $ adduser pipelinedb
    
  2. https://www.pipelinedb.com/download 下载适合操作系统的 .rpm 文件。
  3. 安装下载的 rpm:
    $ rpm -ivh pipelinedb-0.9.3-1.x86_64.rpm
    

更多安装详情请参阅 http://docs.pipelinedb.com/installation.html#rpm

初始化 PipelineDB 数据目录

安装 PipelineDB 后,需要初始化数据库目录——PipelineDB 存储与数据库相关的所有文件和数据的目录:

$ pipeline-init -D pipelinedb_datadir/

启动 PipelineDB

启动 PipelineDB 服务器在后台运行:

$ pipeline-ctl -D pipelinedb_datadir -l pipelinedb.log start

-l 选项指定日志文件的路径。停止运行中的服务器:

$ pipeline-ctl -D pipelinedb_datadir stop

测试 PipelineDB 服务器是否在运行:

$ psql -h localhost -p 5432 pipeline

在 PipelineDB 中配置 Kafka 生产者和消费者

安装 librdkafka

$ git clone https://github.com/edenhill/librdkafka
$ cd librdkafka
$ ./configure --disable-{ssl,sasl}
$ make
$ make install
$ make clean

安装 pipeline_kafka 扩展

$ git clone https://github.com/pipelinedb/pipeline_kafka
$ cd pipeline_kafka
$ ./configure
$ make
$ make install
$ make clean

配置 pipeline_kafka

  1. pipelinedb.conf 文件中添加以下行:
    Shared_preload_libraries = pipeline_kafka
    
  2. 加载扩展到 PipelineDB:
    => CREATE EXTENSION pipeline_kafka;
    

创建扩展后,PipelineDB 可以开始生产和消费 Kafka 消息,也可以向 Vertica 发送消息。

配置 Kafka

http://kafka.apache.org/downloads.html 下载 .tgz 文件并安装在 Linux 机器上。测试中使用的是 kafka_9.0.1.tgz。

在 Kafka 可以开始向 PipelineDB 流式传输数据之前,需要启动 Kafka 和 ZooKeeper 服务器,并创建 PipelineDB 要监控的主题。

从 Kafka 到 PipelineDB 再到 Vertica 的事件流示例

以下示例展示了事件如何从 Kafka 流向 PipelineDB 进行流处理,然后流向 Vertica 进行分析查询或对历史数据建模。

需要安装的额外工具: - nginx:HTTP 接口,用于将 URL 请求元数据记录到 Kafka - kafkacat:将文件中的新记录写入 Kafka - siege:并发向随机 URL 发送 HTTP 请求的工具

配置生产者

  1. 启动 Kafka 服务器:
    $ bin/zookeeper-server-start.sh config/zookeeper.properties
    $ bin/kafka-server-start.sh config/server.properties
    
  2. Kafka 生产者将 JSON 序列化的 nginx 日志写入 Kafka。创建最小配置的 nginx 服务器:
    $ cat <<EOF > nginx.conf
    worker_processes 4;
    pid $PWD/nginx.pid;
    events {}
    http {
      log_format json
        '{'
          '"ts": "\$time_iso8601", '
          '"user_agent": "\$http_user_agent", '
          '"url": "\$request_uri", '
          '"latency": "\$request_time", '
          '"user": "\$arg_user"'
        '}';
      access_log $PWD/access.log json;
      error_log $PWD/error.log;
      server {
        location ~ ^/ {
          return 200;
        }
      }
    }
    EOF
    
  3. 启动 nginx 服务请求:
    $ nginx -c $PWD/nginx.conf
    
  4. 使用 kafkacat 将 access.log 的新行转译为 Kafka 消息:
    $ tail -f access.log | kafkacat -b localhost:9092 -t logs_topic
    
  5. 生成 URL 列表供 siege 随机选择:
    $ for x in {0..1000000}; do
        echo "http://localhost/page$((RANDOM % 100))/path$((RANDOM % 10))?user=$((RANDOM % 100000))" >> urls.txt
      done
    
  6. 运行 siege 模拟并发请求:
    $ siege -c32 -b -d0 -f urls.txt
    

将事件重定向到 Vertica

事件可以流向 PipelineDB,然后重定向到 Vertica;或者 Vertica 直接从主流消费。以下步骤展示 PipelineDB 如何分流:

  1. 创建 Kafka 扩展:
    pipelinedb=> CREATE EXTENSION pipeline_kafka;
    
  2. 设置 Kafka broker:
    pipelinedb=> SELECT pipeline_kafka.add_broker('<kafkahost>:9092');
    
  3. 创建接收 siege 工具生成事件的流(Kafka 正在发布这些事件):
    pipelinedb=> CREATE STREAM logs_stream (payload json);
    

创建连续视图

不能直接查询流,因此创建连续视图来跟踪进入流的事件:

  1. 创建视图:
    pipelinedb=> CREATE CONTINUOUS VIEW message_count AS SELECT COUNT(*) FROM logs_stream;
    
  2. 将流与要消费的主题关联:
    pipelinedb=> SELECT pipeline_kafka.consume_begin('logs_topic', 'logs_stream');
    

分流

PipelineDB 可以将传入流分成两个流——一个供自己消费,另一个发布回 Kafka 供 Vertica 通过其 KafkaLoader 消费:

  1. 创建两个流:
    pipelinedb=> CREATE STREAM pipelinedb_stream (payload json);
    pipelinedb=> CREATE STREAM vertica_stream (payload json);
    
  2. 创建连续转换来分流:
    pipelinedb=> CREATE CONTINUOUS TRANSFORM t AS SELECT payload FROM logs_stream
        THEN EXECUTE PROCEDURE pipeline_stream_insert('pipelinedb_stream','vertica_stream');
    
  3. 在分流后的流上创建视图以跟踪事件:
    pipelinedb=> CREATE CONTINUOUS VIEW pipelinedb_message_count
        AS SELECT COUNT(*) FROM pipelinedb_stream;
    pipelinedb=> CREATE CONTINUOUS VIEW vertica_message_count
        AS SELECT COUNT(*) FROM vertica_stream;
    
  4. 创建将事件发布到 Kafka 供 Vertica 消费的连续转换:
    pipelinedb=> CREATE CONTINUOUS TRANSFORM vertica_publish AS SELECT payload
        FROM vertica_stream
        THEN EXECUTE PROCEDURE pipeline_kafka.emit_tuple('kafkapush');
    

此时,Vertica 消费的事件可在 kafkapush 主题中获取。

在 Vertica 中配置消费者

配置 Vertica 中的 Kafka loader 以消费 PipelineDB 发布的事件:

  1. 添加指向要消费主题的 Kafka broker 的调度器:
    $ /opt/vertica/packages/kafka/bin/vkconfig scheduler --add --password password --brokers <broker_host>:9092
    
  2. 在 Vertica 中创建 Flex Table 存储事件:
    vsql=> CREATE FLEX TABLE public.kafka_tgt();
    
  3. 添加 Kafka 主题到 Vertica 调度器:
    $ /opt/vertica/packages/kafka/bin/vkconfig topic --add --target public.kafka_tgt --rejection-table public.kafka_rej --topic kafkapush --password password
    
  4. 启动调度器开始消费 Kafka 消息:
    $ /opt/vertica/packages/kafka/bin/vkconfig launch --password password
    
  5. 查询 flex table 查看加载的事件:
    vsql=> SELECT * FROM public.kafka_tgt;
    
  6. 确认 /etc/hosts 文件中存在 Kafka broker 主机名的条目。

更多 Vertica Kafka 配置详情,请参阅 Integrating with Apache Kafka

在 PipelineDB 中为 Vertica 配置 Foreign Data Wrapper

通过创建 foreign data wrapper,可以从 PipelineDB 查询 Vertica 表:

  1. 下载 pipeline_odbc 扩展源代码:https://github.com/pipelinedb/pipeline_odbc
  2. 安装:
    $ make
    $ make install
    
  3. 下载并创建 Vertica ODBC 驱动,然后为 Vertica 实例创建 ODBC DSN。
  4. 在 PipelineDB 中创建 ODBC 扩展:
    pipeline=# CREATE EXTENSION odbc_fdw;
    
  5. 使用 odbc_fdw 指向 Vertica 集群:
    pipeline=# CREATE SERVER vertica FOREIGN DATA WRAPPER odbc_fdw OPTIONS (dsn 'Vertica');
    
  6. 通过 Vertica foreign server 将 Vertica 表映射到 PipelineDB 表:
    pipeline=# CREATE FOREIGN TABLE vertica_test (x integer, y integer, z integer)
        SERVER vertica OPTIONS (database 'vertica', table 'vertica_test');
    
  7. 将 PipelineDB 用户映射到 Vertica 用户:
    pipeline=# CREATE USER MAPPING FOR <pipelinedb_user> SERVER vertica
        OPTIONS (username '<vertica_user>', password '<vertica_password>');
    

现在可以像操作 PipelineDB 本地关系一样与 Vertica 表交互:

pipeline=# SELECT * FROM vertica_test;
 x | y | z
---+---+---
 0 | 0 | 0

更多信息

关于... 请参阅...
PipelineDB https://github.com/pipelinedb/pipeline_odbc
Apache Kafka http://kafka.apache.org/
Vertica 社区版 https://vertica.com/community/
Vertica 文档 http://vertica.com/docs/latest/HTML/index.htm

原文来源:https://www.vertica.com/kb/HPE-Vertica-Integration-with-PipelineDB-Connection-Guide/Content/Partner/HPE-Vertica-Integration-with-PipelineDB-Connection-Guide.htm