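Vertica and Coiled: Technical Exploration

About this Document

This document shows how to use Coiled, a cloud environment built on Dask, together with Vertica to read, write, and analyze large datasets. Coiled manages the Dask compute clusters.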

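Coiled and Dask Overview

Coiled helps you build distributed, scalable applications in Python. By providing compute power in the cloud, it helps data scientists use Python to solve problems that involve large datasets.

Dask is an open-source framework that adds parallelism to the existing Python stack. Dask integrates with Python libraries such as NumPy arrays, pandas DataFrames, and scikit-learn.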
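To make the idea of parallel work over partitions concrete, here is a minimal conceptual sketch that uses only the Python standard library. It is not how Dask is implemented; the function names and the sample rows are hypothetical. Each partition computes partial sums and counts per group, and the partial results are then combined into per-group means.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def partial_stats(partition):
    """Return {group: (sum, count)} for one partition of (group, value) rows."""
    stats = defaultdict(lambda: [0.0, 0])
    for group, value in partition:
        stats[group][0] += value
        stats[group][1] += 1
    return {g: (s, n) for g, (s, n) in stats.items()}


def partitioned_group_mean(partitions):
    """Compute per-group means by combining per-partition sums and counts."""
    # Each partition is processed independently, so the work can run in parallel.
    with ThreadPoolExecutor() as pool:
        partials = list(pool.map(partial_stats, partitions))
    # Combine the partial results into global sums and counts.
    totals = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for group, (s, n) in partial.items():
            totals[group][0] += s
            totals[group][1] += n
    return {g: s / n for g, (s, n) in totals.items()}


# Hypothetical rows, already split into two partitions
partitions = [
    [("setosa", 1.0), ("virginica", 6.0)],
    [("setosa", 2.0), ("virginica", 5.0), ("setosa", 3.0)],
]
means = partitioned_group_mean(partitions)
print(means)
```

The point of the sketch is that no single step needs the whole dataset at once, which is what lets a framework spread the partitions across many workers.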

Test Environment

  • Coiled 0.0.67
  • RHEL 8.3
  • Vertica 11.0.0
  • Vertica driver: vertica-python 1.0.5

Connecting Coiled to Vertica

Installing Coiled

  1. Install pip: sudo yum install python-pip
  2. Install Coiled: pip install coiled dask distributed --upgrade
  3. Go to https://coiled.io/ and click Start My Free Trial
  4. Sign in with your Google or GitHub account
  5. Generate an API token in your Coiled account and save it

  6. Check the version: coiled --version
  7. Log in: coiled login, and provide your token
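As an optional sanity check after the installation steps, the following sketch reports which of the packages named in the pip command are installed and at which version. It assumes Python 3.8 or later (for importlib.metadata); the helper name installed_versions is hypothetical.

```python
from importlib import metadata


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Packages taken from the pip install command in this section
report = installed_versions(["coiled", "dask", "distributed"])
for name, version in report.items():
    print(f"{name}: {version or 'not installed'}")
```

A package reported as "not installed" usually means pip installed into a different Python environment than the one running the script.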

Launching a Coiled Cluster

import coiled
cluster = coiled.Cluster(n_workers=10)

from dask.distributed import Client
client = Client(cluster)
print('Dashboard:', client.dashboard_link)
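It can be convenient to make sure the cluster and client are closed once the work is done. The following sketch shows one possible pattern, which the original walkthrough does not prescribe. It assumes only that the cluster and client objects expose a close() method, as coiled.Cluster and dask.distributed.Client do; the helper name run_with_cluster is hypothetical.

```python
from contextlib import ExitStack, closing


def run_with_cluster(make_cluster, make_client, task):
    """Create a cluster and client, run task(client), then close both."""
    with ExitStack() as stack:
        # closing() calls .close() on exit, even if task() raises.
        cluster = stack.enter_context(closing(make_cluster()))
        client = stack.enter_context(closing(make_client(cluster)))
        return task(client)


# Possible usage with the objects from this section (not executed here):
#   import coiled
#   from dask.distributed import Client
#   link = run_with_cluster(
#       lambda: coiled.Cluster(n_workers=10),
#       Client,
#       lambda client: client.dashboard_link,
#   )
```

Passing factories rather than ready-made objects keeps the helper independent of Coiled, so the same pattern works with a local test cluster.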

Reading Data from Vertica into a Dask DataFrame

import coiled
from dask.distributed import Client
from dask_vertica import read_vertica

coiled.create_software_environment(
    name="vertica",
    pip=["bokeh>=2.1.1", "pandas", "git+https://github.com/coiled/dask-vertica.git"],
    force_rebuild=True
)

cluster = coiled.Cluster(
    name="vertica-test",
    n_workers=4,
    software="vertica",
)
client = Client(cluster)

connection_kwargs = {
    "user": "coiled_user",
    "password": "",
    "host": "<host>",
    "port": 5433,
    "database": "PartPub80DB"
}
ddf = read_vertica(connection_kwargs, 'iris', 5, 'coiled_schema')
ddf.groupby("Species").PetalLengthCm.mean().compute()
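The example above hard-codes the connection settings and leaves the password empty. One possible alternative, sketched below, is to read the settings from environment variables. The variable names (VERTICA_USER and so on), the helper name, and the example host are assumptions made for illustration; they are not part of dask-vertica or vertica-python.

```python
import os


def vertica_kwargs_from_env(env=None):
    """Build a connection-kwargs dict from environment-style variables."""
    env = os.environ if env is None else env
    return {
        "user": env["VERTICA_USER"],
        "password": env.get("VERTICA_PASSWORD", ""),
        "host": env["VERTICA_HOST"],
        # 5433 is the port used in the example above
        "port": int(env.get("VERTICA_PORT", "5433")),
        "database": env["VERTICA_DATABASE"],
    }


# Example with an explicit mapping instead of the real environment
example = vertica_kwargs_from_env({
    "VERTICA_USER": "coiled_user",
    "VERTICA_HOST": "vertica.example.com",  # hypothetical host
    "VERTICA_DATABASE": "PartPub80DB",
})
print(example["port"])
```

The resulting dictionary has the same keys as connection_kwargs, so it could be passed to read_vertica in its place.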

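Writing Data from a Dask DataFrame to Vertica

To write data to Vertica, run the following code in an IPython session. It generates a dummy time series dataset and writes it to a table in Vertica. The code covers two scenarios: writing a dataset with a small number of partitions and writing one with a large number of partitions.

In the code below, replace the schema name and table name with the ones you want to write to: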

import coiled
import dask.dataframe as dd
import vertica_python
from dask.distributed import Client
from dask_vertica import to_vertica
from verticapy.datasets import load_iris

coiled.create_software_environment(
    name="vertica",
    pip=["bokeh>=2.1.1", "pandas", 
         "git+https://github.com/coiled/dask-vertica.git", 
         "verticapy", "vertica-python"],
    force_rebuild=True
)

cluster = coiled.Cluster(
    name="vertica-test",
    n_workers=4,
    software="vertica",
)
client = Client(cluster)

vdb = {
    'host': '<host>',
    'port': 5433,
    'user': 'dbadmin',
    'password': '',
    'database': 'VMart',
    'connection_load_balance': True,
    'session_label': 'py',
    'unicode_error': 'strict'
}

schema = "Machine_Learning_Test"
name = "test_02"

# Load the iris table and convert it to a Dask DataFrame while the
# connection is still open
with vertica_python.connect(**vdb) as conn:
    with conn.cursor() as cur:
        iris = load_iris(cur, schema=schema)
        ddf = dd.from_pandas(iris.to_pandas(), npartitions=3)

# Write a small DataFrame (3 partitions)
to_vertica(ddf, vdb, "iris_async", schema=schema, if_exists="overwrite")

# Write a large DataFrame (more partitions)
demo_ts = dd.demo.make_timeseries(
    start="2000-01-01",
    end="2000-12-31",
    freq="30s",
    partition_freq="1W"
)
to_vertica(demo_ts, vdb, "demo_ts", schema=schema, if_exists="overwrite")
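To get a feel for how much larger the second dataset is, the following back-of-envelope sketch estimates the row and partition counts implied by the time series parameters above (one year of data, one row every 30 seconds, one partition per week). It is plain date arithmetic and does not reproduce how Dask aligns partition boundaries, so the real counts may differ slightly; the helper name is hypothetical.

```python
from datetime import date


def estimate_timeseries_shape(start, end, freq_seconds, partition_days):
    """Roughly estimate (rows, partitions) for a regularly sampled time series."""
    span_days = (end - start).days
    rows = span_days * 86_400 // freq_seconds
    partitions = max(1, span_days // partition_days)
    return rows, partitions


rows, partitions = estimate_timeseries_shape(
    start=date(2000, 1, 1),
    end=date(2000, 12, 31),
    freq_seconds=30,    # freq="30s"
    partition_days=7,   # partition_freq="1W"
)
print(rows, partitions)  # roughly 1,051,200 rows in 52 partitions
```

By comparison, the iris dataset written in the first scenario has 150 rows in 3 partitions.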

For More Information

  • Coiled website
  • Coiled documentation
  • Dask DataFrame
  • Vertica Community Edition
  • Vertica User Community
  • Vertica Documentation

Original source: https://www.vertica.com/kb/Coiled_TE/Content/Partner/Coiled_TE.htm