跳转至

Vertica 集成 Amazon Kinesis:技术探索

概述

本技术探索详细描述了使用 Amazon Kinesis 将实时流数据加载到 Vertica 的过程。我们使用 Python 程序持续生成数据并将数据加载到 Amazon Kinesis 数据流中。Amazon Kinesis Data Firehose 将实时流数据传送到 AWS S3 存储桶。然后使用 AWS Glue 将数据从 S3 存储桶传输到 Vertica 进行进一步分析。

流程概览: 数据源 → Python 生成器 → Amazon Kinesis 数据流 → Kinesis Data Firehose → S3 存储桶 → AWS Glue → Vertica

Amazon Kinesis 概述

Amazon Kinesis 是一个实时大数据流服务,具有持久性和高可扩展性。可以轻松地实时收集、处理和分析流数据,并从多个来源将数据推送到数据流中。Amazon Kinesis 可以从多个来源捕获大量数据。

Amazon Kinesis Data Firehose 是一个托管服务,用于将流数据加载到数据存储或数据湖中。

测试环境

  • AWS 环境:标准 AWS 配置
  • Vertica 服务器:Vertica 11.0(使用 Community Edition 安装在 AWS 上)

前提条件

安装和配置 AWS CLI

在本地机器上安装 AWS CLI:

curl "https://s3.amazonaws.com/aws-cli/awscli-bundle.zip" -o "awscli-bundle.zip"
unzip awscli-bundle.zip
sudo ./awscli-bundle/install -i /usr/local/aws -b /usr/local/bin/aws

安装后配置 AWS CLI:

$ aws configure

系统将提示输入 AWS access key id、region、secret key 和默认输出选项(本示例中使用 JSON 格式)。

其他前提条件

  • Vertica 必须在 AWS 上可用
  • 必须拥有使用 Amazon Kinesis 数据流、Amazon Kinesis Data Firehose、S3 存储桶和 AWS Glue 所需的 IAM 权限

连接 Amazon Kinesis 到 Vertica

步骤 1:创建 Kinesis 数据流

在 Amazon Kinesis 上创建新的数据流。记下数据流名称。

参考:https://docs.aws.amazon.com/streams/latest/dev/amazon-kinesis-streams.html#how-do-i-create-a-stream

步骤 2:生成示例数据

使用以下 Python 程序持续生成示例数据并推送到 AWS 数据流:

import json
from pprint import pprint
import random
import time
import boto3
from faker import Faker

fake = Faker()
STREAM_NAME = 'TestDataStreamVertica'

def employee_record():
    rand_name = fake.name()
    rand_age = str(random.randint(25, 70))
    rand_address = fake.street_address()
    rand_city = fake.city()
    rand_state = fake.country_code()
    temp_record = rand_name+"|" + rand_age + "|" + rand_address + "|" + rand_city + "|" + rand_state
    return {'Data': json.dumps(temp_record), 'PartitionKey': 'partition_key'}

def generate(stream_name, batch_size, kinesis_client):
    while True:
        records = [employee_record() for _ in range(batch_size)]
        pprint(records)
        kinesis_client.put_records(StreamName=stream_name, Records=records)
        time.sleep(0.1)

if __name__ == "__main__":
    generate(
        stream_name=STREAM_NAME,
        batch_size=1,
        kinesis_client=boto3.client('kinesis'))

将代码中的 STREAM_NAME 更新为 Kinesis 数据流名称。在 Python IDE 中运行代码,这将持续向 Amazon Kinesis 数据流推送数据。

步骤 3:创建 Kinesis Data Firehose

创建 Amazon Kinesis Data Firehose 传输流,将上面创建的 Amazon Kinesis 数据流添加为源,将 Amazon S3 添加为目标。

  • 创建传输流:https://docs.aws.amazon.com/firehose/latest/dev/create-name.html
  • 使用 AWS S3 Console 创建 S3 存储桶:https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html

步骤 4:加载数据到 Vertica

确保数据已加载到 S3 存储桶后,使用 AWS Glue 将数据从 S3 存储桶加载到 Vertica。

关于如何将数据从 S3 存储桶加载到 Vertica 的更多信息,请参阅 AWS Glue Connection Guide。


原文来源:https://www.vertica.com/kb/Amazon_Kinesis_TE/Content/Partner/Amazon_Kinesis_TE.htm