MPP 并行执行引擎原理 —— SQL 如何变成跨节点的并行计算任务¶
作者:JiangChong | 发布时间:2026-06-10
适用场景: 当你阅读
EXPLAIN输出时看到GLOBAL RESEGMENT、BROADCAST、NetworkSend、ParallelUnion等关键字却不知道它们在做什么,或者你发现同一个查询在集群扩展后性能没有线性增长,需要理解执行引擎的底层并行机制时,本文提供完整的原理解析。
关联文章:
- MPP JOIN 策略全解析与优化器决策逻辑 — 优化器如何在 Local/Broadcast/Shuffle Join 之间做决策
- MPP 数据分布策略 — 从 Hash 分段到一致性哈希,理解数据在节点间的分布机制
- Vertica性能调优-1 如何阅读执行计划 — 执行计划文本和图形的详细解读
- Vertica性能调优-2 使用系统表排查查询故障 — 执行引擎性能分析的方法论
- Vertica Join 重分段倾斜诊断与修复 — Exchange 算子导致数据倾斜的实战诊断
理解全文脉络:
本文围绕「一个 SQL 如何被转化为并行执行的 DAG」这条主线展开。
- 第 1 节用一个具体查询驱动叙述,感受串行执行和并行执行的差距。
- 第 2 节逐层拆解执行计划的内部结构——算子树、Exchange 边界、线程并行度——这是理解全文的核心章节。
- 第 3 节分析 Vertica 执行引擎在每个关键设计点的选择及其 trade-off。如果你已经熟悉执行计划的基本概念,可以从第 3 节读起。
- 第 4 节讨论这些设计在实际使用中的表现。
- 第 5 节用案例验证。
第 1 节:问题背景 —— SQL 执行从「一个人干」到「一群人干」¶
1.1 串行执行的物理上限¶
想象你有一张 50 亿行的交易事实表 transactions,按日期分区、按 customer_id 哈希分布在 8 个节点上。现在需要跑这样一个查询:
SELECT customer_id, SUM(amount)
FROM transactions
WHERE trans_date BETWEEN '2025-01-01' AND '2025-06-30'
GROUP BY customer_id
ORDER BY SUM(amount) DESC
LIMIT 100;
如果这是一个单机数据库,执行流程很直白:磁盘扫描 → 过滤 → 哈希聚合 → 排序 → 取 Top 100。每个步骤串行执行,上一步完成才到下一步。50 亿行即使每行只花 1 微秒,也需要 5000 秒——超过 80 分钟。
单机瓶颈不仅在 CPU,更在 I/O。 即使是一块现代 NVMe 企业级 SSD(顺序读 3-7 GB/s),面对 50 亿行压缩后约 200 GB 的数据,光读完就需要 30-70 秒——还没算聚合和排序的 CPU 时间。如果是更旧的 SATA SSD 或 HDD,I/O 时间轻松上几百秒。单机的物理极限就是磁盘带宽和内存容量的上限,这不是优化器能解决的问题。
但你的 8 节点集群里,每节点只有约 6.25 亿行数据(25 GB)。如果每个节点同时扫描自己的那部分数据,I/O 时间就降到几秒——理论上。
1.2 并行执行的三个层次¶
问题不在于「并行的想法」,而在于「怎么并行」。 SQL 是一个声明式语言——你说「我要什么」,不说「怎么算」。把一句 SQL 翻译成能在 8 台机器上同时跑的并行计算任务,这是执行引擎的核心职责。
这个翻译过程涉及三个层次:
| 层次 | 问题 | 谁负责 |
|---|---|---|
| 跨节点并行 | 数据分布在 8 个节点上,查询怎么同时在所有节点上执行? | 优化器 + Exchange 算子 |
| 节点内并行 | 一个节点有多核 CPU,怎么把一个节点的数据再拆成多份同时算? | 执行引擎线程调度 |
| 流水线并行 | 扫描和聚合能同时进行吗?还是扫描完才开始聚合? | 执行引擎 pipeline 模型 |
这三个层次叠加在一起,才能把 80 分钟的串行查询压到几十秒。但三层并行同时存在,也带来了资源管理、数据重分布、同步屏障等复杂问题——这正是执行引擎要解决的。
1.3 主题界定¶
本文讨论的是 MPP 执行引擎的运行时机制——优化器产出的执行计划如何在集群中被实际执行。不讨论优化器如何生成执行计划(详见 MPP JOIN 策略全解析与优化器决策逻辑),也不讨论数据如何分布在节点上(详见 MPP 数据分布策略),而是聚焦于中间层:计划到执行的翻译过程。
第 2 节:核心概念与机制 —— 从一棵树到一个 DAG¶
2.1 执行计划的「树」结构¶
Vertica 优化器产出的执行计划是一棵算子树(operator tree)。每个节点是一个算子(operator),负责特定的数据处理逻辑。数据从叶子节点流向根节点,每经过一个算子就变换一次(C-Store 7 Years §6.1)。
一个最简单的单表聚合查询:
对应的算子树(来源:C-Store 7 Years §6.1, Figure 3):
Root
└─ ParallelUnion
└─ Filter (count(*) < 10)
└─ GroupBy (key: dept_id, agg: count(*))
└─ StorageUnion
└─ Scan (departments)
比喻: 这棵树就像一个工厂的流水线。Scan 是从仓库取原材料,GroupBy 是第一个加工站,Filter 是质检站,ParallelUnion 是把多条流水线汇合。数据像产品一样从上游流向下游。
Vertica 执行引擎(Execution Engine, EE)支持的算子类型(C-Store 7 Years §6.1):
| 算子 | 功能 | 关键算法选择 |
|---|---|---|
| Scan | 从 ROS 容器读数据,应用谓词过滤,利用编码数据直接运算 | SIP 过滤器推入 Scan |
| GroupBy | 分组聚合 | Hash 聚合 / 流水线聚合;预聚合 prepass |
| Join | 表连接 | Hash Join / Merge Join;支持 Semi/Anti Join |
| ExprEval | 表达式求值 | JIT 编译减少分支 |
| Sort | 排序 | 必要时外存化(spill to disk) |
| Analytic | SQL-99 窗口函数 | 分区内排序 + 窗口滑移 |
| Send/Recv | 跨节点数据传输 | Broadcast / Resegment;保留流的排序性 |
关于 prepass(预聚合):这是 Vertica GroupBy 算子独有的一种优化。优化器在 Scan 之后紧贴着一个 L1 缓存大小的小哈希表做第一轮就地聚合——数据刚从磁盘读上来、还在 CPU 缓存里,趁热打铁先聚合一波,把行数压下来再交给下游。但因为 prepass 本身有开销,如果运行时发现并没有实质减少行数(比如 GROUP BY 键基数极高,每行都不同),执行引擎会自动跳过它。详见 §3.5「运行时自适应」。
2.2 Exchange 算子:从「树」到「DAG」¶
在执行计划中,最关键的架构分界点是 Exchange 算子(Send/Recv 对)。 之所以叫 Exchange,是因为它表示数据在节点之间「交换」——离开一个节点的执行上下文,进入另一个节点的执行上下文。
在没有 Exchange 算子时,计划就是一棵纯树——所有数据流在一个节点内完成。一旦加入 Send/Recv,计划就不再是一棵树了:
- Send 算子(
NetworkSend)位于「发送端」节点上,它接收上游算子的输出,将数据序列化并通过网络发往目标节点 - Recv 算子(
NetworkRecv)位于「接收端」节点上,它从网络接收数据,反序列化后传给下游算子
听起来很简单,但 Send 内部隐含了一个关键操作:重分段(resegmentation)。Send 算子不是盲目地把数据发给随机节点——它根据目标分段表达式(通常是 JOIN 键或 GROUP BY 键)重新计算每行的哈希值,将哈希值相同的数据行发往同一个节点。这个操作保证了「相同键值的数据在同一个节点汇合」(Query Optimizer §III-A)。
(关于重分段与 Broadcasting 的选择逻辑,详见 MPP JOIN 策略全解析与优化器决策逻辑 第 3 节。)
Exchange 是 MPP 执行引擎区别于单机引擎的核心。 在单机数据库中,算子之间通过内存中的行批次传递数据。在 MPP 数据库中,算子之间可能需要跨越物理网络——这对性能的影响是数量级的差异。
2.3 Pipeline 并行 vs Data 并行¶
这是理解执行引擎并行机制的最关键的一对概念。
Pipeline 并行(流水线并行):不同算子在同一时刻处理不同批次的数据。就像汽车装配线——工位 1 焊接车架的同时,工位 2 在安装发动机,工位 3 在装座椅。上游算子产出一批行,立刻交给下游算子处理,自己同时处理下一批。
在 Vertica 中,pipeline 并行通过 Pull 模型 实现(C-Store 7 Years §6.1):最下游的算子向上一级算子请求行数据,这个请求沿着算子链向上传播,直到触发 Scan 去读磁盘。数据以「块」(block of rows)为单位在算子间传递,不是逐行传递——这是向量化执行(vectorized execution)的体现。
Data 并行(数据并行):同一个算子被复制多份,每份处理数据的不同子集。如果一张表在节点上有 4 个 ROS 容器,Scan 算子可能启动 4 个线程,每个线程独立扫描一个容器。这 4 个线程是同一个算子的不同实例——它们执行相同的代码,处理不同的数据。
在 Vertica 的执行计划中,Data 并行的入口是 StorageUnion 和 ParallelUnion 算子(C-Store 7 Years §6.1, Figure 3):
- StorageUnion:将节点的存储划分为不重叠的区域,为每个区域派发一个工作线程执行其下游算子链。这就是节点内 Data 并行的起点。
- ParallelUnion:将上游的多条数据流合并(不必保持排序),传递给下游算子。
比喻: Pipeline 并行是「流水线」——不同的工序同时进行。Data 并行是「多条流水线」——同一个工序开多条线,每条线处理一部分原材料。
2.4 执行计划的完整 DAG 结构¶
把 Pipeline 并行、Data 并行和 Exchange 算子组合在一起,一棵简单的算子树就演变成一个有向无环图(DAG = Directed Acyclic Graph)。同一个算子可能在不同节点上有独立的实例,通过 Send/Recv 连接。
DAG 三个词拆开理解:
有向(Directed):数据只能从上游算子流向下游算子,方向是单向的——Scan → Filter → Join → GroupBy,数据不会倒流。
无环(Acyclic):数据不会在算子间形成循环回路。如果有循环,查询就永远跑不完了。
图(Graph)而非树:逻辑算子树中每个节点只有一个父节点,但加入 Exchange 后结构变了。一个节点的 NetworkSend 可能把同一份数据同时发给其他所有节点的 NetworkRecv(如 Broadcast 场景),一个 Recv 的输入也可能来自多个节点的 Send。这时候一个节点有多条出边或多条入边,结构就从「树」变成了「图」。
所以「从 SQL 到 DAG」的意思是:一句声明式的 SQL,经过解析、优化、插入 Exchange 边界后,变成了一张跨节点、有执行依赖关系的有向无环图——每个节点是运行时的一个算子实例,边代表数据流动方向。DAG 是 MPP 并行执行的施工蓝图。
下面用一个三节点集群上的两表 JOIN + 聚合查询来展示这个演变过程:
SELECT t.customer_id, SUM(t.amount)
FROM transactions t
JOIN customers c ON t.customer_id = c.customer_id
WHERE c.region = 'EAST'
GROUP BY t.customer_id;
假设: transactions 表按 HASH(trans_id) 分段(与 JOIN 键不同),customers 表按 HASH(customer_id) 分段(与 JOIN 键相同)。
优化器产出的逻辑算子树:
GroupBy(key: customer_id, agg: SUM(amount))
└─ Join(cond: t.customer_id = c.customer_id, type: INNER)
├─ Scan(transactions) ← 分段键与 JOIN 键不匹配
└─ Scan(customers) ← 分段键与 JOIN 键匹配,且有 c.region 谓词
关键决策: transactions 表的分段键与 JOIN 键不匹配,无法做 Local Join。优化器必须在 transactions 表上插入 Exchange 算子——按 HASH(customer_id) 重分段后,customer_id 相同的数据才会汇集到同一节点。
在实际执行中,DAG 在三节点上展开:
节点 1(initiator):
GroupBy → 需要从所有节点收集结果
Recv ← 从节点 2、3 接收局部聚合结果
GroupBy(局部聚合,本地数据)
Join(condition: t.customer_id = c.customer_id)
Recv ← 从节点 2、3 接收重分段后的 transactions 数据
Scan(customers) ← 本节点持有的 customers 分段
节点 2:
GroupBy(局部聚合)
Join
Recv ← 从节点 1、3 接收重分段后的 transactions 数据
Scan(customers) ← 本节点持有的 customers 分段
节点 3:
GroupBy(局部聚合)
Join
Recv ← 从节点 1、2 接收重分段后的 transactions 数据
Scan(customers) ← 本节点持有的 customers 分段
每个节点上,Join 和 GroupBy 算子都在同时运行——这就是跨节点 Data 并行。 Exchange(Send/Recv)在节点间建立了一个全互联的数据传输网,确保 customer_id = 100 的 transactions 行全部汇聚到持有 customer_id = 100 的 customers 行的那个节点上。
2.5 内存管理:Zone 的概念¶
Pipeline 并行有一个隐藏的问题:所有算子同时运行,每个都需要内存。如果 GroupBy 需要 2GB、Join 需要 3GB、Sort 需要 1GB,三个同时在跑就需要 6GB。但实际可用内存可能只有 4GB。
Vertica 的解决方案是把计划分 Zone(C-Store 7 Years §6.1):一个 Zone 内的算子可以同时执行(共享资源),但不同 Zone 之间不能同时执行。上游 Zone 的算子完成后释放资源,下游 Zone 才能分配。这样就避免了「最坏情况下所有算子都需要满配内存」的悲观估算——每个 Zone 只需要管自己内部的资源竞争。
哪些位置会形成 Zone 边界? Exchange(Send/Recv)和需要全局屏障的算子(如 Sort、HashJoin 的 build 阶段)。在这些边界上,上游必须完全产出数据,下游才能开始处理。
2.6 执行阶段的时间线¶
查询提交后的完整生命周期可以通过 DC_QUERY_EXECUTIONS 系统表观测(来源:Vertica性能调优-2 使用系统表排查查询故障 §3.3):
| 阶段 | 发生位置 | 内容 |
|---|---|---|
| Plan | Initiator 节点 | 优化器生成执行计划 |
| InitPlan | Initiator 节点 | 初始化计划上下文 |
| SerializePlan | Initiator 节点 | 序列化计划,准备分发 |
| PreparePlan | 所有执行节点 | 反序列化、获取表锁、本地计划转换、EE 编译 |
| CompilePlan | 所有执行节点 | 预留资源(内存/文件句柄/线程)、EE 预执行初始化 |
| ExecutePlan | 所有执行节点 | 实际执行——这是占时最长的阶段 |
| AbandonPlan | Initiator 节点 | 释放资源、广播放弃消息 |
ExecutePlan 内部,不同路径(path_id)的算子可能并行执行,也可能串行执行,取决于计划中 Zone 的划分。path_id 是执行计划树中每个算子节点的标识——同一算子的多个并行实例(如多线程 Scan)共享同一个 path_id,而一条执行管线从叶子 Scan 到 Root 会跨越多个不同 path_id 的算子节点。理解这点对性能分析非常重要——EXECUTION_ENGINE_PROFILES 表中按 path_id 聚合可以看到每个算子节点的总耗时。
第 3 节:设计决策与 Trade-off¶
3.1 Pull 模型 vs Push 模型¶
在算子间传递数据有两种基本模型:
| 模型 | 原理 | 优势 | 劣势 |
|---|---|---|---|
| Push | 上游算子在产出行时主动推给下游 | 简单直观,适合流式处理 | 下游被动接受,无法控制速率 |
| Pull | 下游算子在需要行时主动向上游请求 | 下游控制节奏,天然支持背压 | 上游必须能响应请求,实现更复杂 |
Vertica 选择了 Pull 模型(C-Store 7 Years §6.1)。最下游的算子(通常是 Root)向上一级请求一个行批次,这个请求逐级向上传播,直到触发 Scan 读磁盘。数据以块为单位返回——每次几百到几千行。
Pull 模型的一个直接好处是内存控制:下游需要多少就请求多少,不会出现上游疯狂产出而下游来不及消费导致的缓冲区膨胀。代价是实现复杂度——每个算子都必须同时支持「被请求」和「请求上级」两套接口。
在 Vertica 的 EXPLAIN 图形输出中,Pull 模型的算子用矩形表示,Push 模型的算子用椭圆表示。 大部分算子(Scan、Join、GroupBy)都是 Pull 模型,少数内部组装算子使用 Push 模型。
3.2 向量化执行:行块 vs 单行¶
Vertica 采用向量化执行——算子之间以「行块」为单位传递数据,而非逐行传递(C-Store 7 Years §6.1)。这意味着每个算子的内部循环处理的是一批行,而不是单行。
向量化执行的优势:
- 减少函数调用开销:处理 1024 行只需一次算子调用,而非 1024 次
- CPU 缓存友好:连续处理一批数据,指令和数据都在缓存中
- 利于编译器优化:紧凑的循环体更容易被编译器自动向量化(SIMD)
代价是 Pipe 断裂:如果一个算子必须完成全部输入才能产出第一批输出(如 Sort、HashJoin 的 build 阶段),向量化优势就被打破了。这也是为什么执行引擎会在运行时动态切换算法——如果发现哈希表装不进内存,从 Hash Join 切换到 Sort-Merge Join。
3.3 编码感知执行 vs 解码后执行¶
Vertica 的算子被设计为能直接在编码数据上运行(C-Store 7 Years §6.1),而不是先把编码数据全部解码再计算。这是 Vertica 列存引擎的一个深层次优化。
为什么重要? 假设一个 RLE 编码的列只有 3 个不同值,物理上只存了 3 个「(值, 重复次数)」对。如果 Scan 算子能把 WHERE col = 'A' 直接应用到这 3 个对上,而不需要展开为完整行,处理量可以降低几个数量级(Materialization Strategies §V-B)。
但这也意味着算子代码必须分支——针对不同编码类型有不同的执行路径。这就是 JIT 编译发挥作用的地方:在编译阶段(EEcompile),执行引擎为每个表达式生成针对特定数据类型的机器码,避免运行时的分支判断(C-Store 7 Years §6.1)。
3.4 Exchange:Broadcast vs Resegment¶
这是优化器在生成执行计划时要做的一个关键决策。当两张表的分段键与 JOIN 键不匹配时,优化器需要决定如何搬运数据:
| 策略 | 原理 | 网络数据量 | 适用条件 |
|---|---|---|---|
| Broadcast | 将一张表完整复制到所有节点 | 表大小 × (节点数 - 1) | Inner 表远小于 Outer 表 |
| Resegment | 将一张或两张表按 JOIN 键重哈希分布 | 表大小(每行发送一次) | 两张表都很大 |
优化器用一个基于数据流的成本模型来做这个决策(Query Optimizer §IV-F)。它会分别估算 Broadcast 和 Resegment 两种方案的网络传输量,选择成本更低的方案。成本模型包含四个维度:Disk、CPU、Network、Memory——这就是 EXPLAIN LOCAL VERBOSE 输出中看到的 Disk(B)、CPU(B)、Memory(B)、Netwrk(B)。
如果统计信息缺失,优化器可能做出错误选择——比如把一张 5000 万行的表 Broadcast 到 64 个节点上,产生 5000万 × 63 = 31.5 亿行次的网络传输。这也是为什么 统计信息更新是 MPP 查询性能的基石(详见 Vertica 统计信息管理与查询性能)。
3.5 运行时自适应¶
计划不是刻在石头上的。Vertica 执行引擎有几项运行时自适应机制,在计划执行过程中根据实际数据流调整策略(C-Store 7 Years §6.1, Query Optimizer §IV-G):
- Join 算法切换:如果 Hash Join 的哈希表超过内存预算,则自动切换到 Sort-Merge Join(外存化)
- SIP 自适应禁用:如果 SIP 过滤器过滤掉的行的比例低于阈值(论文给出的参考值是 90% 的输入行通过了过滤器即视为无效),执行引擎会停止评估该 SIP 过滤器——因为评估开销大于收益(Materialization Strategies §IV-C)
- 预聚合(prepass)自适应:GroupBy 算子在 Scan 之后立即使用一个 L1 缓存大小的小哈希表做第一轮聚合。如果运行时发现这个 prepass 没有在实质减少行数,会自动跳过
- Merge→Union 转换:小块数据的 StorageMerge 如果开销大于合并本身,则自动转为 StorageUnion
这些自适应机制体现了一个重要的设计哲学:让运行时数据来驱动决策,而不是让优化器在信息不足时做死板的预先决定。
第 4 节:设计对实际使用的影响¶
4.1 查询维度¶
正确理解执行计划的时间分布。 EXECUTION_ENGINE_PROFILES 中的 execution time (us) 是算子的 CPU 时间(不含等待),clock time (us) 是墙钟时间(含等待)。如果 clock time 远大于 execution time,说明算子在等待上游输入或下游消费——这时 consumer stall 或 producer stall 计数器会很高。瓶颈不在这个算子本身,而在它的上下游。
Exchange 是性能的头号杀手。 在网络收发算子(NetworkSend/NetworkRecv)上花费的时间超过 Scan 时间是不正常的。如果你在系统表中看到 RESEGMENTED_MANY_ROWS 事件,说明大量数据在跨节点重分段——这通常是投影设计问题,需要检查分段键是否与 JOIN/GROUP BY 键匹配(详见 Vertica Join 重分段倾斜诊断与修复)。
SIP 过滤器的价值取决于 JOIN 的选择性。 如果 JOIN 是用于过滤的(如事实表 JOIN 一个经过 WHERE 过滤的小维度表),SIP 可以大幅减少 Scan 产出的行数。但如果 JOIN 只是用于反范式化(两个大表全量 JOIN),SIP 几乎不产生过滤效果,反而有评估开销。此时考虑对特定查询禁用 SIP:
4.2 加载维度¶
执行引擎的并行度和数据加载密切相关。 Tuple Mover 的 mergeout 操作也会生成执行计划——它本质上执行 INSERT ... SELECT ... 将小 ROS 容器合并为大容器。mergeout 在每个节点上独立运行(C-Store 7 Years §4),不需要跨节点 Exchange——但如果 mergeout 的输出需要写入多个投影(一张表有 N 个投影,mergeout 就要写 N 份),每个投影的排序键不同时就需要分别排序,这就是投影数量会影响加载性能的一个原因。
4.3 运维维度¶
资源池配置直接影响并行度。 EXECUTIONPARALLELISM 参数控制单个查询在每个节点上可以使用的最大线程数。如果设置为 1,Data 并行就被禁用了——多核 CPU 的优势被浪费。但如果设置过高(如等于物理核数),多个并发查询可能争抢 CPU,导致上下文切换开销超过并行收益。
执行引擎监控的关键系统表:
| 系统表 | 用途 | 关键列 |
|---|---|---|
v_monitor.execution_engine_profiles |
每个算子的运行时指标 | operator_name, path_id, counter_name, counter_value |
v_monitor.query_events |
执行期间的事件(正/负面) | event_type, event_category, operator_name, path_id |
v_internal.dc_query_executions |
查询各阶段耗时 | execution_step, completion_time - time AS duration |
v_monitor.resource_acquisitions |
资源获取记录 | request_type, memory_kb, threads, succeeded |
v_monitor.query_plan_profiles |
执行时的实际计划 | path_id, path_line, running_time, memory_allocated_bytes |
常见误解澄清:
- 「Projection 多了查询一定快」:不对。Projection 太多会导致优化器规划时间过长(可能触发
MEMORY_LIMIT_HIT事件),反而拖慢查询。 - 「Resegment 一定比 Broadcast 慢」:取决于表大小。对大表做 Broadcast 比 Resegment 更可怕——复制 10GB 到 64 个节点 = 630GB 网络传输。
- 「所有查询都应该最大化并行度」:并行的收益递减。线程数超过一定阈值后,内存带宽和锁竞争成为新瓶颈,加速比不再线性增长。
第 5 节:案例验证¶
📝 虚构案例:Broadcast 决策如何毁了查询性能¶
场景: 某电商平台数据仓库,3 节点集群,每节点 256GB 内存。orders 事实表 20 亿行,customers 维度表 2000 万行(按 HASH(customer_id) 分段)。查询目标是统计东部地区客户在 2025 年上半年的订单总额:
SELECT c.customer_segment, SUM(o.total_amount)
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE c.region = 'EAST'
AND c.signup_date >= '2024-01-01'
AND o.order_date BETWEEN '2025-01-01' AND '2025-06-30'
GROUP BY c.customer_segment;
orders 表按 HASH(order_id) 分段——与 JOIN 键 customer_id 不匹配。
设计选择: 优化器检查了 customers 表的统计信息。统计信息上次更新时表中只有 50 万行,此后又加载了 1950 万行但未重新收集统计信息。优化器仍然按 50 万行估算,决定对 customers 表做 Broadcast——总传输量估算为 50万 × 200字节/行 × (3-1) = 200MB,看似很合理。
实际后果: 2000 万行 customers 数据被广播到全部 3 个节点,总网络传输量实际为 2000万 × 200字节 × 2 = 8GB。与此同时,BROADCAST 操作是在查询执行的 JOIN 阶段发生的——它是一个阻塞操作,JOIN 必须等待 Broadcast 完成才能开始。查询总执行时间 420 秒,其中 Broadcast 占 180 秒。
回溯原理: 优化器的 Broadcast vs Resegment 决策完全依赖行数估算。统计信息的质量直接影响这个决策。在这个场景中,如果能准确估算 customers 的行数(2000 万),优化器会更倾向于对 orders 做 Resegment(按 HASH(customer_id) 重分布),因为 2000 万行的 Broadcast 成本已经接近甚至超过对 orders 做 Resegment 的成本。
修复: 执行 SELECT ANALYZE_STATISTICS('customers'); 更新统计信息。之后优化器自动将计划改为 orders Resegment + Local Join,查询时间降到 75 秒。
📋 真实案例 · 来源:Vertica Join 重分段倾斜诊断与修复¶
客户: 某运营商经分系统,6 节点 Vertica 集群(Enterprise 模式),每节点 512GB 内存,10GbE 网络。
故障现象: 每日 ETL 的核心 JOIN 查询执行时间从 45 分钟逐步增长到 3 小时以上。监控显示 6 个节点中只有 2 个节点 CPU 和网络负载高(接近 100%),其余 4 个节点处于低负载等待状态。
诊断过程:
- 在
QUERY_EVENTS系统表中发现RESEGMENTED_MANY_ROWS事件在该 JOIN 查询上高频出现 - 在
EXECUTION_ENGINE_PROFILES表中按节点聚合NetworkRecv算子的bytes received计数器,发现最高节点接收量是最低节点的 4.7 倍 - 使用
EXPLAIN检查执行计划,确认优化器在 JOIN 前插入了GLOBAL RESEGMENT操作,按HASH(customer_id)重分布事实表
根因分析: 事实表的分段键是 HASH(session_id),而 JOIN 键是 customer_id。由于业务特点,少数大客户(如集团客户)的 customer_id 关联了大量会话——约 25% 的数据行与仅 3% 的 customer_id 值关联。重分段后,这些「热点」值对应的数据汇聚到了同一节点,导致该节点的 NetworkRecv 远大于其他节点。
修复与效果:
- 短期修复:为该 JOIN 查询创建了一个按
HASH(customer_id)分段的新投影,消除了 Exchange 操作 - 查询时间:从 3 小时 12 分钟降至 17 分钟(约 11× 提升)
- 节点负载:从「2 节点过载 + 4 节点空转」变为 6 节点均匀负载
设计原理回溯: Exchange 算子的正确性不依赖数据均匀分布——无论数据怎么倾斜,结果都是正确的。但 性能依赖均匀分布。哈希分段在键值分布均匀时效果最好,在键值倾斜时会形成热点节点,而 JOIN 是全局屏障——最慢的节点决定了整个查询的时间(C-Store 7 Years §3.6)。
第 6 节:设计原则总结¶
原则 1:尽量让数据与计算在同一个节点上。¶
Exchange 是 MPP 执行引擎最昂贵的操作——每一行跨节点传输都要经历序列化→网络发送→反序列化的全路径。选择分段键时优先匹配最常见的 JOIN/GROUP BY 键,是为了让数据在加载时就落对位置,避免运行时搬运。反例: 事实表按主键分段但所有分析查询都按 customer_id 聚合——每一条查询都会触发全量重分段。
原则 2:Pipeline 并行和 Data 并行不是互斥的——最佳性能来自两者的组合。¶
单独启用 Data 并行(多线程扫描)但串行执行下游算子,Scan 线程会阻塞等待。单独启用 Pipeline 并行(流式算子链)但单线程执行,受到单核 CPU 瓶颈。MPP 执行引擎的线程调度器需要同时管理两种并行,任何一侧被压制都会导致硬件利用率塌方。
反例: 执行计划中插入了一个不必要的 Sort 算子(比如投影的排序键与 GROUP BY 键不匹配,优化器被迫加了一轮排序)。Sort 是一个阻塞算子——它必须收齐全部输入行、排完序,才能向下游输出第一批结果。这个 Sort 成了 Zone 边界(C-Store 7 Years §6.1):上游的 Scan + Filter 可以全力并行(Data 并行满配),下游的 GroupBy 也可以全力并行,但两段之间被 Sort 这个全量屏障隔开了——上游不全部跑完,下游连一行都拿不到。Pipeline 并行在 Zone 边界处被掐断。
原则 3:统计信息是优化器的眼睛。优化器「看不清」数据时,执行引擎就「跑不对」路。¶
Broadcast vs Resegment 决策、Join 顺序选择、内存预算分配——所有这些决定都依赖表行数和列分布估算。缺失或过期的统计信息是所有糟糕执行计划的共同根因。反例: 加载 5000 万行数据后不更新统计信息,优化器仍然以为表中只有 100 万行,将一张「看上去很小」的表 Broadcast 到 32 个节点。
原则 4:运行时自适应弥补了优化器的不完美,但不能替代良好的设计。¶
Join 算法切换、SIP 自适应、预聚合跳过——这些都是安全网,不是主策略。如果一个查询每次执行都触发算法切换,说明优化器对该查询的成本估算有系统性偏差,应该从统计信息或投影设计层面解决,而不是依赖运行时纠正。反例: Hash Join 每次都溢出到磁盘再切换为 Sort-Merge Join——每次切换意味着已经投入的哈希表构建时间和内存分配全部作废,查询从头再来。这说明优化器对该 JOIN 的输入大小估算有系统性偏差,应更新统计信息或调整投影设计,而不是指望运行时切换兜底。
原则 5:执行引擎的性能瓶颈在大多数情况下可以用「最慢路径」定位。¶
EXECUTION_ENGINE_PROFILES 按 path_id + operator_name + node_name 聚合后,耗时最长的路径几乎就是瓶颈所在。如果最慢路径在 NetworkSend 上——是 Exchange 问题。如果在 GroupByHash 上——是聚合内存不足。如果在 Scan 上——是 I/O 或谓词选择性差。反例: 不按 path_id 聚合,直接看全局 top operator——可能发现 Scan 耗时最长,但沒注意到有 8 条管线,每条 Scan 只占 12% 时间,真正瓶颈是某条管线的 Join。
原则 6:Exchange 不只在 JOIN 时出现——GROUP BY、DISTINCT、窗口函数都可能触发。¶
任何需要「相同键值在同一节点」的操作都可能插入 Exchange。理解这一点有助于避免对 Exchange 的「盲区」——只检查 JOIN 的分段匹配性,忽略了 GROUP BY 也可能触发重分段。反例: 事实表按 HASH(order_id) 分段,查询做 COUNT(DISTINCT customer_id)——即使没有 JOIN,也需要按 customer_id 重分段才能计算精确的去重聚合。
扩展阅读¶
按阅读顺序排列:
- C-Store 7 Years Later §6 — 本文理论依据的主要来源。§6.1 详细描述了执行引擎的算子、pull 模型、向量化执行、多线程调度。§6.2 描述了三代优化器的演化史。
- Vertica Query Optimizer §III — Exchange 算子(重分段/广播)作为优化器计划空间的一个维度被详细讨论。§IV-F 解释成本模型如何衡量 Broadcast vs Resegment 的网络代价。
- Materialization Strategies in Vertica — 介绍了 Sideways Information Passing (SIP) 的实现细节和自适应评估机制。SIP 是执行引擎最精妙的优化之一——它本质上把 JOIN 过滤逻辑推入了 Scan 算子。
- MPP JOIN 策略全解析与优化器决策逻辑 — 本文第 2 节中 Exchange 的决策逻辑在此有更完整的展开。
- MPP 数据分布策略 — 分段是 Exchange 的「反面」——数据加载时的分段设计越好,运行时的 Exchange 越少。
- Vertica性能调优-1 如何阅读执行计划 — 执行计划中每个算子的图形/文本表示的详细解读,包括颜色、形状、SIP 标注的含义。
- Vertica性能调优-2 使用系统表排查查询故障 —
EXECUTION_ENGINE_PROFILES的详细查询模板和结果分析方法。 - Vertica Join 重分段倾斜诊断与修复 — Exchange 导致数据倾斜的全套诊断方法——从
QUERY_EVENTS监控到EXECUTION_ENGINE_PROFILES按节点聚合定位热点。
本文以 Vertica 执行引擎为具体载体,但核心概念——算子 DAG、Exchange 边界、Pipeline 与 Data 并行的叠加——是所有 MPP 数据库执行引擎共享的骨架。理解了这些概念,阅读 Greenplum 的 Motion 算子、Doris 的 DataSink/DataStream 或 StarRocks 的 Exchange 节点就只是一个名字翻译的问题。真正的分水岭在于实现层的细粒度决策:Pull vs Push、向量化块大小、内存分配策略、自适应切换阈值——这些才是区分 MPP 系统执行效率的关键,也是本文试图掰开揉碎解释的部分。