📊 文档中的流程图索引
本文档包含以下 Mermaid 流程图(需要支持 Mermaid 的 Markdown 查看器):
- 架构层次图 - 展示从应用层到物理层的完整架构
- mDNS 发现流程图 - 设备发现的详细序列图
- 传输协议栈图 - libp2p 传输层分层结构
- 节点启动时序图 - 完整的启动流程
- 连接建立详细流程图 - mDNS、TCP、Noise、Yamux 的完整步骤
- 消息流向图 - 消息从发送到网络传输的路径
- Thunderbolt 依赖关系图 - Thunderbolt 与 mDNS 的分层依赖
- Thunderbolt 数据收集流程图 - InfoGatherer 触发到使用的完整流程
- Thunderbolt 关联流程图 - 两个数据源的关联逻辑
- Thunderbolt 完整使用流程图 - 从构造到 RDMA 通信的4个阶段
- Thunderbolt 设备发现时序图 - 完整的 Thunderbolt 发现和 RDMA 建立流程
- 选举流程图 - Bully 算法的详细执行流程
- 角色切换场景图 - Master 故障后的重新选举
- RDMA Queue Pair 状态机图 - RDMA 连接建立的状态转换过程
注意:如果流程图无法显示,请使用支持 Mermaid 的 Markdown 查看器,如:
- GitHub(在线查看)
- VS Code(安装 Markdown Preview Mermaid Support 插件)
- Typora
- Obsidian
概述
exo 使用基于 libp2p 的点对点网络架构,通过 mDNS (Multicast DNS) 实现本地网络内的自动设备发现,并支持 bootstrap peers 作为备选方案。在支持的 macOS 设备上,exo 还可以利用 RDMA over Thunderbolt 5 实现超低延迟(99% 延迟降低)的设备间通信。本文档详细描述了设备发现和连接建立的完整流程。
架构层次
graph TB
subgraph 应用层["Python 应用层"]
Master["Master"]
Worker["Worker"]
API["API"]
end
subgraph 路由层["Python 路由层"]
TopicRouter["TopicRouter (主题路由)
GLOBAL_EVENTS | LOCAL_EVENTS
COMMANDS | ELECTION"]
Router["Router (消息路由器)"]
end
subgraph 绑定层["PyO3 绑定层 (Rust ↔ Python)"]
PyNetworking["PyNetworkingHandle / PyFromSwarm"]
end
subgraph 网络层["Rust 网络层"]
Swarm["Swarm (网络群集)"]
Discovery["Discovery Behaviour"]
Gossipsub["Gossipsub Behaviour"]
Transport["TCP/IP + Noise + Yamux 传输层"]
end
subgraph 信息收集["系统信息收集 (macOS)"]
SysProfiler["system_profiler
Thunderbolt 数据"]
RdmaCtl["rdma_ctl status
RDMA 状态"]
NetworkSetup["networksetup
网络配置"]
IfConfig["ifconfig
接口状态"]
end
subgraph 物理层["物理网络层"]
Ethernet["Ethernet / WiFi"]
Thunderbolt["Thunderbolt 5
(支持 RDMA)"]
end
Master & Worker & API --> TopicRouter
TopicRouter --> Router
Router --> PyNetworking
PyNetworking --> Swarm
Swarm --> Discovery
Swarm --> Gossipsub
Swarm --> Transport
Transport --> Ethernet
Transport --> Thunderbolt
SysProfiler --> Discovery
RdmaCtl --> Discovery
NetworkSetup --> Discovery
IfConfig --> Discovery
核心组件概述
exo 的 Node 运行以下核心组件:
| 组件 | 是否所有节点运行 | 说明 |
|---|---|---|
| Router | ✅ 是 | libp2p 网络层,所有节点都有 |
| Election | ✅ 是 | 参与主控器选举(Bully 算法) |
| Worker | ✅ 是* | 处理推理任务(除非 --no-worker) |
| Master | ❌ 否 | 只有一个节点通过选举成为 master |
| API | ✅ 是 | 提供 API 和 dashboard(所有节点) |
- 可以通过
--no-worker禁用
第一部分:设备发现与基础连接
1.1 mDNS 设备发现
mDNS 配置参数
1 | |
发现流程
sequenceDiagram
participant A as 节点 A
participant B as 节点 B
Note over A: 启动 mDNS 服务
监听 5353/UDP
A->>B: 定期广播 mDNS 查询
谁在运行 exo?
Note over B: 启动 mDNS 服务
监听 5353/UDP
B-->>A: 响应 mDNS
我在运行 exo
PeerID: 12D3KooW...
地址: /ip4/192.168.1.100/tcp/8000
Note over A: 记录发现的节点
mdns_discovered[PeerID] = {addr}
A->>B: 尝试连接
Dial(12D3KooW..., addr)
Note over B: 接受连接
握手 (Noise 协议)
Note over A: 连接建立成功
发送 ConnectionEstablished 事件
A->>B: 订阅 Gossipsub 主题
- global_events
- local_events
- commands
- election_messages
Note over B: 订阅主题并开始消息交换
连接状态跟踪
1 | |
连接维护机制
1 | |
1.2 传输层配置
传输协议栈
graph TB
subgraph 传输栈["libp2p 传输协议栈"]
TCP["TCP/IP (nodelay)
减少延迟"]
PNet["Private Network (PNet)
预共享密钥隔离网络"]
Upgrade["Upgrade Version 1 (Lazy)
V1 + lazy flushing => 0-RTT"]
Noise["Noise 协议认证
比 TLS 快,安全性足够"]
Yamux["Yamux 多路复用
在单个连接上多路复用"]
end
TCP --> PNet
PNet --> Upgrade
Upgrade --> Noise
Noise --> Yamux
私有网络配置
1 | |
1.3 连接建立详细流程
sequenceDiagram
participant A as 节点 A
participant M as mDNS 服务
participant B as 节点 B
participant N as libp2p 网络
A->>M: 启动 mDNS,监听 5353/UDP
B->>M: 启动 mDNS,监听 5353/UDP
A->>M: mDNS 查询: "_libp2p._udp.local"
M->>B: 转发查询
B->>M: mDNS 响应: PTR + TXT + A 记录
M->>A: 转发响应
Note over A: 解析 mDNS 响应
A->>A: mdns_discovered[peer_B] = {addr_B}
A->>N: Dial(peer_B, "/ip4/192.168.1.100/tcp/8000")
N->>B: TCP 连接请求
B->>N: 接受连接
A->>B: Noise 握手 (NK 优先生成模式)
B->>A: Noise 握手响应
Note over A,B: 加密通道建立
A->>B: Yamux 会话初始化
B->>A: Yamux 确认
Note over A,B: 多路复用就绪
A->>A: 触发 ConnectionEstablished 事件
A->>B: Gossipsub 订阅: global_events
A->>B: Gossipsub 订阅: local_events
A->>B: Gossipsub 订阅: commands
A->>B: Gossipsub 订阅: election_messages
B->>A: Gossipsub 订阅确认
Note over A,B: P2P 连接完全建立
第二部分:集群架构与选举机制
2.1 选举机制概述
exo 使用 Bully 算法实现主控器选举,确保集群中始终只有一个 Master,并支持 Master 角色的动态切换。
节点角色
| 组件 | 是否所有节点运行 | 说明 |
|---|---|---|
| Router | ✅ 是 | libp2p 网络层 |
| Election | ✅ 是 | 参与主控器选举 |
| Worker | ✅ 是* | 处理推理任务 |
| Master | ❌ 否 | 只有一个节点通过选举成为 master |
| API | ✅ 是 | 提供 HTTP API |
- 可以通过
--no-worker禁用
Bully 算法核心思想
1 | |
选举消息比较
1 | |
比较优先级:
- clock(选举轮次):越新越优先
- seniority(资深度):当选次数越多越优先
- commands_seen(处理命令数):处理越多越优先
- node_id(节点 ID):ID 越大越优先
2.2 选举触发时机
1. 节点启动时
1 | |
2. 新节点连接时
1 | |
3. Master 宕机时
1 | |
2.3 角色动态切换机制
_elect_loop 处理选举结果
1 | |
关键切换场景
场景:Master 宕机,重新选举
sequenceDiagram
participant A as 节点 A (原 Master)
participant B as 节点 B
participant C as 节点 C
Note over A: 当前 Master: A
A.master 运行中
Note over A: 宕机!
A->>A: 进程崩溃
Note over B: Ping 超时
Note over C: Ping 超时
B->>B: clock += 1
触发新选举
C->>C: clock += 1
触发新选举
B->>C: ElectionMessage(clock=1, ...)
C->>B: ElectionMessage(clock=1, ...)
Note over B: elected = max([B, C])
Note over C: elected = max([B, C])
alt B 的 NodeId > C 的 NodeId
Note over B,C: B 当选新 Master
B->>B: 创建 Master 组件
Note over B: 🎉 B 成为新 Master
C->>C: 继续作为 Worker
else C 的 NodeId > B 的 NodeId
Note over B,C: C 当选新 Master
C->>C: 创建 Master 组件
Note over C: 🎉 C 成为新 Master
B->>B: 继续作为 Worker
end
第三部分:Thunderbolt/RDMA 深度解析
3.1 Thunderbolt 与 mDNS 的依赖关系
重要概念:Thunderbolt/RDMA 依赖 mDNS(或其他节点发现机制)才能工作。
依赖链分析
1 | |
关键证据
关键代码(src/exo/shared/apply.py:335-354):
1 | |
场景分析
场景:两台 Mac 通过 Thunderbolt 5 线缆连接,但 mDNS 失败
1 | |
依赖关系总结
| 场景 | mDNS | libp2p | Gossipsub | Thunderbolt | 结果 |
|---|---|---|---|---|---|
| 场景 1 | ✅ | ✅ | ✅ | ✅ | ✅ RDMA 连接成功 |
| 场景 2 | ✅ | ✅ | ✅ | ❌ | ⚠️ Socket 连接(降级) |
| 场景 3 | ❌ | ❌ | ❌ | ✅ | ❌ 无法建立连接 |
| 场景 4 | ❌ | ✅* | ✅ | ✅ | ✅ RDMA 连接成功 |
*场景 4:使用 bootstrap peers 替代 mDNS
3.2 Thunderbolt 数据收集机制
InfoGatherer 触发流程
InfoGatherer 的 Thunderbolt 监控任务在节点启动时自动触发,无需用户干预。
sequenceDiagram
participant User as 用户
participant Main as exo main.py
participant Node as Node
participant Worker as Worker
participant IG as InfoGatherer
participant Monitor as _monitor_system_profiler_thunderbolt_data
participant SP as system_profiler
User->>Main: uv run exo
Main->>Main: 解析命令行参数
Note over Main: 创建组件实例
Main->>Node: Node.create(args)
Node->>Node: 创建 Router
Node->>Node: 创建 EventRouter
Node->>Node: 创建 Election
alt 未指定 --no-worker
Node->>Worker: Worker(node_id, ...)
Note over Worker: Worker 实例创建
但尚未启动
Node->>Node: worker = Worker 实例
else 指定了 --no-worker
Node->>Node: worker = None
end
Node->>Node: 创建 Master
Node->>Node: 创建 API
Note over Node: 所有组件实例创建完成
Main->>Node: node.run()
Note over Node: 开始启动各个组件
Node->>Node: tg.start_soon(router.run)
Node->>Node: tg.start_soon(event_router.run)
Node->>Node: tg.start_soon(election.run)
alt worker 存在
Node->>Worker: tg.start_soon(worker.run)
Note over Worker: Worker.run() 启动
Worker->>Worker: logger.info("Starting Worker")
Worker->>Worker: info_send, info_recv = channel()
Worker->>IG: InfoGatherer(info_send)
Note over IG: InfoGatherer 实例创建
Worker->>IG: tg.start_soon(info_gatherer.run)
Note over IG: InfoGatherer.run() 启动
IG->>IG: async with self._tg as tg
IG->>IG: 检查 IS_DARWIN
alt macOS 系统
Note over IG: 启动 macOS 特定的监控任务
IG->>IG: tg.start_soon(_monitor_macmon, 1)
IG->>Monitor: tg.start_soon(
_monitor_system_profiler_thunderbolt_data, 5
)
IG->>IG: tg.start_soon(_monitor_thunderbolt_bridge_status, 10)
IG->>IG: tg.start_soon(_monitor_rdma_ctl_status, 10)
else Linux 系统
Note over IG: 启动 Linux 特定的监控任务
IG->>IG: tg.start_soon(_monitor_memory_usage, 1)
end
IG->>IG: tg.start_soon(_watch_system_info, 10)
IG->>IG: tg.start_soon(_monitor_misc, 60)
IG->>IG: tg.start_soon(_monitor_static_info, 60)
IG->>IG: tg.start_soon(_monitor_disk_usage, 30)
Note over Monitor: Thunderbolt 监控任务启动
Monitor->>Monitor: while True:
Monitor->>SP: system_profiler SPThunderboltDataType
SP-->>Monitor: 返回 JSON 数据
Monitor->>IG: info_sender.send(
MacThunderboltIdentifiers
)
Monitor->>IG: info_sender.send(
MacThunderboltConnections
)
Note over Monitor: 等待 5 秒
Monitor->>Monitor: anyio.sleep(5)
Note over Monitor: 循环继续...
end
Node->>Node: tg.start_soon(master.run)
Node->>Node: tg.start_soon(api.run)
Note over Node: 所有组件运行中
数据收集流程
触发时机:InfoGatherer 启动时创建后台任务,每 5 秒执行一次
收集步骤:
1 | |
3.3 两个数据源的关联
为什么要关联两个数据源?
核心问题:创建完整的 RDMA 接口信息需要同时使用两个数据源。
两个数据源各提供一部分信息
数据源 1:system_profiler(Thunderbolt 硬件信息)
1 | |
它告诉我们:
- ✅ Thunderbolt 设备的唯一 ID (
domain_uuid) - ✅ 它在端口 1 上 (
receptacle_id_key) - ✅ 当前速度是 80 Gb/s (
current_speed_key) - ❌ 但不知道对应的网络接口是
en2、en3还是其他
数据源 2:networksetup(端口名称到接口的映射)
1 | |
它告诉我们:
- ✅ “Thunderbolt 1” 对应设备
en2 - ✅ “Thunderbolt 2” 对应设备
en3 - ❌ 但不知道 Thunderbolt 硬件信息(UUID、速度等)
关联流程图
flowchart TD
subgraph SP_Data["system_profiler 数据
Thunderbolt 硬件信息"]
SP1["receptacle_id_key = 1"]
SP2["domain_uuid_key = AABBCCDD-..."]
SP3["current_speed_key = 80 Gb/s"]
end
subgraph NS_Data["networksetup 映射数据
端口名称到设备名"]
NS1["Thunderbolt 1 → en2"]
NS2["Thunderbolt 2 → en3"]
NS3["Thunderbolt 3 → en4"]
end
subgraph Logic["关联逻辑 ident 方法"]
L1["步骤 1: 构造端口名称
Thunderbolt + receptacle_id_key
= Thunderbolt 1"]
L2["步骤 2: 查找设备名
ifaces Thunderbolt 1
= en2"]
L3["步骤 3: 构造 RDMA 接口名
rdma_ + en2
= rdma_en2"]
end
subgraph Result["最终结果
ThunderboltIdentifier"]
R1["rdma_interface = rdma_en2
来自 networksetup"]
R2["domain_uuid = AABBCCDD-...
来自 system_profiler"]
R3["link_speed = 80 Gb/s
来自 system_profiler"]
end
SP_Data --> Logic
NS_Data --> Logic
Logic --> Result
style SP_Data fill:#e1f5ff
style NS_Data fill:#fff3e0
style Logic fill:#f3e5f5
style Result fill:#e8f5e9
关联代码实现
_gather_iface_map() 函数:
1 | |
ident() 方法完成关联:
1 | |
3.4 Thunderbolt 数据的完整使用流程
4 个阶段的数据流转
flowchart TD
subgraph Phase1["阶段 1: 数据收集与存储"]
P1_1["InfoGatherer 收集 Thunderbolt 数据
system_profiler + networksetup"]
P1_2["构造 ThunderboltIdentifier
rdma_interface, domain_uuid, link_speed"]
P1_3["发送 MacThunderboltIdentifiers 事件
via Gossipsub"]
P1_4["Master 存储到 state.node_thunderbolt
domain_uuid → node_id 映射建立"]
end
subgraph Phase2["阶段 2: 拓扑构建"]
P2_1["接收 MacThunderboltConnections 事件
source_uuid ↔ sink_uuid"]
P2_2["查询 state.node_thunderbolt 映射
domain_uuid → node_id, rdma_interface"]
P2_3["创建 RDMAConnection 对象
source_rdma_iface, sink_rdma_iface"]
P2_4["更新 State.topology
添加 RDMA 连接边"]
end
subgraph Phase3["阶段 3: 模型放置"]
P3_1["接收模型推理请求
PlaceInstance 命令"]
P3_2["选择合适的节点环
topology.get_cycles"]
P3_3["检查是否为 RDMA 环
topology.is_rdma_cycle"]
P3_4["构建 RDMA 设备矩阵
get_mlx_jaccl_devices_matrix"]
P3_5["创建 MlxJacclInstance
jaccl_devices = matrix"]
end
subgraph Phase4["阶段 4: 分布式推理"]
P4_1["Master 发送 CreateInstance 命令
到各个 Worker"]
P4_2["Worker 接收实例配置
包含 jaccl_devices 矩阵"]
P4_3["Worker 启动 MLX JACCL 引擎
使用 RDMA 接口进行通信"]
P4_4["跨设备张量并行
通过 rdma_en2, rdma_en3 等"]
end
Phase1 --> Phase2
Phase2 --> Phase3
Phase3 --> Phase4
style Phase1 fill:#e3f2fd
style Phase2 fill:#fff3e0
style Phase3 fill:#f3e5f5
style Phase4 fill:#e8f5e9
详细步骤说明
阶段 1:数据收集与存储
1 | |
阶段 2:拓扑构建
1 | |
阶段 3:模型放置
1 | |
阶段 4:分布式推理
1 | |
RDMA 连接建立详解
拓扑构建完成后,exo 为 MLX distributed 提供配置,MLX distributed 框架负责建立实际的 RDMA 连接:
职责划分:
- exo:构建 RDMA 拓扑,提供
jaccl_devices配置矩阵 - MLX distributed:读取配置,通过 libibverbs 建立 RDMA 连接
步骤 1:MLX Distributed 初始化
1 | |
步骤 2:MLX 如何使用 RDMA 接口
重要说明:exo 不直接建立 RDMA 连接。exo 只负责提供配置信息,实际的 RDMA 连接建立由 MLX distributed 框架内部处理。
调用链如下:
1
exo → mx.distributed.init(backend="jaccl") → MLX distributed 内部 → libibverbs → RDMA 硬件
exo 的职责(止步于此):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18# src/exo/worker/engines/mlx/utils_mlx.py:128-149
# exo 只负责配置,不直接操作 RDMA
# 1. 序列化设备矩阵
jaccl_devices_json = json.dumps(jaccl_devices)
# 2. 写入配置文件
with open(coordination_file, "w") as f:
f.write(jaccl_devices_json)
# 3. 设置环境变量(告诉 MLX 使用哪些 RDMA 接口)
os.environ["MLX_IBV_DEVICES"] = coordination_file
os.environ["MLX_RANK"] = str(rank)
os.environ["MLX_JACCL_COORDINATOR"] = jaccl_coordinator
# 4. 调用 MLX distributed 初始化
# 从这里开始,RDMA 连接建立完全由 MLX distributed 接管
group = mx.distributed.init(backend="jaccl", strict=True)
MLX distributed 的职责(exo 不参与):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19# 这是 MLX distributed 框架内部的实现(exo 不可见)
# 以下代码由 MLX distributed 在 mx.distributed.init() 内部执行
# 1. 读取 MLX_IBV_DEVICES 文件
with open(os.environ["MLX_IBV_DEVICES"]) as f:
jaccl_devices = json.load(f)
# 2. 解析当前节点的 RDMA 接口
rank = int(os.environ["MLX_RANK"])
my_interfaces = jaccl_devices[rank] # [None, "rdma_en2", "rdma_en3"]
# 3. 对每个其他节点建立 RDMA 连接
for dst_rank, iface_name in enumerate(my_interfaces):
if iface_name is not None:
# MLX distributed 通过 libibverbs 建立 RDMA 连接
# exo 不参与此过程
# libibverbs 会调用 InfiniBand Verbs API
# 最终通过 Thunderbolt 网络直接访问远程内存
pass # MLX 内部实现
步骤 3:推理时的数据传输
1 | |
完整 RDMA 通信流程
sequenceDiagram
participant W1 as Worker 1
(rank=0)
participant W2 as Worker 2
(rank=1)
participant W3 as Worker 3
(rank=2)
participant MLX as MLX Distributed
participant RDMA as RDMA 网络层
Note over W1,W2: Master 创建 MlxJacclInstance
包含 jaccl_devices 矩阵
Note over W1: 步骤 1: 初始化 MLX Distributed
W1->>W1: 设置 MLX_IBV_DEVICES 文件
[[null, "rdma_en2", "rdma_en3"], ...]
W2->>W2: 设置 MLX_IBV_DEVICES 文件
[[rdma_en2", null, "rdma_en2"], ...]
W3->>W3: 设置 MLX_IBV_DEVICES 文件
[[rdma_en3", "rdma_en2", null], ...]
W1->>MLX: mx.distributed.init(backend="jaccl")
W2->>MLX: mx.distributed.init(backend="jaccl")
W3->>MLX: mx.distributed.init(backend="jaccl")
Note over W1,W2: 步骤 2: 建立 RDMA 连接
W1->>RDMA: 通过 rdma_en2 连接到 W2
W1->>RDMA: 通过 rdma_en3 连接到 W3
W2->>RDMA: 通过 rdma_en2 连接到 W1
W2->>RDMA: 通过 rdma_en2 连接到 W3
W3->>RDMA: 通过 rdma_en2 连接到 W1
W3->>RDMA: 通过 rdma_en2 连接到 W2
Note over RDMA: 使用 libibverbs (InfiniBand Verbs)
绕过 TCP/IP 协议栈
直接内存访问
RDMA-->>W1: 连接建立
RDMA-->>W2: 连接建立
RDMA-->>W3: 连接建立
Note over W1,W2: 步骤 3: 开始推理
W1->>W1: 加载模型 shard (rank 0)
W2->>W2: 加载模型 shard (rank 1)
W3->>W3: 加载模型 shard (rank 2)
Note over W1,W2: 步骤 4: 前向传播
W1->>W1: 计算本地输出
W1->>RDMA: 发送到 W2 (rdma_en2)
直接内存访问
W1->>RDMA: 发送到 W3 (rdma_en3)
直接内存访问
RDMA->>W2: 接收数据到内存
RDMA->>W3: 接收数据到内存
W2->>W2: 计算本地输出
W2->>RDMA: 发送到 W1 和 W3
W3->>W3: 计算本地输出
W3->>RDMA: 发送到 W1 和 W2
Note over W1,W2: 步骤 5: 反向传播
W1->>RDMA: 发送梯度到 W2 和 W3
W2->>RDMA: 发送梯度到 W1 和 W3
W3->>RDMA: 发送梯度到 W1 和 W2
Note over W1,W2: 梯度同步完成
推理完成!
技术底层:MLX Distributed JACCL 实现
MLX JACCL 是如何建立 RDMA 连接的
MLX distributed 框架通过 JACCL(Collective Communications Library)后端实现 RDMA 通信。以下是实际的代码实现路径:
1. 动态加载 RDMA 库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// /home/wanger/codes/mlx/mlx/distributed/jaccl/utils.cpp:36-67
IBVWrapper::IBVWrapper() {
// macOS 上动态加载 librdma.dylib
librdma_handle_ = dlopen("librdma.dylib", RTLD_NOW | RTLD_GLOBAL);
// 加载 libibverbs 符号
LOAD_SYMBOL(ibv_get_device_list, get_device_list);
LOAD_SYMBOL(ibv_open_device, open_device);
LOAD_SYMBOL(ibv_alloc_pd, alloc_pd);
LOAD_SYMBOL(ibv_create_cq, create_cq);
LOAD_SYMBOL(ibv_create_qp, create_qp);
LOAD_SYMBOL(ibv_modify_qp, modify_qp);
LOAD_SYMBOL(ibv_reg_mr, reg_mr);
// ... 更多符号
}
2. Connection 类:管理单个 RDMA 连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27// /home/wanger/codes/mlx/mlx/distributed/jaccl/utils.cpp:110-255
class Connection {
ibv_context* ctx; // RDMA 设备上下文
ibv_pd* protection_domain; // 保护域
ibv_cq* completion_queue; // 完成队列
ibv_qp* queue_pair; // 队列对
Destination src; // 本地连接信息
// 分配保护域
void allocate_protection_domain();
// 创建完成队列
void create_completion_queue(int num_entries);
// 创建队列对(Queue Pair)
void create_queue_pair();
// 将 QP 转换为 INIT 状态
void queue_pair_init();
// 将 QP 转换为 RTR(Ready to Receive)状态
void queue_pair_rtr(const Destination& dst);
// 将 QP 转换为 RTS(Ready to Send)状态
void queue_pair_rts();
};
3. RingGroup 类:环形拓扑的分布式组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// /home/wanger/codes/mlx/mlx/distributed/jaccl/ring.cpp:10-37
RingGroup::RingGroup(
int rank,
int size,
const std::vector<std::string>& left_devices,
const std::vector<std::string>& right_devices,
const char* coordinator_addr)
: rank_(rank),
size_(size),
n_conns_(left_devices.size()),
side_channel_(rank_, size_, coordinator_addr), // TCP 侧信道
left_(create_connections(left_devices)), // 左侧连接
right_(create_connections(right_devices)) { // 右侧连接
// 初始化所有连接
initialize();
// 同步屏障:确保所有节点都已初始化
side_channel_.all_gather<int>(0);
// 创建环形实现
ring_ = RingImpl(rank_, size_, left_, right_, send_buffers_, recv_buffers_);
}
4. 初始化流程:从环境变量到 RDMA 连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26// /home/wanger/codes/mlx/mlx/distributed/jaccl/jaccl.cpp:130-176
std::shared_ptr<GroupImpl> init(bool strict) {
// 1. 读取环境变量
const char* dev_file = std::getenv("MLX_IBV_DEVICES"); // ← exo 设置
const char* coordinator = std::getenv("MLX_JACCL_COORDINATOR"); // ← exo 设置
const char* rank_str = std::getenv("MLX_RANK"); // ← exo 设置
// 2. 解析设备矩阵 JSON 文件
DeviceFile devices(dev_file);
// JSON 格式:
// [
// [null, "rdma_en2", "rdma_en3"], // rank 0 的连接
// ["rdma_en3", null, "rdma_en2"], // rank 1 的连接
// ["rdma_en2", "rdma_en3", null] // rank 2 的连接
// ]
// 3. 提取当前节点的连接信息
auto [left, right] = devices.extract_ring_connectivity(rank);
// left = ["rdma_en3"] // 连接到左侧节点的设备
// right = ["rdma_en2"] // 连接到右侧节点的设备
// 4. 创建 RingGroup(触发 RDMA 连接建立)
return std::make_shared<RingGroup>(
rank, devices.size(), left, right, coordinator);
}
5. 完整的 RDMA 连接建立流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47// /home/wanger/codes/mlx/mlx/distributed/jaccl/ring.cpp:39-90
void RingGroup::initialize() {
// 步骤 1: 创建队列对
for (auto& conn : left_) {
conn.allocate_protection_domain(); // ibv_alloc_pd
conn.create_completion_queue(...); // ibv_create_cq
conn.create_queue_pair(); // ibv_create_qp
}
// 对右侧连接执行相同操作...
// 步骤 2: 分配并注册内存缓冲区
allocate_buffers();
// 内部调用 ibv_reg_mr 注册内存到保护域
// 步骤 3: 初始化队列对状态为 INIT
for (auto& conn : left_) {
conn.queue_pair_init(); // ibv_modify_qp → IBV_QPS_INIT
}
// 对右侧连接执行相同操作...
// 步骤 4: 通过 TCP 侧信道交换连接信息
std::vector<Destination> left_info;
for (auto& conn : left_) {
left_info.emplace_back(conn.info());
}
// Destination 包含:
// - local_id: 本地 LID(Local Identifier)
// - queue_pair_number: QP 编号
// - global_identifier: GID(Global Identifier)
// - packet_sequence_number: PSN
// 通过 TCP all-gather 交换所有节点的连接信息
auto all_left_infos = side_channel_.all_gather(left_info);
auto all_right_infos = side_channel_.all_gather(right_info);
// 步骤 5: 转换队列对状态为 RTR 和 RTS
int left_peer = (rank_ + size_ - 1) % size_;
for (int i = 0; i < left_.size(); i++) {
auto peer_info = all_right_infos[left_peer][i];
left_[i].queue_pair_rtr(peer_info); // ibv_modify_qp → IBV_QPS_RTR
left_[i].queue_pair_rts(); // ibv_modify_qp → IBV_QPS_RTS
}
// 对右侧连接执行相同操作...
// 现在 RDMA 连接已建立,可以进行零拷贝数据传输!
}
6. SideChannel:TCP 侧信道
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32// /home/wanger/codes/mlx/mlx/distributed/jaccl/utils.cpp:288-321
class SideChannel {
// 使用 TCP 连接交换 RDMA 连接元数据
// 为什么需要 TCP?
// - RDMA 连接需要远程节点的 QPN、LID、GID 信息
// - 但这些信息只有连接建立后才能获得
// - 因此使用 TCP 作为控制通道来交换这些元数据
SideChannel(int rank, int size, const char* addr)
: rank_(rank), size_(size) {
if (rank_ == 0) {
// Rank 0 作为服务器,接受所有其他节点的连接
detail::TCPSocket server;
server.listen(IBV_TAG, address);
for (int i = 0; i < size - 1; i++) {
sockets_.push_back(server.accept(IBV_TAG));
}
} else {
// 其他节点作为客户端,连接到 rank 0
sockets_.push_back(
detail::TCPSocket::connect(IBV_TAG, address, 4, 1000));
}
}
// All-gather 实现
template <typename T>
std::vector<T> all_gather(const T& v) {
// Rank 0 收集所有数据并广播
// 其他节点发送数据并接收广播结果
}
};
7. 数据发送和接收
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30// /home/wanger/codes/mlx/mlx/distributed/jaccl/utils.h:178-218
struct Connection {
// 发送数据(RDMA SEND 操作)
void post_send(const SharedBuffer& buff, uint64_t work_request_id) {
ibv_send_wr work_request;
work_request.wr_id = work_request_id;
work_request.sg_list = &entry; // Scatter-Gather 列表
work_request.num_sge = 1;
work_request.opcode = IBV_WR_SEND; // RDMA SEND 操作
work_request.send_flags = IBV_SEND_SIGNALED;
ibv_post_send(queue_pair, &work_request, &bad_work_request);
}
// 接收数据(RDMA RECV 操作)
void post_recv(const SharedBuffer& buff, uint64_t work_request_id) {
ibv_recv_wr work_request;
work_request.wr_id = work_request_id;
work_request.sg_list = &entry;
work_request.num_sge = 1;
ibv_post_recv(queue_pair, &work_request, &bad_work_request);
}
// 轮询完成队列
int poll(int num_completions, ibv_wc* work_completions) {
return ibv_poll_cq(completion_queue, num_completions, work_completions);
}
};
8. 实际推理时的数据流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// /home/wanger/codes/mlx/mlx/distributed/jaccl/ring.cpp:150-160
void RingGroup::all_gather(const array& input, array& output, Stream stream) {
auto in_ptr = input.data<char>();
auto out_ptr = output.data<char>();
int64_t n_bytes = input.nbytes();
// 调度到 CPU 流
auto& encoder = cpu::get_command_encoder(stream);
encoder.set_input_array(input);
encoder.set_output_array(output);
// 执行 RDMA 通信
encoder.dispatch([in_ptr, out_ptr, n_bytes, this]() {
ring_.all_gather(in_ptr, out_ptr, n_bytes, n_conns_);
// 内部使用 ibv_post_send/ibv_post_recv 进行零拷贝传输
});
}
关键技术点总结:
| 技术 | 作用 | 代码位置 |
|---|---|---|
| librdma.dylib | macOS 上的 RDMA 库 | dlopen("librdma.dylib") |
| libibverbs | InfiniBand Verbs API | 动态符号绑定 |
| 保护域 (PD) | 内存管理区域 | ibv_alloc_pd() |
| 完成队列 (CQ) | 异步操作完成通知 | ibv_create_cq() |
| 队列对 (QP) | 发送/接收工作队列 | ibv_create_qp() |
| 内存注册 | 注册内存用于 RDMA | ibv_reg_mr() |
| QP 状态机 | INIT → RTR → RTS | ibv_modify_qp() |
| TCP 侧信道 | 交换 RDMA 元数据 | SideChannel 类 |
exo 与 MLX distributed 的完整调用链:
1 | |
环境变量与配置文件详解
RDMA Queue Pair 状态机
理解 RDMA 连接建立的关键是 Queue Pair (QP) 的状态转换:
stateDiagram-v2
[*] --> Reset: ibv_create_qp()
Reset --> Init: ibv_modify_qp(IBV_QPS_INIT)
Init --> RTR: ibv_modify_qp(IBV_QPS_RTR)
使用远程节点的 QPN、LID、GID
RTR --> RTS: ibv_modify_qp(IBV_QPS_RTS)
RTS --> [*]: 可以发送/接收数据
note right of Reset
队列对刚创建
还不能使用
end note
note right of Init
本地初始化完成
设置端口和访问权限
准备接收连接信息
end note
note right of RTR
Ready to Receive
已配置远程节点信息
可以接收数据
end note
note right of RTS
Ready to Send
可以发送和接收数据
RDMA 连接完全建立
end note
状态转换详解:
1 | |
为什么需要 TCP 侧信道:
1 | |
MLX JACCL 需要的环境变量(exo 自动设置)
| 环境变量 | 值示例 | 说明 |
|---|---|---|
MLX_IBV_DEVICES |
/tmp/jaccl_devices_abc123.json |
RDMA 设备矩阵 JSON 文件路径 |
MLX_RANK |
0, 1, 2 |
当前节点的 rank(0-based) |
MLX_JACCL_COORDINATOR |
192.168.1.100:12345 |
Rank 0 的 IP:port(用于 TCP 侧信道) |
设备矩阵 JSON 文件格式:
1 | |
矩阵说明:
devices[i][j]= 节点 i 连接到节点 j 的 RDMA 设备名列表devices[i][i]=null(节点不连接自己)- 设备名列表可包含多个设备(多路径)
- 单路径:
"rdma_en2" - 多路径:
["rdma_en2", "rdma_en3"] - 无连接:
null
- 单路径:
exo 如何生成设备矩阵:
1 | |
TCP 侧信道的作用:
1 | |
Coordinator IP 和 Port 的选择:
1 | |
性能对比:
| 操作 | TCP/IP | RDMA |
|---|---|---|
| 数据拷贝 | 2-3 次 | 0 次 |
| 内核参与 | 需要 | 不需要 |
| 延迟 | 基线 | 99% 降低 |
完整数据流示例
场景:3 个 Mac Studio 通过 Thunderbolt 5 线缆连接
sequenceDiagram
participant A as Mac A
participant B as Mac B
participant C as Mac C
participant M as Master
participant T as Topology
Note over A,B: 阶段 1: 收集 Thunderbolt 信息
A->>A: 构造 ThunderboltIdentifier
rdma_interface="rdma_en2"
B->>B: 构造 ThunderboltIdentifier
rdma_interface="rdma_en2"
C->>C: 构造 ThunderboltIdentifier
rdma_interface="rdma_en2"
A->>M: 发送 MacThunderboltIdentifiers
B->>M: 发送 MacThunderboltIdentifiers
C->>M: 发送 MacThunderboltIdentifiers
Note over A,B: 阶段 2: 检测物理连接
A->>M: 发送 MacThunderboltConnections
B->>M: 发送 MacThunderboltConnections
C->>M: 发送 MacThunderboltConnections
Note over M: 阶段 3: 构建 RDMA 拓扑
M->>M: 构建 domain_uuid → node_id 映射
M->>M: 创建 RDMAConnection 对象
M->>T: 更新 topology
Note over M: 阶段 4: 模型放置
M->>M: 接收 PlaceInstance 命令
M->>T: 查询 RDMA 环
M->>M: 构建 RDMA 设备矩阵
Note over A,B: 阶段 5: 启动推理
M->>A: CreateInstance(jaccl_devices)
M->>B: CreateInstance(jaccl_devices)
M->>C: CreateInstance(jaccl_devices)
A->>A: 启动 MLX JACCL 引擎
B->>B: 启动 MLX JACCL 引擎
C->>C: 启动 MLX JACCL 引擎
Note over A,B: 阶段 6: 分布式推理
A->>B: 通过 rdma_en2 发送张量数据
B->>C: 通过 rdma_en2 发送张量数据
C->>A: 通过 rdma_en2 发送张量数据
Note over A,B: 推理完成!
3.5 网络配置脚本
tmp/set_rdma_network_config.sh 脚本用于配置 Thunderbolt RDMA 网络。
问题背景
macOS 默认将所有 Thunderbolt 接口聚合到一个 “Thunderbolt Bridge” 网桥中,导致:
- 🌪️ 网络风暴:所有接口共享同一个 MAC 地址
- 🔀 无法独立控制:无法为每个 RDMA 接口单独配置
- 📉 性能下降:RDMA 无法正常工作
解决方案
1 | |
第四部分:消息路由与事件传播
4.1 事件溯源架构
核心概念:所有状态变更是不可变事件。
| 事件类型 | 说明 |
|---|---|
| NodeGatheredInfo | 节点信息收集(包括 Thunderbolt 信息) |
| InstanceCreated | 实例创建 |
| InstanceDeleted | 实例删除 |
| TaskCreated | 任务创建 |
| TopologyEdgeCreated | 拓扑边创建 |
| TopologyEdgeDeleted | 拓扑边删除 |
4.2 消息路由
主题系统
| 主题 | 用途 | 发布策略 |
|---|---|---|
GLOBAL_EVENTS |
Master 广播事件 | Always |
LOCAL_EVENTS |
Worker 发送事件 | Always |
COMMANDS |
命令消息 | Always |
ELECTION_MESSAGES |
选举消息 | Always |
DOWNLOAD_COMMANDS |
下载命令 | Always |
消息流向
flowchart TD
Start([组件 A 发送消息]) --> Receiver[TopicRouter.receiver
本地通道]
Receiver --> CheckPolicy{检查发布策略}
CheckPolicy -->|Always| SendNetwork[发送到网络]
CheckPolicy -->|Minimal| CheckLocal{有本地接收者?}
CheckPolicy -->|Never| LocalOnly[仅本地发送]
CheckLocal -->|否| SendNetwork
CheckLocal -->|是| LocalOnly
SendNetwork --> NetworkSender[networking_sender]
NetworkSender --> Publish[gossipsub_publish]
Publish --> GossipsubPub[libp2p Gossipsub 网络]
LocalOnly --> LocalSend[for sender in self.senders
await sender.send item]
第五部分:完整启动流程
节点启动时序图
flowchart TD
Start([用户启动 exo]) --> GenKeypair[1. 生成或加载 Keypair]
GenKeypair --> CreateRouter[2. 创建 Router]
CreateRouter --> TCPStack[创建 TCP 传输层]
CreateRouter --> Discovery[创建 Discovery Behaviour]
CreateRouter --> Gossipsub[创建 Gossipsub Behaviour]
CreateRouter --> Listen[监听 TCP 端口]
CreateRouter --> StartMDNS[启动 mDNS 服务]
StartMDNS --> RegisterTopic[3. 注册主题]
RegisterTopic --> Global[注册 GLOBAL_EVENTS]
RegisterTopic --> Local[注册 LOCAL_EVENTS]
RegisterTopic --> Commands[注册 COMMANDS]
RegisterTopic --> Election[注册 ELECTION_MESSAGES]
RegisterTopic --> Connection[注册 CONNECTION_MESSAGES]
RegisterTopic --> Download[注册 DOWNLOAD_COMMANDS]
Global & Local & Commands & Election & Connection & Download --> StartNetwork[4. 启动网络任务]
StartNetwork --> RunRouter[Router.run]
RunRouter --> Subscribe[订阅所有注册的主题]
RunRouter --> RecvTask[启动 _networking_recv]
RunRouter --> PublishTask[启动 _networking_publish]
RunRouter --> StartTopicRouter[启动每个 TopicRouter]
StartTopicRouter --> MDNSStart[开始 mDNS 发现]
MDNSStart --> Query[定期查询]
MDNSStart --> Response[响应其他节点的查询]
Query --> DiscoverPeers[5. mDNS 发现其他节点]
DiscoverPeers --> TryConnect[立即尝试连接]
TryConnect --> EstablishConn[6. 建立连接]
EstablishConn --> ConnEvent[发送 ConnectionEstablished 事件]
ConnEvent --> StartComponents[7. 启动应用组件]
StartComponents --> ElectionComp[Election 参与]
StartComponents --> WorkerComp[Worker 如果启用]
StartComponents --> MasterComp[Master 如果当选]
StartComponents --> APIComp[API 如果启用]
StartComponents --> P2PComm[8. 开始正常的 P2P 通信]
P2PComm --> GossipsubMsg[通过 Gossipsub 交换消息]
P2PComm --> ElectionProc[执行选举协议]
P2PComm --> Coordinate[协调推理任务]
第六部分:故障恢复与高可用
6.1 连接断开处理
1 | |
6.2 自动重连
1 | |
6.3 高可用性保证
故障检测
- Ping 超时检测(2.5 秒)
- 连接断开立即触发
- 自动重新选举
脑裂预防
- 所有节点使用相同的 Bully 算法
- 比较函数确定性的
- Clock 机制确保所有节点在同一轮选举
- Seniority 机制减少频繁切换
第七部分:性能优化
7.1 已实现优化
| 优化 | 效果 |
|---|---|
| 零 RTT 连接 | 减少握手延迟 |
| TCP_NODELAY | 禁用 Nagle 算法 |
| RDMA 通信 | 99% 延迟降低 |
| 张量并行 | 2 设备 1.8x,4 设备 3.2x |
| KV Cache | 减少重复计算 |
| 模型缓存 | 避免重复加载 |
7.2 性能数据
| 连接类型 | 延迟 | 带宽 | CPU 开销 |
|---|---|---|---|
| TCP/IP (Ethernet) | 基线 | 1-10 Gbps | 高 |
| TCP/IP (Thunderbolt) | ~50% 降低 | 40 Gbps | 中 |
| RDMA (Thunderbolt 5) | 99% 降低 | 80 Gbps | 极低 |
第八部分:监控与调试
8.1 日志级别
1 | |
8.2 关键日志消息
选举相关
1 | |
Thunderbolt 相关
1 | |
第九部分:配置与扩展
9.1 命令行参数
| 参数 | 说明 |
|---|---|
--no-worker |
不运行 Worker(仅协调器模式) |
--force-master |
强制成为 Master |
--spawn-api |
启动 API 服务 |
--no-downloads |
禁用下载协调器 |
--api-port |
API 端口(默认 52415) |
--offline |
离线模式 |
--bootstrap-peers |
Bootstrap peers 地址 |
-v, -vv |
日志级别 |
9.2 环境变量
| 变量 | 说明 | 默认值 |
|---|---|---|
EXO_DEFAULT_MODELS_DIR |
模型目录 | ~/.local/share/exo/models/ |
EXO_MODELS_DIRS |
额外模型目录 | 无 |
EXO_OFFLINE |
离线模式 | false |
EXO_ENABLE_IMAGE_MODELS |
启用图像模型 | false |
EXO_LIBP2P_NAMESPACE |
网络命名空间 | 无 |
EXO_TRACING_ENABLED |
启用追踪 | false |
总结
exo 的设备发现和连接建立机制具有以下特点:
分层发现机制
- mDNS(必需):发现网络上的节点,建立基本连接和身份映射
- Thunderbolt(优化):在已发现的节点间检测物理加速连接
- 依赖关系:Thunderbolt 依赖 mDNS 先发现节点并提供 domain_uuid → node_id 映射
高可用性
- Bully 算法选举
- 自动故障恢复
- Master 角色动态切换
- Bootstrap peers 备选方案
安全性
- Noise 协议加密
- 私有网络隔离
- 消息签名验证
高性能
- 零 RTT 连接、TCP_NODELAY
- RDMA over Thunderbolt 5 支持(99% 延迟降低)
- 张量并行(2 设备 1.8x,4 设备 3.2x)
智能拓扑识别
- 合并 mDNS 发现和 Thunderbolt 物理检测
- 在拓扑图中区分 RDMAConnection 和 SocketConnection
- 推理时自动选择最优路径
核心设计理念:exo 通过分层发现实现灵活的网络优化:
- mDNS 负责”能否通信” - 通过标准 IP 网络发现节点并建立身份映射
- Thunderbolt 负责”能否更快” - 在已发现的节点间检测硬件加速机会
- 关键依赖:Thunderbolt 无法独立工作,需要 mDNS 提供的节点信息才能建立逻辑连接
这种设计使 exo 能够在本地网络中自动形成集群,无需手动配置,同时保证了网络的稳定性和性能。对于配备 Thunderbolt 5 的 Mac 设备,exo 还能利用 RDMA 技术实现接近本地的超低延迟通信性能。