跳转至

Vertica 消费分隔符格式 Kafka 数据最佳实践

概述

Vertica 支持通过 Kafka 集成导入分隔符格式(如 CSV、TSV 等)的数据。本文介绍使用 Vertica 消费分隔符格式 Kafka 数据的最佳实践,涵盖消息格式检查、手动 COPY、记录终止符处理、KafkaInsertDelimiters 过滤器以及调度器微批次配置。

使用 kafkacat 检查 Kafka 消息格式

在配置 Vertica 消费 Kafka 数据之前,首先使用 kafkacat 检查 Kafka 消息的格式,确保消息结构与目标表匹配。

# 消费消息并显示详细信息
kafkacat -C -b kafka-broker:9092 \
  -t topic_name \
  -o beginning \
  -c 5 \
  -f 'Topic %t [%p] offset %o: key=%k, payload=%S%s\n'

# 查看原始消息内容(十六进制显示,便于检查分隔符)
kafkacat -C -b kafka-broker:9092 \
  -t topic_name \
  -o beginning \
  -c 3 \
  -X output.format=hex

# 以分隔符格式解码消息
kafkacat -C -b kafka-broker:9092 \
  -t topic_name \
  -o beginning \
  -c 5 \
  -q | sed 's/|/\n  |/g'

使用 kafkacat 可以确认以下关键信息:

  • 消息使用哪种分隔符(逗号、制表符、管道符等)
  • 是否包含标题行
  • 是否包含引号或转义字符
  • 记录终止符的类型(换行符、自定义终止符)
  • 消息大小分布(用于优化批量大小)

手动 COPY 分隔符格式 Kafka 数据

在创建调度任务之前,建议先使用手动 COPY 命令验证 Kafka 数据可以正确加载到 Vertica 表中。

-- 创建目标表
CREATE TABLE IF NOT EXISTS sales_data (
    transaction_id INTEGER,
    product_id INTEGER,
    customer_id INTEGER,
    sale_date DATE,
    amount NUMERIC(10,2),
    region VARCHAR(50)
);

-- 手动 COPY Kafka 数据(管道符分隔)
COPY sales_data
  FROM VERTICA_KAFKA WITH (
    'kafka_broker' = 'kafka-broker:9092',
    'topic' = 'sales_topic',
    'consumer_group' = 'vertica_consumer'
  )
  DELIMITER '|'
  DIRECT;

-- 检查加载结果
SELECT COUNT(*) FROM sales_data;

参数说明

参数 说明
kafka_broker Kafka 代理地址
topic Kafka 主题名称
consumer_group 消费者组 ID
DELIMITER 字段分隔符
DIRECT 直接加载模式(跳过 ROS)
NO COMMIT 不自动提交(用于测试)
ABORT ON ERROR 遇错中止
REJECTMAX N 允许的最大拒绝行数

记录终止符与 KafkaInsertDelimiters 过滤器

Kafka 消息可能包含多行数据,或使用自定义记录终止符。Vertica 提供 KafkaInsertDelimiters 过滤器来处理这类场景。

记录终止符选项

终止符 表示方式 说明
换行符 \\n 默认记录终止符
回车换行 \\r\\n Windows 风格
制表符 \\t 字段或记录分隔
自定义字符 十六进制(如 \\x01 非打印字符

使用 KafkaInsertDelimiters 过滤器

当 Kafka 消息中的记录使用了不同于默认 \\n 的终止符时,使用 KafkaInsertDelimiters 过滤器将 Kafka 消息内部的终止符转换为 Vertica 可识别的格式。

-- 创建带有 KafkaInsertDelimiters 的 COPY 语句
COPY sales_data
  FROM VERTICA_KAFKA WITH (
    'kafka_broker' = 'kafka-broker:9092',
    'topic' = 'sales_topic',
    'consumer_group' = 'vertica_consumer'
  )
  DELIMITER '|'
  FILTER KafkaInsertDelimiters(
    record_terminator = E'\\n',
    line_delimiter = E'\\x01'
  )
  DIRECT;

过滤器参数

参数 说明
record_terminator Kafka 消息内部的记录终止符
line_delimiter 整条 Kafka 消息的分隔符(默认与 record_terminator 相同)
record_terminator_position 终止符位置(endbeginning

使用 DELIMITED 解析器配置调度器微批次

在生产环境中,通过 Vertica 调度器(vkconfig)创建微批次任务持续消费 Kafka 数据。

创建加载规范(Load Spec)

# 创建调度器加载规范
vkconfig load-spec --create \
  --load-spec kafka_delimited_spec \
  --parser DELIMITED \
  --add-filter KafkaInsertDelimiters \
  --add-delimiter '|'

完整配置示例

# 步骤 1:创建调度器集群
vkconfig cluster --create \
  --cluster kafka_cluster \
  --hosts kafka-broker:9092

# 步骤 2:创建调度器
vkconfig scheduler --create \
  --scheduler kafka_scheduler \
  --cluster kafka_cluster \
  --config-db-schema public

# 步骤 3:创建加载规范
vkconfig load-spec --create \
  --load-spec kafka_delimited_spec \
  --parser DELIMITED \
  --add-filter KafkaInsertDelimiters \
  --config KafkaInsertDelimiters/record_terminator=$'\\n' \
  --delimiter '|'

# 步骤 4:创建目标表规范
vkconfig target-table --create \
  --target-schema public \
  --target-table sales_data \
  --load-spec kafka_delimited_spec

# 步骤 5:创建微批次任务
vkconfig microbatch --create \
  --microbatch sales_microbatch \
  --scheduler kafka_scheduler \
  --cluster kafka_cluster \
  --target-table sales_microbatch_db.public.sales_data \
  --add-source sales_topic

验证调度器配置

通过查询 stream_load_specs 表验证加载规范的配置:

SELECT
    spec_name,
    spec_parser,
    spec_filters,
    spec_delimiter,
    spec_parameters
FROM
    v_catalog.stream_load_specs
WHERE
    spec_name = 'kafka_delimited_spec';

查询结果示例

列名
spec_name kafka_delimited_spec
spec_parser DELIMITED
spec_filters KafkaInsertDelimiters
spec_delimiter |
spec_parameters record_terminator=\n

监控与排查

查看微批次状态

-- 检查微批次运行状态
SELECT
    microbatch_id,
    microbatch_schema,
    microbatch_table,
    source_name,
    partition_start,
    partition_end,
    status,
    error_count
FROM
    v_catalog.microbatches
ORDER BY
    microbatch_id DESC
LIMIT 20;

查看加载错误

-- 查看被拒绝的记录
SELECT
    rejected_data,
    rejected_reason,
    rejected_time
FROM
    v_monitor.rejected_data
WHERE
    rejected_time > NOW() - INTERVAL '1 hour'
LIMIT 100;

常见问题

问题 原因 解决方法
数据错列 分隔符不匹配 检查 Kafka 消息中的实际分隔符
记录丢失 记录终止符配置错误 使用 kafkacat 的十六进制输出检查终止符
性能下降 批大小不合适 调整 MicrobatchSize 参数
重复数据 消费者组提交失败 检查 Kafka 消费者组偏移量

资源

扩展阅读