
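End-To-End Guide to Data Transformation for Analytics Using the dbt-vertica Adapter

This video presents a solution for integrating dbt and Vertica with the open-source dbt-vertica adapter. Watch the YouTube video to learn how to quickly transform raw data directly in Vertica and take advantage of the new features in the adapter.

dbt (Data Build Tool) is a data transformation tool that efficiently turns raw data into curated data sets that are clean, easy to query, and ready for analysis. In dbt, data transformations and business logic are expressed with SQL statements and the Jinja language. Jinja turns dbt into a programming environment for SQL.

You can use dbt and OpenText Vertica to build robust and efficient data transformation pipelines. dbt code runs inside the database, so it takes full advantage of Vertica's high performance. In addition, dbt brings software development best practices such as version control, a built-in testing framework, documentation, and code reusability, which lets SQL developers work like software engineers.

The dbt adapter for Vertica implements the connection from dbt to Vertica. dbt-vertica is an open-source project available in Vertica's GitHub repository. It is written in Python and yml and uses the vertica-python driver.

Vertica + dbt Data Stack

The following diagram shows the dbt and Vertica data stack. dbt transforms data that is already loaded into Vertica and connects to your database through the dbt-vertica adapter. Unlike traditional data transformation tools, dbt executes the transformations directly in your database. No data leaves Vertica. The transformed data is then available to BI tools and other technologies for further analysis.

Dbt Image 17

Solution Overview

This document provides an example of a data transformation pipeline that uses VMart, along with the steps to set up your environment and connect to Vertica with the dbt-vertica adapter.

The solution described in this document is organized into the following sections:

  • Scope
  • Environment
  • Configuring Your Environment to Use dbt with Vertica
  • Configuring a dbt Profile to Connect to Vertica
  • Example of a Data Transformation Pipeline Using VMart
  • Summary and Conclusions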


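Scope

This document requires and assumes that the reader is familiar with dbt-core, Vertica, SQL, Jinja, and VS Code.

This document does not cover the following:

  • Installation of Vertica and the VMart database.
  • Data loading: the process of loading raw data into Vertica.
  • Data visualization: the steps to present the transformed data.
  • dbt Cloud.

Environment

The following is the environment we used to run the examples in this document:

Note: Installing the dbt-vertica adapter automatically installs dbt-core and vertica-python.

Vertica environment:

  • Vertica Analytic Database 23.3.0, installed on a 3-node Linux VM cluster.
  • VMart example database

dbt environment on Windows:

  • Visual Studio Code (VS Code) version 1.76.2
  • git version 2.40.0
  • Python version 3.11.2
  • dbt-vertica adapter version 1.5.0
  • vertica-python driver version 1.3.1
  • dbt-core version 1.5.0

SQL client that can connect to the database:

  • In our testing, we used DBVisualizer version 13.0.4 to inspect the data in tables and views in a user-friendly way.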

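Configuring Your Environment to Use dbt with Vertica

This section explains how to install the dbt-vertica adapter and initialize a dbt project. The configuration is based on a Windows environment.

Prerequisites

Before you begin, make sure the following are installed:

  • Python
  • git

Note: We recommend using git to add source control to your dbt project. This is a best practice. Make sure the path to git.exe is added to the user's PATH environment variable.

  • Visual Studio Code (VS Code)
  • VS Code Python extension

Installing the dbt-vertica Adapter

Follow these steps to install the adapter on Windows:

  1. Open VS Code.
  2. In the Terminal menu at the top of the screen, click New Terminal.
  3. The command line interface (CLI) opens.
  4. Create and activate a Python virtual environment.

Note: We recommend installing the adapter and creating your dbt project in this isolated environment.

  5. Navigate to the folder where you want to create the virtual environment, then create and activate your environment:

Dbt Image 2

  6. Run pip install dbt-vertica to install the adapter:

Note: This step installs dbt-core and vertica-python. You do not need to install them separately.

Dbt Image 3

The log should end with the list of installed packages, as shown below:

Dbt Image 4

Initializing Your dbt Project

Follow these steps to initialize the dbt project that will host your transformation models:

  1. Create a folder to host the dbt project, then run the dbt init command to create the project in that location. Enter the requested information in the CLI:

Dbt Image 5

Note: This step creates a skeleton project with all the necessary empty folders. It also creates the profiles file, which you can edit to connect to Vertica. The dbt project file, dbt_project.yml, contains the project configuration.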

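Configuring a dbt Profile to Connect to Vertica

This section explains how to configure the connection to Vertica, also known as a dbt profile. The dbt profiles file is generated automatically when you initialize the project as described in the previous section. The dbt profiles file is located at: C:\Users\<my_user>\.dbt\profiles.yml

Specifying the Connection to Vertica

The following is an example of the profiles file, profiles.yml. You can edit the profiles file and add multiple connections, each identified by a unique name. The connection that your project uses is specified in the project configuration file, dbt_project.yml.

Dbt Image 6

Testing the Connection to Vertica

Run the dbt debug command to test your connection to Vertica. Run this command with the Python virtual environment activated and from inside the dbt project folder.

Note: dbt debug checks that Python and all .yml files are configured correctly, and that dbt can connect to Vertica using the connection information provided in profiles.yml.

Dbt Image 7

When all checks pass, you are ready to start building your dbt models.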

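Example of a Data Transformation Pipeline Using VMart

The following solution contains a data transformation pipeline aimed at business analytics and reporting. We want to compute the on-time delivery rate, quantity accuracy rate, and perfect order rate for each vendor. We call this final model vendor_performance. We show how the vendor_performance model is created through a series of transformation steps that start from the raw data.

Project Lineage Graph

The following is the lineage graph of the solution. It shows the flow of data from the source data to the final model:

  • The green nodes are the source data (raw data) that we use in this project.
  • The second layer of nodes contains the staging models, which reference the source data and apply light transformations to the raw data.
  • The third layer of nodes contains the models that reference the staging models and apply business logic to transform them. Here these are a dimension table and a fact table. Users can query these models with BI tools.
  • The last node is the final model, which uses both the fact table and the dimension table and computes the vendor KPIs for reporting.

Dbt Image 8

Project Organization

The project is organized in the following folder structure:

  • staging: contains the models that reference the raw data tables and apply light transformations to them.
  • marts > vmart: contains the models that reference the staging models and apply business logic to transform them further.
  • reports: contains the final models that end users may query for analysis.

Dbt Image 9

Specifying the Source Data

The source data is the raw data that we work with in the project. The source file for this project is models > staging > vmart > src_vmart.yml. This file documents the following raw tables and specifies tests for them: raw_schema.states, raw_schema.vendors, and raw_schema.store_orders.

Dbt Image 16

Note: You can bring the source data into your database using Vertica's built-in features or a variety of complementary technologies. The source data and the DDL files are located in the demo_dbt_vmart/sampledata directory. The raw data used in this project has already been loaded into Vertica.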

models > staging > vmart > src_vmart.yml

version: 2

sources:
  - name: raw_data
    description: This is the raw data to transform.
    database: partner12db
    schema: raw_schema
    tables:
      - name: states
        description: This is the raw states data.
        columns:
          - name: statecode
            description: The primary key for the raw states data.
            tests:
              - unique
              - not_null

      - name: vendors
        description: This is the raw vendors data.
        columns:
          - name: vendorid
            description: The primary key for the raw vendors data.
            tests:
              - unique
              - not_null

      - name: store_orders
        description: This is the raw store_orders data.
        columns:
          - name: ordernumber
            description: The primary key for the raw store_orders data.
            tests:
              - unique
              - not_null

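The following are samples of the data in each raw table. The images show only the first 10 rows:

raw_schema.states:

Dbt Image 10

raw_schema.vendors:

Dbt Image 11

raw_schema.store_orders:

Dbt Image 12

Creating Models

dbt models are select statements that we write in SQL files to transform data.

Staging Models

The following staging models standardize the column names of the raw tables. They are located in the models > staging > vmart directory.

Note: These models are built as views.

models > staging > vmart > stg_vendors.sql

This model creates the dbt_schema.stg_vendors view. It references the raw data table vendors.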

with source as (
  select * from {{ source('raw_data', 'vendors') }}
),

staged as (
  select
    vendorid as vendor_key,
    vendorname as vendor_name,
    vendoraddress as vendor_address,
    vendorcity as vendor_city,
    vendorstate as vendor_state,
    lastdealupdate as last_deal_update,
    loadedattimestamp as loaded_at_timestamp
  from source
)

select * from staged

models > staging > vmart > stg_states.sql

This model creates the dbt_schema.stg_states view. It references the raw data table states.

with source as (
  select * from {{ source('raw_data', 'states') }}
),

staged as (
  select
    statename as state_name,
    statecode as state_code,
    region,
    division
  from source
)

select * from staged

models > staging > vmart > stg_store_orders.sql

This model creates the dbt_schema.stg_store_orders view. It references the raw data table store_orders.

with source as (
  select * from {{ source('raw_data', 'store_orders') }}
),

staged as (
  select
    productid as product_key,
    productversion as product_version,
    storeid as store_key,
    vendorid as vendor_key,
    employeeid as employee_key,
    ordernumber as order_number,
    dateordered as date_ordered,
    dateshipped as date_shipped,
    expecteddeliverydate as expected_delivery_date,
    datedelivered as date_delivered,
    qtyordered as quantity_ordered,
    qtydelivered as quantity_delivered,
    shippername as shipper_name,
    unitprice as unit_price,
    shippingcost as shipping_cost,
    qtyinstock as quantity_in_stock,
    reorderlevel as reorder_level,
    overstockceiling as overstock_ceiling,
    loadedattimestamp as loaded_at_timestamp
  from source
)

select * from staged

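Marts Models

The following marts models apply business logic to transform the data in the staging models. They are located in the models > marts directory.

Note: These models are built as tables.

models > marts > dim_vendors.sql

This model creates the dimension table dbt_schema.dim_vendors. It references the staging tables dbt_schema.stg_vendors and dbt_schema.stg_states, and brings the vendor_region column into the dim_vendors table by joining the staging tables.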

{{ config (
  materialized = 'table'
  )
}}

with vendors as (
  select * from {{ ref('stg_vendors') }}
),

states as (
  select * from {{ ref('stg_states') }}
),

joined as (
  select
    vendor_key,
    vendor_name,
    vendor_address,
    vendor_city,
    vendor_state,
    states.region as vendor_region,
    last_deal_update,
    loaded_at_timestamp
  from
    vendors inner join states on
    vendors.vendor_state = states.state_code
),

final as (
  select * from joined
)

select * from final

models > marts > fct_store_orders.sql

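This model creates the fact table dbt_schema.fct_store_orders. It references the staging table dbt_schema.stg_store_orders and applies business logic to compute the following measures:

Dbt Image 13

It also partitions the data by year based on the date_ordered column. Vertica creates a partition key for each unique date_ordered year.

Note: Partitioning large fact tables in a Vertica database is a good practice for improving query performance.

/*
 Create the fct_store_orders table and partition the data by the year of the order date.
 Vertica creates a partition key for each unique date_ordered year.
*/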
{{ config(
  materialized = 'table',
  partition_by_string = 'YEAR(date_ordered)'
  )
}}

with store_orders as (
  select * from {{ ref('stg_store_orders') }}
),

fact as (
  select
    product_key,
    product_version,
    store_key,
    vendor_key,
    employee_key,
    order_number,
    date_ordered,
    date_shipped,
    expected_delivery_date,
    date_delivered,
    (date_delivered - date_ordered) as days_to_deliver,
    quantity_ordered,
    quantity_delivered,
    shipper_name,
    unit_price,
    shipping_cost,
    ((quantity_delivered * unit_price) + shipping_cost) as total_order_cost,
    quantity_in_stock,
    reorder_level,
    overstock_ceiling,
    loaded_at_timestamp,
    1 as order_count,
    case when (quantity_delivered = quantity_ordered)
    then 1 else 0 end as quantity_accuracy_flag,
    case when (date_delivered <= expected_delivery_date)
    then 1 else 0 end as on_time_delivery_flag,
    case when (quantity_delivered = quantity_ordered) and
    (date_delivered <= expected_delivery_date)
    then 1 else 0 end as perfect_order_flag
  from
    store_orders
),

final as (
  select * from fact
)

select * from final
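
The derived columns in fct_store_orders can be illustrated outside the database. The following is a minimal Python sketch, not part of the project, that mirrors the arithmetic and the three CASE expressions for a single order. The function name derive_measures and the sample values are hypothetical, and SQL NULL handling is deliberately ignored.

```python
from datetime import date


def derive_measures(order):
    """Mirror the derived columns of fct_store_orders for one order row (a dict)."""
    exact_quantity = order["quantity_delivered"] == order["quantity_ordered"]
    on_time = order["date_delivered"] <= order["expected_delivery_date"]
    return {
        # date_delivered - date_ordered, expressed in whole days
        "days_to_deliver": (order["date_delivered"] - order["date_ordered"]).days,
        # (quantity_delivered * unit_price) + shipping_cost
        "total_order_cost": order["quantity_delivered"] * order["unit_price"]
        + order["shipping_cost"],
        "order_count": 1,
        "quantity_accuracy_flag": int(exact_quantity),
        "on_time_delivery_flag": int(on_time),
        # A perfect order has the right quantity and arrives on time
        "perfect_order_flag": int(exact_quantity and on_time),
    }


# Hypothetical sample order: correct quantity, delivered two days late
sample_order = {
    "date_ordered": date(2023, 3, 1),
    "expected_delivery_date": date(2023, 3, 10),
    "date_delivered": date(2023, 3, 12),
    "quantity_ordered": 100,
    "quantity_delivered": 100,
    "unit_price": 5,
    "shipping_cost": 20,
}

measures = derive_measures(sample_order)
print(measures)
```

For this sample order the quantity flag is 1, while the on-time and perfect order flags are 0 because the delivery date is later than the expected delivery date.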

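Report Model

The final model in the pipeline is called vendor_performance and is located in the models > reports directory. This model creates the report table dbt_schema.vendor_performance. It references the fact table dbt_schema.fct_store_orders and the dimension table dbt_schema.dim_vendors, and applies business logic to compute the following measures:

Dbt Image 14

In addition, the table and projection data are segmented by hash and sorted on the vendor_key column.

Note: This model is built as a table.

models > reports > vendor_performance.sql

/*
 The table and projection data are segmented by hash on the vendor_key column
 and sorted on the same column.
 The data is distributed evenly across all cluster nodes.
*/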
{{ config(
  materialized = 'table',
  order_by = 'vendor_key',
  segmented_by_string = 'vendor_key'
  )
}}

with vendors as (
  select * from {{ ref('dim_vendors') }}
),

store_orders as (
  select * from {{ ref('fct_store_orders') }}
),

joined as (
  select
    store_orders.vendor_key as "vendor_key",
    vendors.vendor_name as "Vendor Name",
    vendors.vendor_state as "Vendor State",
    vendors.vendor_region as "Vendor Region",
    avg(quantity_ordered) as "Avg Quantity Ordered",
    avg(quantity_delivered) as "Avg Quantity Delivered",
    avg(days_to_deliver) as "Avg Days to Deliver",
    avg(shipping_cost) as "Avg Shipping Cost",
    avg(total_order_cost) as "Avg Order Cost",
    sum(quantity_accuracy_flag) / count(order_count) as "Quantity Accuracy Rate",
    sum(on_time_delivery_flag) / count(order_count) as "On-time Delivery Rate",
    sum(perfect_order_flag) / count(order_count) as "Perfect Order Rate"
  from
    store_orders inner join vendors on
    vendors.vendor_key = store_orders.vendor_key
  group by
    1, 2, 3, 4
),

final as (
  select * from joined
)

select * from final
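
To make the three KPI formulas concrete, the following minimal Python sketch reproduces the per-vendor aggregation on a handful of in-memory rows. It is an illustration only: the function name vendor_rates and the sample rows are hypothetical, and only the rate columns of the model are covered.

```python
from collections import defaultdict


def vendor_rates(fact_rows):
    """Compute the three rates per vendor as sum(flag) / count(orders)."""
    totals = defaultdict(
        lambda: {"orders": 0, "quantity_accuracy": 0, "on_time": 0, "perfect": 0}
    )
    for row in fact_rows:
        vendor_totals = totals[row["vendor_key"]]
        vendor_totals["orders"] += 1  # count(order_count)
        vendor_totals["quantity_accuracy"] += row["quantity_accuracy_flag"]
        vendor_totals["on_time"] += row["on_time_delivery_flag"]
        vendor_totals["perfect"] += row["perfect_order_flag"]
    return {
        vendor_key: {
            "quantity_accuracy_rate": t["quantity_accuracy"] / t["orders"],
            "on_time_delivery_rate": t["on_time"] / t["orders"],
            "perfect_order_rate": t["perfect"] / t["orders"],
        }
        for vendor_key, t in totals.items()
    }


def make_row(vendor_key, quantity_ok, on_time):
    """Build a hypothetical fact row from the two base flags."""
    return {
        "vendor_key": vendor_key,
        "quantity_accuracy_flag": quantity_ok,
        "on_time_delivery_flag": on_time,
        "perfect_order_flag": int(quantity_ok and on_time),
    }


fact_rows = [
    make_row(1, 1, 1),
    make_row(1, 1, 0),
    make_row(1, 0, 1),
    make_row(1, 1, 1),
    make_row(2, 1, 1),
]

rates = vendor_rates(fact_rows)
print(rates)
```

Vendor 1 has four orders, three with the correct quantity, three on time, and two with both, so its rates are 0.75, 0.75, and 0.5.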

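Converting to Incremental Models

The models we created in the previous section can be converted to incremental models when needed. Incremental models process only new or updated data instead of rebuilding the whole table on every run. Incremental models perform better than the table materialization, especially on large tables.

We created two additional models in the models > staging > vmart directory: stg_store_orders_merge.sql and stg_store_orders_insert_overwrite.sql. These models incrementally update the store_orders information with the new data that arrives in the raw table store_orders.

Converting to an Incremental Model with the Merge Strategy

This model uses the merge incremental strategy and creates the table dbt_schema.stg_store_orders_merge.

In this example:

  • The target table is dbt_schema.stg_store_orders_merge and the source table is raw_schema.store_orders.
  • It uses the primary key order_number to match new or changed rows in the source table store_orders with rows in the target table stg_store_orders_merge.
  • It uses the merge_update_columns configuration parameter to specify the columns that we care about updating. These columns are date_delivered, quantity_delivered, and shipping_cost.
  • We want to update all rows that are new or have changed since the last time the model ran. This condition is specified in the where clause.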

models > staging > vmart > stg_store_orders_merge.sql

{{ config(
  materialized = 'incremental',
  incremental_strategy = 'merge',
  unique_key = 'order_number',
  merge_update_columns = ['date_delivered', 'quantity_delivered', 'shipping_cost']
  )
}}

with new_store_orders as (
  select * from {{ source('raw_data', 'store_orders') }}
  {% if is_incremental() %}
  where loadedattimestamp >= ( select max(loaded_at_timestamp) from {{this}} )
  {% endif %}
),

updated_store_orders as (
  select
    productid as product_key,
    productversion as product_version,
    storeid as store_key,
    vendorid as vendor_key,
    employeeid as employee_key,
    ordernumber as order_number,
    dateordered as date_ordered,
    dateshipped as date_shipped,
    expecteddeliverydate as expected_delivery_date,
    datedelivered as date_delivered,
    qtyordered as quantity_ordered,
    qtydelivered as quantity_delivered,
    shippername as shipper_name,
    unitprice as unit_price,
    shippingcost as shipping_cost,
    qtyinstock as quantity_in_stock,
    reorderlevel as reorder_level,
    overstockceiling as overstock_ceiling,
    loadedattimestamp as loaded_at_timestamp
  from new_store_orders
)

select * from updated_store_orders

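Under the hood:

The following is the sequence of steps that dbt executes and runs in Vertica:

  1. Compares the target table with the source table and selects all rows that are new or whose column values have changed since the last time the model ran. dbt creates a temporary table that contains the rows to be updated or inserted.
  2. Executes a merge statement that uses the primary key order_number to update a row when it exists or insert it when it does not.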
MERGE
INTO
  "partner12db"."dbt_schema"."stg_store_orders_merge" AS DBT_INTERNAL_DEST
USING
  "stg_store_orders_merge__dbt_tmp" AS DBT_INTERNAL_SOURCE
ON
  DBT_INTERNAL_DEST."order_number" = DBT_INTERNAL_SOURCE."order_number"
WHEN MATCHED
  THEN
UPDATE
SET
  "date_delivered" = DBT_INTERNAL_SOURCE."date_delivered",
  "quantity_delivered" = DBT_INTERNAL_SOURCE."quantity_delivered",
  "shipping_cost" = DBT_INTERNAL_SOURCE."shipping_cost"
WHEN NOT MATCHED
  THEN
INSERT
  (
    "product_key",
    "product_version",
    "store_key",
    "vendor_key",
    "employee_key",
    "order_number",
    "date_ordered",
    "date_shipped",
    "expected_delivery_date",
    "date_delivered",
    "quantity_ordered",
    "quantity_delivered",
    "shipper_name",
    "unit_price",
    "shipping_cost",
    "quantity_in_stock",
    "reorder_level",
    "overstock_ceiling",
    "loaded_at_timestamp"
  )
VALUES
  (
    DBT_INTERNAL_SOURCE."product_key",
    DBT_INTERNAL_SOURCE."product_version",
    DBT_INTERNAL_SOURCE."store_key",
    DBT_INTERNAL_SOURCE."vendor_key",
    DBT_INTERNAL_SOURCE."employee_key",
    DBT_INTERNAL_SOURCE."order_number",
    DBT_INTERNAL_SOURCE."date_ordered",
    DBT_INTERNAL_SOURCE."date_shipped",
    DBT_INTERNAL_SOURCE."expected_delivery_date",
    DBT_INTERNAL_SOURCE."date_delivered",
    DBT_INTERNAL_SOURCE."quantity_ordered",
    DBT_INTERNAL_SOURCE."quantity_delivered",
    DBT_INTERNAL_SOURCE."shipper_name",
    DBT_INTERNAL_SOURCE."unit_price",
    DBT_INTERNAL_SOURCE."shipping_cost",
    DBT_INTERNAL_SOURCE."quantity_in_stock",
    DBT_INTERNAL_SOURCE."reorder_level",
    DBT_INTERNAL_SOURCE."overstock_ceiling",
    DBT_INTERNAL_SOURCE."loaded_at_timestamp"
  )
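
The behavior of the MERGE statement above can be mimicked on in-memory rows. The following minimal Python sketch is an illustration only and is not how dbt or Vertica implement the merge. The function name merge_rows and the sample rows are hypothetical. It shows that a matched row has only the merge_update_columns overwritten, while an unmatched row is inserted whole.

```python
def merge_rows(target, source_rows, unique_key, update_columns):
    """Apply merge semantics to `target`, a dict of rows keyed by the unique key."""
    for row in source_rows:
        key = row[unique_key]
        if key in target:
            # WHEN MATCHED: update only the listed columns
            for column in update_columns:
                target[key][column] = row[column]
        else:
            # WHEN NOT MATCHED: insert the complete row
            target[key] = dict(row)
    return target


# Hypothetical target table with one existing order
target_table = {
    1001: {
        "order_number": 1001,
        "shipper_name": "Shipper A",
        "date_delivered": None,
        "quantity_delivered": 0,
        "shipping_cost": 10,
    }
}

# Hypothetical new data: order 1001 changed, order 1002 is new
new_rows = [
    {
        "order_number": 1001,
        "shipper_name": "Shipper B",
        "date_delivered": "2023-05-02",
        "quantity_delivered": 50,
        "shipping_cost": 12,
    },
    {
        "order_number": 1002,
        "shipper_name": "Shipper C",
        "date_delivered": None,
        "quantity_delivered": 0,
        "shipping_cost": 8,
    },
]

merge_rows(
    target_table,
    new_rows,
    unique_key="order_number",
    update_columns=["date_delivered", "quantity_delivered", "shipping_cost"],
)
print(target_table)
```

Note that shipper_name for order 1001 keeps its old value, because it is not listed in the update columns.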

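Converting to an Incremental Model with the insert_overwrite Strategy

This model uses the insert_overwrite incremental strategy and creates the table dbt_schema.stg_store_orders_insert_overwrite.

The insert_overwrite strategy deletes and inserts rows in the target table based on partitions. The partition_by_string configuration parameter is an expression that divides the target table into partitions. The partitions configuration parameter specifies the partitions that are dropped from the target table and inserted from the source table.

In this example:

  • The target table is dbt_schema.stg_store_orders_insert_overwrite and the source table is raw_schema.store_orders.
  • The target table is divided into yearly partitions on the date_ordered column.
  • The partitions specified in the partitions configuration parameter are the same as the partitions specified in the where clause.
  • Assuming the current year is 2023, we want to replace the partitions for 2023 and 2022.

Important: The partitions configuration parameter is optional. If you specify the partitions configuration parameter in addition to the where clause, the following two situations can occur:

  • The target table may end up missing some partitions that exist in the source table. This happens when some of the partitions specified in the partitions configuration parameter are not specified in the where clause. All the partitions specified in the partitions configuration parameter are dropped from the target table, but not all of them are inserted back from the source table.
  • The target table may end up with duplicate rows. This happens when some of the partitions specified in the where clause are not specified in the partitions configuration parameter. All the partitions in the configuration parameter are dropped from the target table, but the additional partitions from the source table specified in the where clause are still inserted into the target table.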
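
The two situations described in the note above can be reproduced with a small simulation. The following minimal Python sketch is an illustration only and does not reflect the adapter's implementation. The function name insert_overwrite and the sample rows are hypothetical. Each row is an (order_number, year) pair, `partitions` stands for the partitions configuration parameter, and `where_years` stands for the years selected by the where clause.

```python
def insert_overwrite(target_rows, source_rows, partitions, where_years):
    """Drop the configured partitions from the target, then insert the filtered source rows."""
    kept = [row for row in target_rows if row[1] not in partitions]
    inserted = [row for row in source_rows if row[1] in where_years]
    return kept + inserted


# Hypothetical data: one order per year, target already in sync with the source
source = [(1, 2021), (2, 2022), (3, 2023)]
target = list(source)

# partitions and where clause agree: the target stays complete and free of duplicates
matched = insert_overwrite(target, source, {2022, 2023}, {2022, 2023})

# partitions lists 2021 but the where clause does not: the 2021 rows are lost
missing = insert_overwrite(target, source, {2021, 2022, 2023}, {2022, 2023})

# the where clause selects 2022 but partitions does not: the 2022 rows are duplicated
duplicated = insert_overwrite(target, source, {2023}, {2022, 2023})

print(sorted(matched))
print(sorted(missing))
print(sorted(duplicated))
```

Keeping the partitions list and the where clause aligned, as in the first call, is what prevents both the lost partition and the duplicate rows.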

models > staging > vmart > stg_store_orders_insert_overwrite.sql

{{ config(
  materialized = 'incremental',
  incremental_strategy = 'insert_overwrite',
  partition_by_string = 'year(date_ordered)',
  partitions = ['2022', '2023']
  )
}}

with new_store_orders as (
  select * from {{ source('raw_data', 'store_orders') }}
  {% if is_incremental() %}
  where YEAR(dateordered) >= YEAR(now())-1
  {% endif %}
),

updated_store_orders as (
  select
    productid as product_key,
    productversion as product_version,
    storeid as store_key,
    vendorid as vendor_key,
    employeeid as employee_key,
    ordernumber as order_number,
    dateordered as date_ordered,
    dateshipped as date_shipped,
    expecteddeliverydate as expected_delivery_date,
    datedelivered as date_delivered,
    qtyordered as quantity_ordered,
    qtydelivered as quantity_delivered,
    shippername as shipper_name,
    unitprice as unit_price,
    shippingcost as shipping_cost,
    qtyinstock as quantity_in_stock,
    reorderlevel as reorder_level,
    overstockceiling as overstock_ceiling,
    loadedattimestamp as loaded_at_timestamp
  from new_store_orders
)

select * from updated_store_orders

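Under the hood:

The first time the model runs, the table is partitioned by the expression specified in the partition_by_string configuration parameter. In this example, the target table dbt_schema.stg_store_orders_insert_overwrite is created from the data in the source table raw_schema.store_orders and is partitioned by year on the date_ordered column.

DDL executed the first time the model runs: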

create table
  "partner12db"."dbt_schema"."stg_store_orders_insert_overwrite"
INCLUDE SCHEMA PRIVILEGES as (
  with new_store_orders as (
    select * from "partner12db"."raw_schema"."store_orders"
  ),
  updated_store_orders as (
    select
      productid as product_key,
      productversion as product_version,
      storeid as store_key,
      vendorid as vendor_key,
      employeeid as employee_key,
      ordernumber as order_number,
      dateordered as date_ordered,
      dateshipped as date_shipped,
      expecteddeliverydate as expected_delivery_date,
      datedelivered as date_delivered,
      qtyordered as quantity_ordered,
      qtydelivered as quantity_delivered,
      shippername as shipper_name,
      unitprice as unit_price,
      shippingcost as shipping_cost,
      qtyinstock as quantity_in_stock,
      reorderlevel as reorder_level,
      overstockceiling as overstock_ceiling
    from new_store_orders
  )
  select * from updated_store_orders
);
alter table "partner12db"."dbt_schema"."stg_store_orders_insert_overwrite" partition by year(date_ordered);

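On subsequent runs:

  • dbt uses DROP_PARTITIONS statements to delete all rows in the target table for each partition specified in the where clause. In this example, the partitions correspond to all years in date_ordered that are greater than or equal to the current year minus 1. This means that if the current year is 2023, the partitions to drop are 2023 and 2022.
  • dbt uses an INSERT statement to insert the selected rows from the source table into the target table. The rows selected from the source table are those that correspond to the partitions specified in the where clause. In this example, the rows inserted into the target table are the rows for 2023 and 2022.

The following are the DDL and DML executed on subsequent runs: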

SELECT PARTITION_TABLE('dbt_schema.stg_store_orders_insert_overwrite');

SELECT DROP_PARTITIONS('dbt_schema.stg_store_orders_insert_overwrite', '2022', '2022');
SELECT PURGE_PARTITION('dbt_schema.stg_store_orders_insert_overwrite', '2022');

SELECT DROP_PARTITIONS('dbt_schema.stg_store_orders_insert_overwrite', '2023', '2023');
SELECT PURGE_PARTITION('dbt_schema.stg_store_orders_insert_overwrite', '2023');

INSERT INTO "partner12db"."dbt_schema"."stg_store_orders_insert_overwrite"
  ("product_key", "product_version", "store_key",
   "vendor_key", "employee_key", "order_number",
   "date_ordered", "date_shipped", "expected_delivery_date",
   "date_delivered", "quantity_ordered", "quantity_delivered",
   "shipper_name", "unit_price", "shipping_cost", "quantity_in_stock",
   "reorder_level", "overstock_ceiling", "loaded_at_timestamp")
(
  SELECT "product_key", "product_version", "store_key",
         "vendor_key", "employee_key", "order_number", "date_ordered",
         "date_shipped", "expected_delivery_date", "date_delivered",
         "quantity_ordered", "quantity_delivered", "shipper_name",
         "unit_price", "shipping_cost", "quantity_in_stock",
         "reorder_level", "overstock_ceiling", "loaded_at_timestamp"
  FROM "stg_store_orders_insert_overwrite__dbt_tmp"
);

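Documentation and Tests for the Models

The documentation and tests for the models are specified in the following .yml files:

models > staging > vmart > stg_vmart.yml

This file describes the staging models and tests the primary keys.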

version: 2

models:
  - name: stg_states
    description: This is the staged states table.
    columns:
      - name: state_code
        description: The primary key of the stg_states table.
        tests:
          - not_null
          - unique

  - name: stg_vendors
    description: This is the staged vendors table.
    columns:
      - name: vendor_key
        description: The primary key of the stg_vendors table.
        tests:
          - not_null
          - unique

  - name: stg_store_orders
    description: This is the staged store_orders table.
    columns:
      - name: order_number
        description: The primary key of the stg_store_orders table.
        tests:
          - not_null
          - unique

models > marts > marts.yml

This file describes the measures in the marts models and tests the primary keys.

version: 2

models:
  - name: dim_vendors
    description: "This is the vendors dimension table."
    columns:
      - name: vendor_key
        description: The primary key of the vendors dimension table.
        tests:
          - not_null
          - unique

  - name: fct_store_orders
    description: "This is the store orders fact table."
    columns:
      - name: order_number
        description: The primary key of the store orders fact table.
        tests:
          - not_null
          - unique

      - name: days_to_deliver
        description: "Number of days it takes to deliver an order. This is the date of delivery minus the date of the order. Example: days_to_deliver = (date_delivered - date_ordered)"

      - name: total_order_cost
        description: "Total cost of a particular order. This is quantity delivered multiplied by the unit price plus the cost of shipping. Example: total_order_cost = (qtydelivered * unitprice) + shippingcost"

      - name: order_count
        description: "Order count has the value of 1. This column facilitates the calculation of different measures for reporting such as quantity_accuracy_rate and on_time_delivery_rate."

      - name: quantity_accuracy_flag
        description: "The quantity accuracy flag indicates whether the order was delivered with the correct quantity. This is the quantity_delivered field equaling the quantity_ordered field. An example of this calculation is: IF (quantity_delivered = quantity_ordered) THEN (1) ELSE (0) END."

      - name: on_time_delivery_flag
        description: "The on-time delivery flag indicates whether the order was delivered on the expected delivery date or sooner. This is the date_delivered is less than or equal to the expected_delivery_date. An example of this calculation is: IF (date_delivered <= expected_delivery_date) THEN (1) ELSE (0) END."

      - name: perfect_order_flag
        description: "The perfect order flag indicates whether the order was delivered without incidents. This is both quantity accuracy and on-time delivery. An example of this calculation is: IF (date_delivered <= expected_delivery_date) AND (quantity_delivered = quantity_ordered) THEN (1) ELSE (0) END."

models > reports > reports.yml

This file describes the measures in the reports model and tests the primary key.

version: 2

models:
  - name: vendor_performance
    description: "This table is used by end-users for visualization purposes and contains information about vendor performance KPIs."
    columns:
      - name: vendor_key
        description: The primary key of the vendor_performance table.
        tests:
          - not_null
          - unique

      - name: quantity_accuracy_rate
        description: "The quantity accuracy rate is the percentage of orders that were delivered with the correct quantity. quantity_accuracy_rate = TOTAL(quantity_accuracy_flag) / TOTAL(order_count)"

      - name: on_time_delivery_rate
        description: "The on-time delivery rate is the percentage of orders that were delivered on the expected delivery date or sooner. on_time_delivery_rate = TOTAL(on_time_delivery_flag) / TOTAL(order_count)"

      - name: perfect_order_rate
        description: "The perfect order rate is the percentage of orders that were delivered without incidents. perfect_order_rate = TOTAL(perfect_order_flag) / TOTAL(order_count)"

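Running and Testing the Solution

To build the solution, run the dbt build command. This command runs and tests all upstream models, including the final model. It also runs the tests on the source data specified in src_vmart.yml.

Generating the Documentation and the Lineage Graph

To generate the lineage graph and view the dependencies of the project pipeline, run:

dbt docs generate

Then run:

dbt docs serve

In the documentation website, click the DAG icon:

Dbt Image 15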
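
Building upstream models first means that the models are processed in dependency order. The following minimal Python sketch illustrates that idea for this project's lineage with the standard library graphlib module (Python 3.9 or later). It is not dbt's scheduler. The lineage dictionary is written by hand from the source() and ref() calls in the models, and the raw_data.* node names are a labeling convention chosen for this sketch.

```python
from graphlib import TopologicalSorter

# Each node maps to the set of nodes it depends on (its upstream nodes)
lineage = {
    "stg_states": {"raw_data.states"},
    "stg_vendors": {"raw_data.vendors"},
    "stg_store_orders": {"raw_data.store_orders"},
    "dim_vendors": {"stg_vendors", "stg_states"},
    "fct_store_orders": {"stg_store_orders"},
    "vendor_performance": {"dim_vendors", "fct_store_orders"},
}

# static_order() yields every node after all of its upstream nodes
build_order = list(TopologicalSorter(lineage).static_order())

for position, node in enumerate(build_order, start=1):
    print(position, node)
```

Several orders are valid for the independent branches, but vendor_performance always comes last because every other node is upstream of it.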

Dbt Dbtstructure

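Visualizing the Transformed Data

You can use any tool you prefer to connect to Vertica and visualize the data in the tables that dbt generates. In this example, we used OpenText Magellan to create a vendor performance dashboard:

Dbt Image 18

Summary and Conclusions

We created a comprehensive example of a data transformation pipeline that uses business logic to transform raw data into a dimension table and a fact table for business analytics and reporting.

Vertica + dbt make a powerful in-database data transformation solution that takes full advantage of the speed of your database.

For More Information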