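Vertica and Apache Storm¶
About This Document¶
The goal of this technical exploration is to integrate Vertica with Apache Storm so that streaming data from Kafka is ingested into a Vertica database.
Apache Storm Overview¶
Apache Storm is an open-source, distributed, real-time computation system. It can be used for machine learning, real-time analytics, ETL, and similar workloads, and it integrates with databases and queueing services. Download it from the Apache Storm website.
Test Environment¶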
- Operating system: Windows Server 2019
- Java JDK 8
- Vertica Server version 12.0.3
- Vertica JDBC Driver version 12.0.3
- IDE: IntelliJ IDEA Community Edition
- Apache Storm 2.4.0
- Kafka 2.12
Apache Storm Modes¶
Apache Storm supports two modes:
- Local mode
- Production mode
This exploration used local mode.
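One way to keep a single entry point for both modes is to choose the mode from the command line. The following is a minimal sketch that uses only the JDK; the `RunMode` enum and the `--production` flag are hypothetical names and are not part of Storm. In local mode the topology runs in an in-process `LocalCluster`, as in this exploration. In production mode it would typically be submitted to a running cluster with `StormSubmitter.submitTopology`.

```java
import java.util.Arrays;

// Hypothetical helper (not a Storm class): selects how the topology is submitted.
enum RunMode {
    LOCAL,      // in-process LocalCluster, as used in this exploration
    PRODUCTION; // a running Storm cluster

    // Returns PRODUCTION only when the assumed --production flag is present.
    static RunMode fromArgs(String[] args) {
        return Arrays.asList(args).contains("--production") ? PRODUCTION : LOCAL;
    }
}
```

A `main` method could branch on `RunMode.fromArgs(args)` and keep the rest of the topology construction identical for both modes.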
Creating an Apache Storm Project¶
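- Open IntelliJ Community Edition and create a new project with Language = Java, Build System = Maven, and JDK 1.8.
- Open the pom.xml file and add the required dependencies.
- Import the Vertica JDBC Driver.
- In the main method, create a KafkaSpoutConfig.
- Create the Spout object and the Bolt object, then create the Topology.
- Create another class for the Bolt that extends BaseBasicBolt.
- In the declareOutputFields method, declare the schema.
- In the execute method, create the Vertica JDBC connection and write the logic that inserts data into Vertica.
- Start the Kafka producer.
- Run the Apache Storm program. The data should be inserted into the Vertica table.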
Adding the Vertica JDBC JAR in IntelliJ¶
- After creating the project in IntelliJ, open File > Project Structure.
- Select Modules > Dependencies.
- Click the + icon, select JAR or directories, and select the Vertica JDBC JAR. The Vertica JAR should appear under Imported libraries.
Complete Project Code¶
POM File¶
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>KafkaJDBC</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>2.4.0</version>
            <scope>compile</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>2.4.0</version>
            <scope>compile</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>
    </dependencies>
</project>
Storm Kafka Bolt¶
package org.example;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.FailedException;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class KBolt extends BaseBasicBolt {
    // Replace the <...> placeholders with your Vertica host, database, and credentials.
    private static final String JDBC_URL =
        "jdbc:vertica://<VerticaServerIP>:5433/<DBName>?user=<Uname>&password=<DBPassword>";
    // id is the column name.
    private static final String INSERT_SQL = "INSERT INTO <TableName> (id) VALUES (?)";

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        try {
            Class.forName("com.vertica.jdbc.Driver");
            // Extract the Kafka record value from the tuple.
            String id = tuple.getValueByField("value").toString();
            System.out.println("Printing the message from Kafka: " + id);
            // try-with-resources closes the connection and statement after each tuple.
            try (Connection connection = DriverManager.getConnection(JDBC_URL);
                 PreparedStatement statement = connection.prepareStatement(INSERT_SQL)) {
                statement.setString(1, id);
                statement.executeUpdate();
            }
            // BaseBasicBolt acks the tuple automatically when execute returns normally.
            basicOutputCollector.emit(new Values(id));
        } catch (Exception e) {
            // In a BaseBasicBolt, throwing FailedException marks the tuple as failed.
            throw new FailedException("Failed to execute JDBC statement", e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("id"));
    }
}
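The bolt embeds the user name and password in the JDBC URL. As an alternative, `DriverManager.getConnection(String, Properties)` accepts the credentials separately through the standard `user` and `password` properties. The following is a minimal sketch of that approach that uses only the JDK; `VerticaConnectionInfo` and its methods are hypothetical names and are not part of the Vertica JDBC driver.

```java
import java.util.Properties;

// Hypothetical helper (not part of the Vertica JDBC driver).
final class VerticaConnectionInfo {
    private VerticaConnectionInfo() {}

    // Builds a URL in the same form the bolt uses, without user or password.
    static String url(String host, int port, String database) {
        return "jdbc:vertica://" + host + ":" + port + "/" + database;
    }

    // Carries the credentials as standard JDBC connection properties.
    static Properties credentials(String user, String password) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        return props;
    }
}
```

The bolt could then call `DriverManager.getConnection(VerticaConnectionInfo.url(host, 5433, database), VerticaConnectionInfo.credentials(user, password))`, with the values read from configuration rather than hard-coded.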
Storm Kafka Spout Topology¶
package org.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

import java.util.Properties;

public class MainKSprout1 {
    public static void main(String[] args) throws Exception {
        // Kafka consumer properties; the consumer group ID is "1".
        Properties prop = new Properties();
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "1");

        // Replace the <...> placeholders with your Kafka broker address and topic.
        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
            .builder("PLAINTEXT://<KafkaProducerIP>:9092", "<KafkaTopic>")
            .setProp(prop)
            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
            .setOffsetCommitPeriodMs(100)
            .setProp("session.timeout.ms", 20000)
            .setProp("heartbeat.interval.ms", 15000)
            .build();

        // Wire the Kafka spout to the bolt that writes to Vertica.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("stations", new KafkaSpout<>(spoutConfig));
        builder.setBolt("MultiplierBolt", new KBolt())
            .shuffleGrouping("stations");

        Config config = new Config();
        config.setDebug(true);

        // LocalCluster is AutoCloseable in Storm 2.x, so it shuts down when the block exits.
        try (LocalCluster cluster = new LocalCluster()) {
            cluster.submitTopology("HelloTopology", config, builder.createTopology());
            // Let the topology run for 10 seconds before the local cluster shuts down.
            Thread.sleep(10000);
        }
    }
}
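The topology sets `session.timeout.ms` to 20000 and `heartbeat.interval.ms` to 15000. The Kafka consumer documentation requires the heartbeat interval to be lower than the session timeout and notes that it is typically set no higher than one third of that value. The following is a minimal sketch of a check for both conditions that uses only the JDK; `ConsumerTimeouts` and its methods are hypothetical names and are not part of Kafka or Storm.

```java
// Hypothetical helper (not part of Kafka or Storm).
final class ConsumerTimeouts {
    private ConsumerTimeouts() {}

    // Hard requirement: the heartbeat interval must be lower than the session timeout.
    static boolean isValid(int sessionTimeoutMs, int heartbeatIntervalMs) {
        return heartbeatIntervalMs > 0 && heartbeatIntervalMs < sessionTimeoutMs;
    }

    // Typical guidance: heartbeat interval no higher than one third of the session timeout.
    static boolean isWithinGuidance(int sessionTimeoutMs, int heartbeatIntervalMs) {
        return isValid(sessionTimeoutMs, heartbeatIntervalMs)
            && heartbeatIntervalMs <= sessionTimeoutMs / 3;
    }
}
```

With the values from the topology, `isValid(20000, 15000)` holds but `isWithinGuidance(20000, 15000)` does not, so a lower heartbeat interval may be worth considering outside a local test.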
For More Information¶
Original source: https://www.vertica.com/kb/Apache_Storm_TE/Content/Partner/Apache_Storm_TE.htm