跳到主要内容

Flow

Flow 是 NRCP 中用于持续数据流的协议抽象,对标 ROS Topic。状态反馈、连续控制指令、机器人本体故障通知等持续数据都应建模为 Flow

Heartbeat 不属于 Flow 机制。Operation 也不属于 Flow 机制

角色

角色含义
PubFlow 数据发布方
SubFlow 数据订阅方

Server Push 型 Flow 中,Server 为 Pub,Client 为 Sub。Client Push 型 Flow 中,Client 为 Pub,Server 为 Sub

连接状态约束

Flow 只能在本地连接状态为 CONNECTED 时建立和运行。Endpoint 处于 DEGRADEDRECOVERINGCLOSING 时,不得建立新 Flow,也不得将收到的 FLOW_DATA 交给业务处理逻辑

Heartbeat 与时钟同步不属于 Flow。连接进入 CONNECTED 后,Heartbeat 与时钟同步必须持续维护;一旦本地 Session 进入 DEGRADED,已建立 Flow 应进入暂停、关闭或等待恢复状态

生命周期

Flow 生命周期是 Endpoint 的本地状态,不要求双方在同一时刻强一致

状态含义允许行为
NEGOTIATINGFlow 协商中发送和接收对应 Flow 控制消息
ACTIVEFlow 已建立且本地连接状态为 CONNECTED发送和接收 FLOW_DATA
PAUSED本地连接不满足运行条件,或正在恢复不处理 FLOW_DATA,可等待恢复或关闭
CLOSINGFlow 关闭中发送或接收 FLOW_CLOSE,释放资源
CLOSEDFlow 已关闭丢弃晚到 FLOW_DATA
EXPIREDFlow 授权或活性超时丢弃 FLOW_DATA,释放上下文

典型状态迁移:

NEGOTIATING
-> ACTIVE
-> PAUSED
-> ACTIVE
-> CLOSING
-> CLOSED

NEGOTIATING
-> CLOSING
-> CLOSED

ACTIVE
-> EXPIRED
-> CLOSED

进入 DEGRADEDRECOVERING 时,Endpoint 应将本地 Active Flow 标记为 PAUSED 或按业务策略关闭。只有恢复流程完成且本地连接状态重新进入 CONNECTED 后,Flow 才能重新进入 ACTIVE

Subscribe Flow

Subscribe Flow 用于 Sub 向 Pub 申请订阅某个 Flow

典型流程:

  1. Sub 发送 FLOW_SUBSCRIBE_REQUEST,携带 Flow Name 和期望 QoS;
  2. Pub 校验资源、Flow 是否存在、Payload Schema 和 QoS 可用性;
  3. Pub 返回 FLOW_SUBSCRIBE_RESPONSE,其中携带最终 Flow QoS Grant;
  4. Sub 校验授权结果是否可接受;
  5. Sub 发送 FLOW_SUBSCRIBE_ACK
  6. Pub 收到 ACK 后开始发送 FLOW_DATA

Publish Flow

Publish Flow 用于 Pub 通知 Sub 自己准备发布某个 Flow

典型流程:

  1. Pub 发送 FLOW_PUBLISH_REQUEST,携带 Flow Name 和目标 QoS;
  2. Sub 校验是否接受该 Flow、Payload Schema 和 QoS;
  3. Sub 返回 FLOW_PUBLISH_RESPONSE,其中携带最终 Flow QoS Grant;
  4. Pub 校验授权结果是否可接受;
  5. Pub 发送 FLOW_PUBLISH_ACK
  6. Pub 开始发送 FLOW_DATA

Flow Data

FLOW_DATA 必须属于已经建立的 FlowContext。接收端必须通过 channel_id + flow_epoch 查询 FlowContext,获得 Flow Name、Payload Codec、Schema、QoS Grant 和接收策略

FLOW_DATA 不应在 Payload 中重复携带 Schema 或 QoS 参数。接收端不能信任发送端逐帧自报的 QoS 或 Schema

FLOW_DATA 只能在 Flow 本地状态为 ACTIVE 时处理。若本地连接状态不是 CONNECTED,或者 Flow 处于 PAUSEDCLOSINGCLOSEDEXPIRED,接收端必须丢弃或拒绝该 FLOW_DATA

接收端处理 FLOW_DATA 时至少应校验:

  • channel_id 是否存在;
  • flow_epoch 是否匹配当前 FlowContext;
  • Message Type 是否为该 Flow 允许的类型;
  • Payload Codec 是否符合 FlowContext;
  • Payload Length 是否有效;
  • 消息是否超过 TTL;
  • Sequence 是否满足去重和乱序策略;
  • 频率是否超过 QoS Grant;

Sequence

FLOW_DATA 必须启用 Sequence。发送端在同一 (channel_id, flow_epoch, direction) 空间内为每个发送的 FLOW_DATA 分配单调递增的 Sequence

Sequence 规则如下:

  • FLOW_DATA 的 Sequence 初始值为 1
  • 同一 (channel_id, flow_epoch, direction) 内发送端每发送一个 FLOW_DATA,Sequence 加 1
  • flow_epoch 变化后,新的 Epoch 使用独立 Sequence 空间,默认从 1 重新开始;
  • 接收端应按 Flow QoS 和本地策略维护去重窗口、乱序窗口和丢包统计;
  • best_effort Flow,接收端不得要求 Sequence 连续才能处理最新数据;
  • reliable Flow,接收端可将 Sequence 缺口作为诊断或协议异常信号;
  • 非 Flow 消息默认使用 sequence = 0
  • Flow 控制消息默认使用 sequence = 0,除非具体扩展明确要求;

Flow Close

Pub 和 Sub 均允许主动关闭 Flow,但必须通知对端。关闭通知使用 FLOW_CLOSE

FLOW_CLOSE Payload 默认使用 JSON,字段定义如下:

字段必填含义
reason关闭原因码
message可读说明
final_sequence最后一个有效 Sequence

reason 取值如下:

  • NORMAL
  • UNSUBSCRIBE
  • PUBLISHER_STOPPED
  • QOS_NOT_ACCEPTABLE
  • FLOW_EXPIRED
  • SESSION_DEGRADED
  • RATE_LIMITED
  • PAYLOAD_SCHEMA_CHANGED
  • INTERNAL_ERROR

收到 FLOW_CLOSE 后,接收端应停止接收该 Flow 后续 FLOW_DATA,释放对应 FlowContext,并丢弃晚到消息

QoS 协商

Subscribe Flow 和 Publish Flow 都必须在发送 FLOW_DATA 前完成 QoS 协商。协商结果由 Pub 或 Server-authoritative 的策略生成,并绑定到 FlowContext

当前版本不实现 QoS 降级。后续版本通过 QOS_EVENTFLOW_UPDATE 表达 QoS 更新或降级