
Vertica and Apache Storm

About This Document

The goal of this technical exploration was to integrate Vertica with Apache Storm so that streaming data from Kafka can be ingested into a Vertica database.

Apache Storm Overview

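Apache Storm is an open-source, distributed, real-time computation system. It can be used for machine learning, real-time analytics, ETL, and more, and it can integrate with databases and queueing services. Download it from the Apache Storm website.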

Test Environment

  • Operating system: Windows Server 2019
  • Java JDK 8
  • Vertica Server version 12.0.3
  • Vertica JDBC Driver version 12.0.3
  • IDE: IntelliJ IDEA Community Edition
  • Apache Storm 2.4.0
  • Kafka 2.12

Apache Storm Modes

Apache Storm supports two modes:

  • Local mode
  • Production mode

This exploration used local mode.

Creating the Apache Storm Project

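  1. Open IntelliJ Community Edition and create a new project: Language = Java, Build System = Maven, JDK 1.8.
  2. Open the pom.xml file and add the required dependencies.
  3. Import the Vertica JDBC driver.
  4. In the main method, create a KafkaSpoutConfig.
  5. Create the Spout and Bolt objects and build the Topology.
  6. Create a separate class for the Bolt that extends BaseBasicBolt.
  7. In the declareOutputFields method, declare the output schema.
  8. In the execute method, create the Vertica JDBC connection and write the logic that inserts the data into Vertica.
  9. Start the Kafka producer.
  10. Run the Apache Storm program. The data should be inserted into the Vertica table.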
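The main class later in this article wires KBolt to the spout with shuffleGrouping. Storm's documentation describes shuffle grouping as distributing tuples randomly across a bolt's tasks so that each task receives an equal number of tuples. The stdlib-only sketch below models that idea with a task order that is reshuffled every round. It is an illustrative analogy, not Storm's implementation, and the class and method names are hypothetical. Note that setBolt without a parallelism hint creates a single task, so in this project every tuple reaches the same KBolt instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical, stdlib-only model of shuffle grouping (not Storm's implementation). */
public class ShuffleGroupingSketch {

    /**
     * Sends each message to exactly one of boltTasks tasks. In every round of
     * boltTasks messages, each task receives one message, in a freshly shuffled order.
     */
    static List<List<String>> distribute(List<String> messages, int boltTasks, long seed) {
        Random random = new Random(seed);
        List<Integer> order = new ArrayList<>();
        List<List<String>> received = new ArrayList<>();
        for (int task = 0; task < boltTasks; task++) {
            order.add(task);
            received.add(new ArrayList<>());
        }
        int next = boltTasks; // forces a shuffle before the first message
        for (String message : messages) {
            if (next == boltTasks) {
                Collections.shuffle(order, random); // start a new round in random task order
                next = 0;
            }
            received.get(order.get(next++)).add(message);
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("101", "102", "103", "104", "105", "106");
        // Three hypothetical bolt tasks: each ends up with exactly two of the six messages
        List<List<String>> received = distribute(messages, 3, 42L);
        for (int task = 0; task < received.size(); task++) {
            System.out.println("task " + task + " -> " + received.get(task));
        }
    }
}
```

In Storm itself, the number of KBolt tasks would be raised by passing a parallelism hint to setBolt; the distribution shown here is only a model of the even, random spread the documentation describes.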

Adding the Vertica JDBC JAR in IntelliJ

  1. After creating the project in IntelliJ, open File > Project Structure.
  2. Select Modules > Dependencies.
  3. Click the + icon, select JAR or directories, and choose the Vertica JDBC JAR. The Vertica JAR should appear under Imported libraries.
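To confirm that the JAR is visible to the project, a small check like the following can be run from the same IntelliJ module. It is a minimal sketch, and the class and method names are hypothetical; it relies only on Class.forName, which throws ClassNotFoundException when a class cannot be found. Keep in mind that a JAR added only through Project Structure is not recorded in pom.xml, so a command-line Maven build will not see it.

```java
/** Hypothetical helper that checks whether a class can be loaded from the classpath. */
public class DriverCheck {

    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String driver = "com.vertica.jdbc.Driver"; // the driver class that KBolt loads
        if (isOnClasspath(driver)) {
            System.out.println(driver + " found on the classpath");
        } else {
            System.out.println(driver + " NOT found: check Modules > Dependencies");
        }
    }
}
```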

Complete Project Code

POM File

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>KafkaJDBC</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>2.4.0</version>
            <scope>compile</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>2.4.0</version>
            <scope>compile</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>
    </dependencies>
</project>

Storm Kafka Bolt

package org.example;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.FailedException;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class KBolt extends BaseBasicBolt {
    // Replace the placeholders with your Vertica server, database, and credentials
    private static final String VERTICA_URL =
        "jdbc:vertica://<VerticaServerIP>:5433/<DBName>?user=<Uname>&password=<DBPassword>";
    // id is the column name
    private static final String INSERT_SQL = "INSERT INTO <TableName> (id) VALUES (?)";

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // Extract the Kafka message from the tuple's "value" field
        String id = tuple.getValueByField("value").toString();
        System.out.println("Printing the message from Kafka: " + id);

        try {
            Class.forName("com.vertica.jdbc.Driver");
            // try-with-resources closes the statement and connection after each tuple
            try (Connection connection = DriverManager.getConnection(VERTICA_URL);
                 PreparedStatement statement = connection.prepareStatement(INSERT_SQL)) {
                statement.setString(1, id);
                // Execute the JDBC statement and emit the inserted value;
                // BaseBasicBolt acks the input tuple automatically once execute returns
                statement.executeUpdate();
                basicOutputCollector.emit(new Values(id));
            }
        } catch (ClassNotFoundException | SQLException e) {
            // BaseBasicBolt fails the input tuple when execute throws FailedException
            throw new FailedException("Failed to execute JDBC statement", e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Downstream bolts, if any, receive the inserted value in the "id" field
        outputFieldsDeclarer.declare(new Fields("id"));
    }
}
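KBolt embeds the user name and password in the JDBC URL. As an alternative, the standard java.sql.DriverManager.getConnection(String url, Properties info) overload accepts credentials as "user" and "password" properties, which keeps them out of the URL string and out of any log line that prints it. The helper below is a hypothetical sketch of that approach; the host name, database name, user name, and the VERTICA_PASSWORD environment variable are assumptions for illustration.

```java
import java.util.Properties;

/** Hypothetical helper that keeps Vertica credentials out of the JDBC URL. */
public class VerticaConnectionSettings {

    static final int DEFAULT_PORT = 5433; // port used in the article's connection URL

    /** Builds jdbc:vertica://host:port/database with no credentials in the string. */
    static String url(String host, int port, String database) {
        return "jdbc:vertica://" + host + ":" + port + "/" + database;
    }

    /** Standard JDBC connection properties for DriverManager.getConnection(url, props). */
    static Properties credentials(String user, String password) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values: replace with your own server, database, and user
        String jdbcUrl = url("vertica.example.com", DEFAULT_PORT, "mydb");
        // Hypothetical environment variable holding the password
        String password = System.getenv().getOrDefault("VERTICA_PASSWORD", "");
        Properties props = credentials("dbadmin", password);
        System.out.println("URL: " + jdbcUrl + ", user: " + props.getProperty("user"));
        // With the Vertica driver on the classpath, the bolt could then call:
        // Connection connection = DriverManager.getConnection(jdbcUrl, props);
    }
}
```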

Storm Kafka Spout Topology

package org.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import java.util.Properties;

public class MainKSprout1 {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        Properties prop = new Properties();
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
        // Replace the placeholders with your Kafka broker address and topic name
        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
            .builder("PLAINTEXT://<KafkaBrokerIP>:9092", "<KafkaTopic>")
            .setProp(prop)
            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
            .setOffsetCommitPeriodMs(100)
            .setProp("session.timeout.ms", 20000)
            .setProp("heartbeat.interval.ms", 15000)
            .build();

        // The spout reads from Kafka; KBolt inserts each message into Vertica
        builder.setSpout("stations", new KafkaSpout<String, String>(spoutConfig));
        builder.setBolt("MultiplierBolt", new KBolt())
               .shuffleGrouping("stations");
        Config config = new Config();
        config.setDebug(true);

        // Run in local mode for 10 seconds; closing the LocalCluster shuts the topology down
        try (LocalCluster cluster = new LocalCluster()) {
            cluster.submitTopology("HelloTopology", config, builder.createTopology());
            Thread.sleep(10000);
        }
    }
}
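The spout is configured with ProcessingGuarantee.AT_MOST_ONCE. According to the storm-kafka-client documentation, in this mode each offset is committed right after it is polled and before the tuple is emitted, so a failed tuple is not retried; with AT_LEAST_ONCE, an offset becomes committable only after its tuple has been acked, so a failure can cause the same record to be processed again. The stdlib-only simulation below illustrates what that means for the rows that reach a table when the consumer crashes once while handling one record. It is a simplified model with hypothetical names: it commits after every record, whereas in AT_LEAST_ONCE mode the real spout commits acked offsets periodically (setOffsetCommitPeriodMs), so more than one row could be duplicated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of how commit order affects the rows written to a table. */
public class DeliverySemanticsSketch {

    enum Guarantee { AT_MOST_ONCE, AT_LEAST_ONCE }

    /**
     * Processes the records once, crashing while handling the record at crashIndex, then
     * restarts from the last committed offset. Returns every row "inserted" into the table.
     */
    static List<String> ingest(List<String> records, int crashIndex, Guarantee guarantee) {
        List<String> table = new ArrayList<>();
        int committed = 0; // next offset to read after a restart
        for (int offset = 0; offset < records.size(); offset++) {
            if (guarantee == Guarantee.AT_MOST_ONCE) {
                committed = offset + 1;          // commit right after the poll...
                if (offset == crashIndex) break; // ...then crash before the insert
                table.add(records.get(offset));
            } else {
                table.add(records.get(offset));  // insert first...
                if (offset == crashIndex) break; // ...then crash before the commit
                committed = offset + 1;
            }
        }
        // Restart: resume from the committed offset, with no further crashes
        for (int offset = committed; offset < records.size(); offset++) {
            table.add(records.get(offset));
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c", "d");
        // Crash while handling record "b" (offset 1)
        System.out.println(ingest(records, 1, Guarantee.AT_MOST_ONCE));  // [a, c, d]: "b" is lost
        System.out.println(ingest(records, 1, Guarantee.AT_LEAST_ONCE)); // [a, b, b, c, d]: "b" is duplicated
    }
}
```

For a Vertica ingestion pipeline, this is the trade-off behind the guarantee setting: AT_MOST_ONCE can drop rows on failure, while AT_LEAST_ONCE can insert the same row more than once.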

For More Information


Original article: https://www.vertica.com/kb/Apache_Storm_TE/Content/Partner/Apache_Storm_TE.htm