Vertica 集成 Amazon Kinesis:技术探索¶
概述¶
本技术探索详细描述了使用 Amazon Kinesis 将实时流数据加载到 Vertica 的过程。我们使用 Python 程序持续生成数据并将数据加载到 Amazon Kinesis 数据流中。Amazon Kinesis Data Firehose 将实时流数据传送到 AWS S3 存储桶。然后使用 AWS Glue 将数据从 S3 存储桶传输到 Vertica 进行进一步分析。
流程概览: 数据源 → Python 生成器 → Amazon Kinesis 数据流 → Kinesis Data Firehose → S3 存储桶 → AWS Glue → Vertica
Amazon Kinesis 概述¶
Amazon Kinesis 是一个实时大数据流服务,具有持久性和高可扩展性。可以轻松地实时收集、处理和分析流数据,并从多个来源将数据推送到数据流中。Amazon Kinesis 可以从多个来源捕获大量数据。
Amazon Kinesis Data Firehose 是一个托管服务,用于将流数据加载到数据存储或数据湖中。
测试环境¶
- AWS 环境:标准 AWS 配置
- Vertica 服务器:Vertica 11.0(使用 Community Edition 安装在 AWS 上)
前提条件¶
安装和配置 AWS CLI¶
在本地机器上安装 AWS CLI:
curl "https://s3.amazonaws.com/aws-cli/awscli-bundle.zip" -o "awscli-bundle.zip"
unzip awscli-bundle.zip
sudo ./awscli-bundle/install -i /usr/local/aws -b /usr/local/bin/aws
安装后配置 AWS CLI:
系统将提示输入 AWS access key id、region、secret key 和默认输出选项(本示例中使用 JSON 格式)。
其他前提条件¶
- Vertica 必须在 AWS 上可用
- 必须拥有使用 Amazon Kinesis 数据流、Amazon Kinesis Data Firehose、S3 存储桶和 AWS Glue 所需的 IAM 权限
连接 Amazon Kinesis 到 Vertica¶
步骤 1:创建 Kinesis 数据流¶
在 Amazon Kinesis 上创建新的数据流。记下数据流名称。
参考:https://docs.aws.amazon.com/streams/latest/dev/amazon-kinesis-streams.html#how-do-i-create-a-stream
步骤 2:生成示例数据¶
使用以下 Python 程序持续生成示例数据并推送到 AWS 数据流:
import json
from pprint import pprint
import random
import time
import boto3
from faker import Faker
fake = Faker()
STREAM_NAME = 'TestDataStreamVertica'
def employee_record():
rand_name = fake.name()
rand_age = str(random.randint(25, 70))
rand_address = fake.street_address()
rand_city = fake.city()
rand_state = fake.country_code()
temp_record = rand_name+"|" + rand_age + "|" + rand_address + "|" + rand_city + "|" + rand_state
return {'Data': json.dumps(temp_record), 'PartitionKey': 'partition_key'}
def generate(stream_name, batch_size, kinesis_client):
while True:
records = [employee_record() for _ in range(batch_size)]
pprint(records)
kinesis_client.put_records(StreamName=stream_name, Records=records)
time.sleep(0.1)
if __name__ == "__main__":
generate(
stream_name=STREAM_NAME,
batch_size=1,
kinesis_client=boto3.client('kinesis'))
将代码中的 STREAM_NAME 更新为 Kinesis 数据流名称。在 Python IDE 中运行代码,这将持续向 Amazon Kinesis 数据流推送数据。
步骤 3:创建 Kinesis Data Firehose¶
创建 Amazon Kinesis Data Firehose 传输流,将上面创建的 Amazon Kinesis 数据流添加为源,将 Amazon S3 添加为目标。
- 创建传输流:https://docs.aws.amazon.com/firehose/latest/dev/create-name.html
- 使用 AWS S3 Console 创建 S3 存储桶:https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html
步骤 4:加载数据到 Vertica¶
确保数据已加载到 S3 存储桶后,使用 AWS Glue 将数据从 S3 存储桶加载到 Vertica。
关于如何将数据从 S3 存储桶加载到 Vertica 的更多信息,请参阅 AWS Glue Connection Guide。
原文来源:https://www.vertica.com/kb/Amazon_Kinesis_TE/Content/Partner/Amazon_Kinesis_TE.htm