📋 目录
概述
exo 系统通过 Thunderbolt 5 接口实现设备间的超低延迟通信(99%延迟降低),这种通信基于 RDMA over Thunderbolt 技术。拓扑关系的建立是一个多阶段的过程,涉及:
- 节点发现:通过 mDNS 发现本地网络中的设备
- 信息收集:收集 Thunderbolt 硬件信息和网络配置
- 拓扑构建:建立设备间的逻辑连接关系
- RDMA建立:创建高性能的 RDMA 通信通道
关键性能指标
| 指标 | TCP/IP (Ethernet) | TCP/IP (Thunderbolt) | RDMA (Thunderbolt 5) |
|---|---|---|---|
| 延迟 | 基线 | ~50% 降低 | 99% 降低 |
| 带宽 | 1-10 Gbps | 40 Gbps | 80 Gbps |
| CPU开销 | 高 | 中 | 极低 |
| 数据拷贝 | 2-3次 | 2-3次 | 0次(零拷贝) |
架构层次
完整系统架构
graph TB
subgraph 应用层["应用层 (Python)"]
Master["Master 节点协调器"]
Worker["Worker 任务执行器"]
API["API 服务接口"]
end
subgraph 事件层["事件处理层"]
EventRouter["EventRouter 事件路由器"]
TopicRouter["TopicRouter 主题路由"]
end
subgraph 信息收集["信息收集层 (macOS)"]
InfoGatherer["InfoGatherer 信息收集器"]
SystemProfiler["system_profiler
Thunderbolt硬件信息"]
NetworkSetup["networksetup
网络接口映射"]
RdmaCtl["rdma_ctl status
RDMA状态查询"]
end
subgraph 网络层["网络层 (Rust)"]
Router["Router 路由器"]
Swarm["Swarm 网络群集"]
Discovery["Discovery 节点发现"]
Gossipsub["Gossipsub 消息传播"]
end
subgraph 拓扑层["拓扑管理层"]
Topology["Topology 拓扑图"]
Connection["Connection 连接对象"]
RDMAConnection["RDMAConnection RDMA连接"]
SocketConnection["SocketConnection Socket连接"]
end
subgraph 传输层["传输层"]
TCP["TCP/IP 协议栈"]
RDMA["RDMA/InfiniBand 协议栈"]
MLXDist["MLX Distributed 分布式框架"]
end
subgraph 物理层["物理网络层"]
Ethernet["Ethernet / WiFi"]
Thunderbolt["Thunderbolt 5 线缆
80 Gbps 带宽"]
end
Master & Worker & API --> EventRouter
EventRouter --> TopicRouter
TopicRouter --> Router
InfoGatherer --> SystemProfiler
InfoGatherer --> NetworkSetup
InfoGatherer --> RdmaCtl
InfoGatherer --> TopicRouter
Router --> Swarm
Swarm --> Discovery
Swarm --> Gossipsub
Gossipsub --> Topology
Topology --> Connection
Connection --> RDMAConnection
Connection --> SocketConnection
TCP --> Ethernet
RDMA --> Thunderbolt
MLXDist --> RDMA
拓扑数据结构
# src/exo/shared/types/topology.py
@dataclass(frozen=True)
class Cycle:
"""节点环,用于分布式推理"""
node_ids: list[NodeId]
class RDMAConnection(FrozenModel):
"""RDMA 连接(Thunderbolt)"""
source_rdma_iface: str # 例如: "rdma_en2"
sink_rdma_iface: str # 例如: "rdma_en3"
class SocketConnection(FrozenModel):
"""Socket 连接(TCP/IP)"""
sink_multiaddr: Multiaddr
class Connection(FrozenModel):
"""通用连接"""
source: NodeId
sink: NodeId
edge: RDMAConnection | SocketConnection
拓扑建立完整流程
流程概览
sequenceDiagram
participant User as 用户
participant Node as 节点进程
participant Router as Router
participant InfoGatherer as InfoGatherer
participant Master as Master
participant Topology as Topology
participant MLX as MLX Distributed
User->>Node: 启动 exo
Note over Node: 阶段1: 网络初始化
Node->>Router: 创建 Router
Router->>Router: 启动 mDNS 服务
Router->>Router: 启动 Gossipsub
Note over Node: 阶段2: 节点发现
Router->>Router: mDNS 查询
Router-->>Router: 发现其他节点
Router->>Router: 建立 TCP 连接
Router->>Router: 订阅 Gossipsub 主题
Note over Node: 阶段3: 信息收集
Node->>InfoGatherer: 启动 InfoGatherer
InfoGatherer->>InfoGatherer: 调用 system_profiler
InfoGatherer->>InfoGatherer: 调用 networksetup
InfoGatherer->>InfoGatherer: 调用 rdma_ctl
Note over Node: 阶段4: 拓扑构建
InfoGatherer->>Master: 发送 NodeGatheredInfo
Master->>Topology: 构建拓扑图
Master->>Topology: 添加 Socket 连接
Master->>Topology: 添加 RDMA 连接
Note over Node: 阶段5: 分布式推理
Master->>MLX: 创建 MlxJacclInstance
MLX->>MLX: 建立 RDMA 连接
MLX->>MLX: 开始分布式推理
详细时序图
sequenceDiagram
participant A as 节点 A (Mac Studio 1)
participant B as 节点 B (Mac Studio 2)
participant MDNS as mDNS 服务
participant GS as Gossipsub 网络
participant M as Master (A或B)
Note over A: 启动流程开始
A->>A: 生成 NodeId 和 Keypair
A->>MDNS: 启动 mDNS (监听 5353/UDP)
Note over B: 同时启动
B->>B: 生成 NodeId 和 Keypair
B->>MDNS: 启动 mDNS (监听 5353/UDP)
Note over A,B: mDNS 发现阶段
A->>MDNS: mDNS 查询 (_libp2p._udp.local)
MDNS->>B: 转发查询
B->>MDNS: mDNS 响应 (PeerID + IP)
MDNS->>A: 转发响应
Note over A: 记录发现的节点
A->>A: mdns_discovered[peer_B] = addr
A->>B: TCP 连接请求
B->>A: 接受连接
A->>B: Noise 握手
B->>A: Noise 握手响应
A->>B: Yamux 多路复用
B->>A: Yamux 确认
Note over A,B: libp2p 连接建立
A->>GS: 订阅主题 (GLOBAL_EVENTS, LOCAL_EVENTS)
B->>GS: 订阅主题
Note over A: InfoGatherer 启动
A->>A: 启动 InfoGatherer
A->>A: system_profiler SPThunderboltDataType
Note over A: 获取 Thunderbolt 硬件信息
domain_uuid, receptacle_id, speed
A->>A: networksetup -listallhardwareports
Note over A:获取端口映射
"Thunderbolt 1" -> "en2"
A->>A: 构造 ThunderboltIdentifier
Note over A: rdma_interface="rdma_en2"
domain_uuid="AABB-CCDD"
link_speed="80 Gb/s"
A->>M: 发送 MacThunderboltIdentifiers 事件
B->>M: 发送 MacThunderboltIdentifiers 事件
Note over M: 构建映射表
M->>M: conn_map = {
"AABB-CCDD": (node_A, "rdma_en2"),
"DDCC-BBAA": (node_B, "rdma_en3")
}
A->>M: 发送 MacThunderboltConnections 事件
Note over A: source_uuid="AABB-CCDD"
sink_uuid="DDCC-BBAA"
Note over M: 创建 RDMA 连接
M->>M: 查询映射表
M->>M: 创建 RDMAConnection(
source_rdma_iface="rdma_en2",
sink_rdma_iface="rdma_en3"
)
M->>M: 更新拓扑图
Note over M: topology.add_connection(
Connection(
source=node_A,
sink=node_B,
edge=RDMAConnection(...)
)
)
Note over M: 推理任务分配
M->>A: CreateInstance 命令
M->>B: CreateInstance 命令
Note over A,B: MLX Distributed 初始化
A->>A: 设置 MLX_IBV_DEVICES 环境变量
B->>B: 设置 MLX_IBV_DEVICES 环境变量
A->>A: mx.distributed.init(backend="jaccl")
B->>B: mx.distributed.init(backend="jaccl")
Note over A,B: RDMA 连接建立
A->>B: 通过 rdma_en2 建立 RDMA QP
B->>A: 通过 rdma_en3 建立 RDMA QP
Note over A,B: 分布式推理
A->>B: 通过 RDMA 发送张量数据
B->>A: 通过 RDMA 发送张量数据
数据收集机制
InfoGatherer 架构
InfoGatherer 是 Thunderbolt 信息收集的核心组件,运行在每个 Worker 节点上。
# src/exo/worker/main.py:96-104
async def run(self):
logger.info("Starting Worker")
info_send, info_recv = channel[GatheredInfo]()
info_gatherer: InfoGatherer = InfoGatherer(info_send)
async with self._tg as tg:
tg.start_soon(info_gatherer.run) # 启动信息收集器
tg.start_soon(self._forward_info, info_recv) # 转发信息到事件系统
数据收集任务
# src/exo/utils/info_gatherer/info_gatherer.py
class InfoGatherer:
async def run(self):
async with self._tg as tg:
if IS_DARWIN: # macOS 系统
tg.start_soon(_monitor_macmon, 1) # Macmon 性能监控
tg.start_soon(_monitor_system_profiler_thunderbolt_data, 5) # ⚡ Thunderbolt 数据
tg.start_soon(_monitor_thunderbolt_bridge_status, 10) # Thunderbolt Bridge 状态
tg.start_soon(_monitor_rdma_ctl_status, 10) # RDMA 状态
Thunderbolt 数据收集详细流程
flowchart TD
Start([InfoGatherer 启动]) --> Monitor[_monitor_system_profiler_thunderbolt_data]
Monitor --> GatherMap[_gather_iface_map]
GatherMap --> NetworkSetup[networksetup -listallhardwareports]
NetworkSetup --> ParseMap[解析端口映射]
ParseMap --> StoreMap[存储: Thunderbolt 1 → en2]
Monitor --> GatherData[ThunderboltConnectivity.gather]
GatherData --> SystemProfiler[system_profiler SPThunderboltDataType]
SystemProfiler --> ParseJSON[解析 JSON 输出]
ParseJSON --> ExtractIdent[提取标识符]
ExtractIdent --> BuildIdent[构造 ThunderboltIdentifier]
BuildIdent --> IdentFields[关键字段
rdma_interface: rdma_en2
domain_uuid: AABB-CCDD
link_speed: 80 Gb/s]
ParseJSON --> ExtractConn[提取连接关系]
ExtractConn --> BuildConn[构造 ThunderboltConnection]
BuildConn --> ConnFields[关键字段
source_uuid: AABB-CCDD
sink_uuid: DDCC-BBAA]
IdentFields --> SendIdent[发送 MacThunderboltIdentifiers]
ConnFields --> SendConn[发送 MacThunderboltConnections]
SendIdent --> Gossipsub[Gossipsub 网络]
SendConn --> Gossipsub
Gossipsub --> Master[Master 节点]
Master --> StoreState[存储到 state.node_thunderbolt]
style Monitor fill:#e1f5ff
style BuildIdent fill:#fff3e0
style BuildConn fill:#f3e5f5
style StoreState fill:#e8f5e9
关键代码实现
1. 接口映射收集
# src/exo/utils/info_gatherer/info_gatherer.py:336-353
async def _gather_iface_map() -> dict[str, str] | None:
"""收集 Thunderbolt 端口名称到网络设备的映射"""
proc = await anyio.run_process(
["networksetup", "-listallhardwareports"],
check=False
)
ports: dict[str, str] = {}
port = ""
for line in proc.stdout.decode("utf-8").split("\n"):
if line.startswith("Hardware Port:"):
port = line.split(": ")[1]
elif line.startswith("Device:"):
ports[port] = line.split(": ")[1]
port = ""
return ports
# 返回示例: {
# "Thunderbolt 1": "en2",
# "Thunderbolt 2": "en3",
# "Thunderbolt 3": "en4"
# }
2. Thunderbolt 标识符构造
# src/exo/shared/types/thunderbolt.py:35-50
def ident(self, ifaces: dict[str, str]) -> ThunderboltIdentifier | None:
"""结合 system_profiler 和 networksetup 数据构造完整标识符"""
# 步骤1: 构造端口名称
tag = f"Thunderbolt {self.receptacle_1_tag.receptacle_id_key}"
# 步骤2: 查找对应的网络接口
if tag not in ifaces:
return None
iface = f"rdma_{ifaces[tag]}" # 添加 "rdma_" 前缀
# 步骤3: 组合所有信息
return ThunderboltIdentifier(
rdma_interface=iface, # "rdma_en2" (来自 networksetup)
domain_uuid=self.domain_uuid_key, # "AABB-CCDD-..." (来自 system_profiler)
link_speed=self.receptacle_1_tag.current_speed_key or "" # "80 Gb/s"
)
3. Thunderbolt 数据发送
# src/exo/utils/info_gatherer/info_gatherer.py:452-476
async def _monitor_system_profiler_thunderbolt_data(
self, system_profiler_interval: float
):
"""定期收集并发送 Thunderbolt 数据"""
while True:
try:
# 1. 获取接口映射
iface_map = await _gather_iface_map()
# 2. 获取 Thunderbolt 连接数据
data = await ThunderboltConnectivity.gather()
# 3. 提取标识符和连接
idents = [
i.ident(iface_map)
for i in data
if (it := i.ident(iface_map)) is not None
]
conns = [i.conn() for i in data if (it := i.conn()) is not None]
# 4. 发送事件到 Gossipsub 网络
await self.info_sender.send(
MacThunderboltIdentifiers(idents=idents)
)
await self.info_sender.send(
MacThunderboltConnections(conns=conns)
)
except Exception as e:
logger.opt(exception=e).warning("Error gathering Thunderbolt data")
await anyio.sleep(system_profiler_interval) # 5秒
拓扑构建过程
Master 节点的拓扑构建
Master 节点负责收集所有节点的 Thunderbolt 信息并构建统一的拓扑图。
# src/exo/shared/apply.py:330-354
case MacThunderboltIdentifiers():
"""处理节点发送的 Thunderbolt 标识符"""
update["node_thunderbolt"] = {
**state.node_thunderbolt,
event.node_id: NodeThunderboltInfo(interfaces=info.idents)
}
# state.node_thunderbolt 结构:
# {
# node_id_A: [
# ThunderboltIdentifier(
# rdma_interface="rdma_en2",
# domain_uuid="AABB-CCDD",
# link_speed="80 Gb/s"
# )
# ],
# node_id_B: [...]
# }
case MacThunderboltConnections():
"""处理 Thunderbolt 物理连接"""
# 1. 构建 domain_uuid → (node_id, rdma_interface) 映射
conn_map = {
tb_ident.domain_uuid: (nid, tb_ident.rdma_interface)
for nid in state.node_thunderbolt
for tb_ident in state.node_thunderbolt[nid].interfaces
}
# 2. 创建 RDMA 连接对象
as_rdma_conns = [
Connection(
source=event.node_id,
sink=conn_map[tb_conn.sink_uuid][0],
edge=RDMAConnection(
source_rdma_iface=conn_map[tb_conn.source_uuid][1],
sink_rdma_iface=conn_map[tb_conn.sink_uuid][1],
)
)
for tb_conn in info.conns
if tb_conn.source_uuid in conn_map
if tb_conn.sink_uuid in conn_map
]
# 3. 更新拓扑图
topology.replace_all_out_rdma_connections(
event.node_id,
as_rdma_conns
)
拓扑图数据结构
# src/exo/shared/topology.py:32-59
@dataclass
class Topology:
"""有向图表示的设备拓扑"""
_graph: rx.PyDiGraph[NodeId, SocketConnection | RDMAConnection]
_vertex_indices: dict[NodeId, int]
def add_connection(self, conn: Connection) -> None:
"""添加连接(可以是 Socket 或 RDMA)"""
source, sink, edge = conn.source, conn.sink, conn.edge
# 添加节点(如果不存在)
if source not in self._vertex_indices:
self.add_node(source)
if sink not in self._vertex_indices:
self.add_node(sink)
# 添加边
src_id = self._vertex_indices[source]
sink_id = self._vertex_indices[sink]
_ = self._graph.add_edge(src_id, sink_id, edge)
def get_cycles(self) -> list[Cycle]:
"""获取所有简单环路,用于分布式推理"""
cycle_idxs = rx.simple_cycles(self._graph)
cycles: list[Cycle] = []
for cycle_idx in cycle_idxs:
cycle = Cycle(node_ids=[self._graph[idx] for idx in cycle_idx])
cycles.append(cycle)
return cycles
def is_rdma_cycle(self, cycle: Cycle) -> bool:
"""检查环是否全部由 RDMA 连接组成"""
node_idxs = [node for node in cycle]
rx_idxs = [self._vertex_indices[idx] for idx in node_idxs]
for rid in rx_idxs:
for neighbor_rid in self._graph.neighbors(rid):
if neighbor_rid not in rx_idxs:
continue
has_rdma = False
for edge in self._graph.get_all_edge_data(rid, neighbor_rid):
if isinstance(edge, RDMAConnection):
has_rdma = True
break
if not has_rdma:
return False
return True
拓扑可视化示例
示例拓扑:3个Mac Studio通过Thunderbolt 5连接
节点A (Mac Studio 1) 节点B (Mac Studio 2) 节点C (Mac Studio 3)
node_id: 12D3KooW...A node_id: 12D3KooW...B node_id: 12D3KooW...C
domain_uuid: AABB-CCDD domain_uuid: DDCC-BBAA domain_uuid: CCDD-AABB
[rdma_en2] ─────────────────────> [rdma_en2] ─────────────────────> [rdma_en2]
│ │ │
│ Thunderbolt 5 │ Thunderbolt 5 │
│ 80 Gbps │ 80 Gbps │
↓ ↓ ↓
[rdma_en3] <───────────────────── [rdma_en3] <───────────────────── [rdma_en3]
│ │ │
│ │ │
└───────────── RDMA 环 ────────────┴───────────── RDMA 环 ──────────────┘
拓扑图表示:
- 节点: {A, B, C}
- 边: {
(A→B, RDMAConnection(rdma_en2→rdma_en2)),
(B→C, RDMAConnection(rdma_en2→rdma_en2)),
(C→A, RDMAConnection(rdma_en3→rdma_en3))
}
- 环: Cycle([A, B, C])
- is_rdma_cycle: true (所有边都是RDMA)
RDMA连接建立
MLX Distributed 集成
exo 通过 MLX Distributed 框架建立 RDMA 连接,提供设备矩阵配置。
# src/exo/master/placement_utils.py:296-325
def get_mlx_jaccl_devices_matrix(
selected_cycle: list[NodeId],
cycle_digraph: Topology,
) -> list[list[str | None]]:
"""
构建 MLX JACCL 设备矩阵
matrix[i][j] = 节点 i 连接到节点 j 的 RDMA 接口
"""
num_nodes = len(selected_cycle)
matrix = [[None for _ in range(num_nodes)] for _ in range(num_nodes)]
for i, node_i in enumerate(selected_cycle):
for j, node_j in enumerate(selected_cycle):
if i == j:
continue # 对角线为 None
# 查询拓扑中的 RDMA 连接
for conn in cycle_digraph.get_all_connections_between(node_i, node_j):
if isinstance(conn, RDMAConnection):
matrix[i][j] = conn.source_rdma_iface
break
return matrix
# 返回示例(3个节点):
# [
# [None, "rdma_en2", "rdma_en3"], # 节点0的连接
# ["rdma_en3", None, "rdma_en2"], # 节点1的连接
# ["rdma_en2", "rdma_en3", None] # 节点2的连接
# ]
RDMA 连接建立流程
stateDiagram-v2
[*] --> Init: 创建 Queue Pair
Init --> RTR: 设置远程节点信息
(QPN, LID, GID)
RTR --> RTS: 完成握手
RTS --> [*]: 可以传输数据
note right of Init
QP 状态: IBV_QPS_INIT
本地初始化完成
设置端口和访问权限
end note
note right of RTR
QP 状态: IBV_QPS_RTR
Ready to Receive
已配置远程节点信息
end note
note right of RTS
QP 状态: IBV_QPS_RTS
Ready to Send
RDMA 连接完全建立
end note
MLX Distributed 初始化
# src/exo/worker/engines/mlx/utils_mlx.py:128-149
case MlxJacclInstance(
jaccl_devices=jaccl_devices, # RDMA 设备矩阵
jaccl_coordinators=jaccl_coordinators
):
# 1. 验证矩阵格式
assert all(
jaccl_devices[i][i] is None
for i in range(len(jaccl_devices))
), "对角线必须为 None"
# 2. 序列化设备矩阵
jaccl_devices_json = json.dumps(jaccl_devices)
# 3. 写入配置文件
with open(coordination_file, "w") as f:
f.write(jaccl_devices_json)
# 4. 设置环境变量
os.environ["MLX_IBV_DEVICES"] = coordination_file
os.environ["MLX_RANK"] = str(rank)
os.environ["MLX_JACCL_COORDINATOR"] = jaccl_coordinator
# 5. 初始化 MLX Distributed
# 从这里开始,RDMA 连接建立由 MLX distributed 接管
group = mx.distributed.init(backend="jaccl", strict=True)
RDMA 通信时序
sequenceDiagram
participant W1 as Worker 1 (rank=0)
participant W2 as Worker 2 (rank=1)
participant MLX as MLX Distributed
participant RDMA as RDMA 网络层
Note over W1,W2: MLX Distributed 初始化
W1->>MLX: 设置 MLX_IBV_DEVICES 文件
[[null, "rdma_en2"], ...]
W2->>MLX: 设置 MLX_IBV_DEVICES 文件
[["rdma_en2", null], ...]
W1->>MLX: mx.distributed.init(backend="jaccl")
W2->>MLX: mx.distributed.init(backend="jaccl")
Note over W1,W2: 建立 RDMA 连接
W1->>RDMA: 创建 Queue Pair (INIT)
W2->>RDMA: 创建 Queue Pair (INIT)
W1->>W2: 通过 TCP 交换 QP 信息
(QPN, LID, GID, PSN)
W2->>W1: 通过 TCP 交换 QP 信息
W1->>RDMA: 将 QP 转换为 RTR 状态
(使用远程 QP 信息)
W2->>RDMA: 将 QP 转换为 RTR 状态
W1->>RDMA: 将 QP 转换为 RTS 状态
W2->>RDMA: 将 QP 转换为 RTS 状态
Note over RDMA: RDMA 连接建立完成
Note over W1,W2: 分布式推理
W1->>W1: 加载模型 shard (rank 0)
W2->>W2: 加载模型 shard (rank 1)
W1->>RDMA: 发送张量数据到 W2
(零拷贝,直接内存访问)
RDMA->>W2: 接收数据到内存
W2->>RDMA: 发送梯度数据到 W1
(零拷贝,直接内存访问)
RDMA->>W1: 接收数据到内存
Note over W1,W2: 推理完成
环境变量配置
| 环境变量 | 值示例 | 说明 |
|---|---|---|
MLX_IBV_DEVICES |
/tmp/jaccl_devices_abc123.json |
RDMA 设备矩阵 JSON 文件路径 |
MLX_RANK |
0, 1, 2 |
当前节点的 rank(0-based) |
MLX_JACCL_COORDINATOR |
192.168.1.100:12345 |
Rank 0 的 IP:port(用于 TCP 侧信道) |
设备矩阵 JSON 格式:
[
[null, "rdma_en2", "rdma_en3"],
["rdma_en3", null, "rdma_en2"],
["rdma_en2", "rdma_en3", null]
]
故障恢复与高可用
连接健康检查
// rust/networking/src/discovery.rs
const PING_TIMEOUT: Duration = Duration::from_millis(2_500);
const PING_INTERVAL: Duration = Duration::from_millis(2_500);
// Ping 失败时关闭连接
managed::BehaviourEvent::Ping(e) => {
if let Err(_) = e.result {
self.close_connection(e.peer, e.connection.clone())
}
}
自动重连机制
// 每 5 秒重试所有发现的节点
if self.retry_delay.poll(cx).is_ready() {
for (p, mas) in self.mdns_discovered.clone() {
for ma in mas {
self.dial(p, ma) // 如果已连接,此操作无效果
}
}
// dial bootstrap peers (for environments where mDNS is unavailable)
for addr in &self.bootstrap_peers {
self.pending_events.push_back(ToSwarm::Dial {
opts: DialOpts::unknown_peer_id().address(addr.clone()).build()
})
}
}
拓扑更新策略
# src/exo/shared/topology.py:163-170
def replace_all_out_rdma_connections(
self, source: NodeId, new_connections: Sequence[Connection]
) -> None:
"""替换节点所有的出站 RDMA 连接"""
# 1. 删除所有旧的 RDMA 连接
for conn_idx in self._graph.out_edge_indices(self._vertex_indices[source]):
if isinstance(self._graph.get_edge_data_by_index(conn_idx), RDMAConnection):
self._graph.remove_edge_from_index(conn_idx)
# 2. 添加新的 RDMA 连接
for conn in new_connections:
self.add_connection(conn)
实战示例
场景:3台 Mac Studio 构建 RDMA 集群
硬件配置
Mac Studio A (节点 A)
- NodeId: 12D3KooWABC...
- Thunderbolt 端口 1: en2 (domain_uuid: AABB-CCDD-...)
- Thunderbolt 端口 2: en3 (domain_uuid: DDEE-FFAA-...)
- IP 地址: 192.168.1.100
Mac Studio B (节点 B)
- NodeId: 12D3KooWDEF...
- Thunderbolt 端口 1: en2 (domain_uuid: BBDD-CCAA-...)
- Thunderbolt 端口 2: en3 (domain_uuid: CCAA-BBDD-...)
- IP 地址: 192.168.1.101
Mac Studio C (节点 C)
- NodeId: 12D3KooW123...
- Thunderbolt 端口 1: en2 (domain_uuid: 1122-3344-...)
- Thunderbolt 端口 2: en3 (domain_uuid: 3344-1122-...)
- IP 地址: 192.168.1.102
物理连接:
- A Thunderbolt 1 ←→ B Thunderbolt 1
- B Thunderbolt 1 ←→ C Thunderbolt 1
- C Thunderbolt 1 ←→ A Thunderbolt 2
启动流程
# 在所有3台机器上同时启动
# Mac Studio A
uv run exo
# Mac Studio B
uv run exo
# Mac Studio C
uv run exo
日志输出
# Mac Studio A
INFO: Starting node 12D3KooWABC...
INFO: Starting Worker
INFO: Starting Election
DEBUG: mDNS discovered peer 12D3KooWDEF... at /ip4/192.168.1.101/tcp/8000
DEBUG: mDNS discovered peer 12D3KooW123... at /ip4/192.168.1.102/tcp/8000
DEBUG: ConnectionEstablished { peer_id: 12D3KooWDEF..., connection_id: 0 }
DEBUG: ConnectionEstablished { peer_id: 12D3KooW123..., connection_id: 1 }
DEBUG: Gathering Thunderbolt data
DEBUG: MacThunderboltIdentifiers {
idents: [
ThunderboltIdentifier {
rdma_interface: "rdma_en2",
domain_uuid: "AABB-CCDD-...",
link_speed: "80 Gb/s"
},
ThunderboltIdentifier {
rdma_interface: "rdma_en3",
domain_uuid: "DDEE-FFAA-...",
link_speed: "80 Gb/s"
}
]
}
DEBUG: MacThunderboltConnections {
conns: [
ThunderboltConnection {
source_uuid: "AABB-CCDD-...",
sink_uuid: "BBDD-CCAA-..."
},
ThunderboltConnection {
source_uuid: "3344-1122-...",
sink_uuid: "AABB-CCDD-..."
}
]
}
INFO: Node elected Master - promoting self
INFO: Starting Master
DEBUG: Created RDMA cycle: [12D3KooWABC..., 12D3KooWDEF..., 12D3KooW123...]
DEBUG: is_rdma_cycle: true
拓扑构建结果
# Master 构建的拓扑图
Topology:
nodes: [12D3KooWABC..., 12D3KooWDEF..., 12D3KooW123...]
connections: {
12D3KooWABC...: {
12D3KooWDEF...: [RDMAConnection(
source_rdma_iface="rdma_en2",
sink_rdma_iface="rdma_en2"
)],
12D3KooW123...: [RDMAConnection(
source_rdma_iface="rdma_en3",
sink_rdma_iface="rdma_en2"
)]
},
12D3KooWDEF...: {
12D3KooW123...: [RDMAConnection(
source_rdma_iface="rdma_en2",
sink_rdma_iface="rdma_en2"
)]
}
}
# 检测到的 RDMA 环
cycles: [
Cycle(node_ids=[12D3KooWABC..., 12D3KooWDEF..., 12D3KooW123...])
]
is_rdma_cycle: true # 所有边都是 RDMA 连接
推理任务分配
# Master 创建分布式推理实例
instance = MlxJacclInstance(
instance_id=InstanceId("..."),
shard_assignments=ShardAssignments(
model_id="mlx-community/Llama-3.2-1B-Instruct-4bit",
assignments={
12D3KooWABC...: 0, # rank 0
12D3KooWDEF...: 1, # rank 1
12D3KooW123...: 2, # rank 2
}
),
jaccl_devices=[
[None, "rdma_en2", "rdma_en3"], # rank 0 的连接
["rdma_en3", None, "rdma_en2"], # rank 1 的连接
["rdma_en2", "rdma_en3", None] # rank 2 的连接
],
jaccl_coordinators="192.168.1.100:52415"
)
# 发送到各个 Worker
for node_id, rank in instance.shard_assignments.assignments.items():
await send_command(
node_id=node_id,
command=CreateInstance(
instance_id=instance.instance_id,
rank=rank,
instance=instance
)
)
性能测试结果
测试模型: Llama-3.2-1B-Instruct-4bit
测试场景: 3台 Mac Studio, Thunderbolt 5 连接
指标对比:
┌─────────────────────┬──────────┬──────────┬──────────┐
│ 指标 │ TCP/IP │ Thunderbolt│ RDMA │
├─────────────────────┼──────────┼──────────┼──────────┤
│ 端到端延迟 (P50) │ 125 ms │ 62 ms │ 1.2 ms │
│ 端到端延迟 (P99) │ 180 ms │ 95 ms │ 1.8 ms │
│ 带宽 │ 2.5 Gbps │ 38 Gbps │ 76 Gbps │
│ CPU 使用率 │ 45% │ 35% │ 12% │
│ 推理吞吐量 │ 8 tok/s │ 15 tok/s │ 68 tok/s │
└─────────────────────┴──────────┴──────────┴──────────┘
加速比:
- vs TCP/IP: 8.5x 吞吐量提升
- vs Thunderbolt TCP: 4.5x 吞吐量提升
- 延迟降低: 99% (vs TCP/IP)
总结
关键要点
分层发现机制
- mDNS 负责节点发现和基本连接
- Thunderbolt 在已发现的节点间检测物理加速机会
- 依赖关系:Thunderbolt 需要 mDNS 提供的节点信息
信息收集流程
- InfoGatherer 定期收集 Thunderbolt 硬件信息
- 结合 system_profiler 和 networksetup 数据
- 通过 Gossipsub 网络传播到所有节点
拓扑构建
- Master 收集所有节点的 Thunderbolt 信息
- 构建 domain_uuid → node_id 映射
- 创建 RDMAConnection 对象并更新拓扑图
RDMA 连接建立
- exo 提供设备矩阵配置
- MLX Distributed 负责实际的 RDMA 连接建立
- 通过 TCP 侧信道交换 RDMA 元数据
性能优势
- 99% 延迟降低
- 零拷贝数据传输
- 极低 CPU 开销
最佳实践
硬件要求
- Mac Studio (M1/M2/M3 Max 或 Ultra)
- Thunderbolt 5 线缆(80 Gbps)
- 支持 RDMA 的 macOS 版本
网络配置
- 禁用 Thunderbolt Bridge(避免网络风暴)
- 为每个 Thunderbolt 端口配置独立网络服务
- 确保 mDNS 正常工作
故障处理
- 监控 Ping 超时和连接断开
- 自动重连机制确保拓扑恢复
- 降级到 TCP/IP 如果 RDMA 不可用
性能调优
- 优先使用 RDMA 环进行分布式推理
- 监控 RDMA 连接状态
- 根据拓扑选择最优实例放置策略