How recorded trajectories are stored, armed, and drained — and why the orchestrator currently fetches the replay one frame at a time over a blocking bus RPC at 30 Hz. End-to-end walkthrough, the request/response mechanism that backs it, the overhead it costs, and three concrete ways to fix it.
A recorded trajectory is a list of RobotState frames living in a deque inside a TrajectoryManager, which is owned by the follower's DigitalTwin — which is owned by the follower RobotNode. The orchestrator has no direct handle on it. So to play it back, the orchestrator's 30 Hz dispatch worker calls command_router.request(follower, GET_REPLAY_ACTION) once per frame — a blocking request/response over the pub/sub bus. A 300-frame trajectory = 301 RPC round-trips, each serializing one RobotState to JSON and back, each blocking the calling thread.
The intuition in the question is right: it would be cheaper to ship the whole trajectory to the orchestrator once at arm-time and iterate locally — or to stop pulling entirely and let the node push frames itself. §8 lays out both, plus the smaller async fix. The catch is the remote case (§7), where the node already drains its own queue server-side — any fix has to keep that path working.
RobotStateEvery frame in a trajectory is a full RobotState dataclass servo7/robot/robot_types.py — the same object that flows everywhere else in the system. The replay-relevant fields:
@dataclass
class RobotState:
timestamp: float
joint_positions: list[float] | None # the core payload — N joint angles
joint_velocities: list[float] | None
joint_efforts: list[float] | None
gripper_position: float | None
end_effector_pose: list[float] | None # [x,y,z, qx,qy,qz,qw]
robot_type: RobotType | None # retagged on the way out (see §3)
units: str = "radians"
trace_id: str = ... # profiling
hops: list[list] = ... # per-hop timestamps
extras: dict[str, Any] = ...
# ... + to_dict() / from_dict() for the wire
Serialized to JSON, one frame is ~1–2 KB. It already round-trips through to_dict()/from_dict(), so the whole trajectory is trivially serializable.
The TrajectoryManager servo7/control/trajectory_manager.py keeps two things:
| Field | Type | Role |
|---|---|---|
_record_queue | deque[RobotState] | The live playback queue. Replay drains it one popleft() at a time. Recording appends to it. |
_trajectories | dict[str, list[RobotState]] | The named library of saved trajectories. "Load" copies a named entry (plus interpolated bridges) into _record_queue. |
_replay_active | bool | The gate. Only set by an explicit Play (begin_replay()); get_action() returns None unless it's true. |
_sequence_template | list[RobotState] | Loop buffer — when a looping sequence drains, the queue refills from this. |
The single drain primitive — note it pops exactly one frame and is lock-guarded:
def get_action(self) -> "RobotState | None":
with self._record_lock:
if not self._record_queue:
if self._loop_sequence and self._sequence_template:
self._record_queue.extend(self._sequence_template) # refill on loop
...
else:
return None
return self._record_queue.popleft()
Interpolation (the smooth bridge from the current pose to the first frame, and between chained trajectories) is computed once at load time by _build_layout() and baked into _record_queue as ordinary frames. So by the time replay drains, every element is just a frame to pop — no per-tick math on the robot side.
This is the whole reason the RPC exists. The trajectory lives several layers deep inside the follower node, and the orchestrator sits in a different object graph:
ORCHESTRATOR NODE FOLLOWER ROBOT NODE
───────────────── ───────────────────
OrchestratorNode RobotNode
└─ command_router : CommandRouter └─ robot : DigitalTwinPID (a Robot)
│ └─ trajectories : TrajectoryManager
│ no direct reference ├─ _record_queue : deque[RobotState]
│ ───────✗──────────► ├─ _trajectories : dict[str,list]
│ └─ _replay_active : bool
│
└────────── bus (PubSub) ──────────────► _on_robot_command() ──► handler ──► twin
cmd.follower / reply.orch
Three phases: arm (once), drain (per frame, the hot path), stop (once).
OrchestratorNode.trigger_replay() orchestrator_node.py:1064 sends one TRIGGER_REPLAY request. On the node, _trigger_replay() loads the named trajectory/sequence into _record_queue, calls begin_replay(), and returns the loaded frame count. The orchestrator then flips its own control_mode = TRAJECTORIES and starts a mode blend.
A dedicated sync thread, command_dispatch_worker() orchestrator_node.py:396, spins at a fixed 30 Hz. In TRAJECTORIES mode each tick does:
# orchestrator_node.py — command_dispatch_worker(), 30 Hz loop
elif self.control_mode == ControlMode.TRAJECTORIES:
action = self._pop_replay_action() # <-- blocking RPC, EVERY tick
if action is not None:
self.smooth_and_publish_action(action, source="controller")
# _pop_replay_action(), orchestrator_node.py:695
def _pop_replay_action(self) -> "RobotState | None":
raw = self.command_router.request(self.follower_robot_id,
RobotCommand.GET_REPLAY_ACTION) # round-trip
if raw is None:
return None
return RobotState.from_dict(raw)
On the node side the handler pops one frame and retags its robot_type so the follower's converter passes it through unchanged:
# robot_node.py:446 — _get_replay_action()
twin = self._twin()
if twin is None or not getattr(twin, "_replay_active", False):
return None
action = twin.get_action() # popleft() one frame
if action is None:
return None
d = action.to_dict()
d["robot_type"] = self.robot.type.value
return d
ORCH dispatch thread (30 Hz) FOLLOWER node
──────────────────────────── ─────────────
tick ──► _pop_replay_action()
request(GET_REPLAY_ACTION) ──cmd.follower──► _on_robot_command
_get_replay_action()
twin.get_action() → frame
frame.to_dict()
reply.orch ◄────────────────────── publish {corr, value}
RobotState.from_dict(raw)
smooth_and_publish_action ──action.follower──► _on_command_received
wait 33 ms robot.set_state(...)
tick ──► (repeat, once per frame, for the whole trajectory)
action.follower to actually move the robot.stop_replay() → STOP_REPLAY → _stop_replay() calls end_replay() and optionally clear_recording(). Sibling arming commands ride the same channel: GOTO_TRAJECTORY (smooth move to start/end), SNAP_TWIN_ANCHOR, GET_TRAJECTORY_FRAMES (scrub).
request() works — corr + reply_to + EventThe bus only does fire-and-forget pub/sub. CommandRouter.request() command_router.py:125 fakes a synchronous RPC on top of it with two tricks: a correlation id stamped on the message and echoed on the reply, and a reply_to topic the node publishes the answer on. A per-request threading.Event blocks the caller until the answer lands.
def request(self, robot_id, action, params=None, timeout=2.0) -> Any:
self._ensure_reply_subscription() # subscribe reply.orch once
corr = next(self._corr) # unique id: 1,2,3,...
slot = {"event": threading.Event(), "value": None} # this request's mailbox
with self._pending_lock:
self._pending[corr] = slot # register by corr
self.bus.publish(command_topic(robot_id), dumps({
"action": act, "params": params or {},
"reply_to": self._reply_topic, "corr": corr })) # ask
slot["event"].wait(timeout) # <-- BLOCKS here (≤ 2s)
with self._pending_lock:
self._pending.pop(corr, None)
return slot["value"] # answer, or None on timeout
def _on_reply(self, raw): # fires on reply.orch
msg = loads(raw)
slot = self._pending.get(msg.get("corr")) # find the right mailbox
if slot is not None:
slot["value"] = msg.get("value") # drop the answer in
slot["event"].set() # ring the doorbell
A per-request mailbox: {event, value}. Stored in _pending[corr] so the reply handler (running later, maybe on another thread) can find the exact caller that's waiting. value starts None — which is also what you get on timeout.
A flag + waiting room. wait(timeout) parks the caller; set() (rung by _on_reply) wakes it. The timeout means a node that never answers can't hang you forever. Handles both orderings — see §5.
With the default InProcessBus, publish() invokes subscriber callbacks synchronously, on the caller's own thread in_process_bus.py:30. So a "request" is really one deep synchronous call stack — the robot handler runs inside the publish on line 142, sets the event, and only then does control reach the wait() on line 148.
ONE THREAD, ONE CALL STACK (InProcessBus)
request ─► bus.publish("cmd.follower") ─► _on_robot_command ─► get_action()
─► bus.publish("reply.orch")
─► _on_reply ─► event.set() ★
request (cont.) ─► event.wait(2.0) ← event ALREADY set at ★, returns instantly
─► return value
_on_reply genuinely fires on a different receiver thread and wait() actually sleeps.Two consequences worth holding onto:
dumps → bus → loads → handler → dumps → bus → loads, per frame, for what is morally a local deque.popleft().command_dispatch_worker is a dedicated sync thread, so the per-frame block doesn't freeze the event loop. (The arm/stop calls are made from async context, though — those are the ones that can stall the loop for up to the 2 s timeout.)| Trajectory | Frames | RPC round-trips | JSON (de)serializations | Wall time @30Hz |
|---|---|---|---|---|
| Short pick | ~90 | 91 | ~360 | 3 s |
| Typical | ~300 | 301 | ~1200 | 10 s |
| Looping sequence | ∞ | unbounded | unbounded | until stopped |
Per frame the payload is serialized twice on the orchestrator's behalf: once for the GET_REPLAY_ACTION reply, once again when smooth_and_publish_action republishes it on action.follower. The replay pull is the redundant half. | ||||
None of this is latency-critical — the frames were captured at 30 Hz and replay just paces them back out at 30 Hz, so a few hundred µs of serialization per tick is comfortably inside the 33 ms budget. The objection is architectural, not a measured stall: a chatty N-round-trip pull where one transfer (or zero) would do, with the queue's natural owner reduced to answering "give me the next one" 300 times.
When the follower is a remote robot, RobotNode.robot is a RemoteRobotClient proxy whose twin is _RemoteDigitalTwin robot/remote/robot_client.py:650. Its get_action() returns None on purpose — the remote server drains its own queue locally and streams frames out. The orchestrator's per-tick pull is already a no-op in that mode; arming commands (load_*_to_queue, begin_replay) forward as ZMQ sideband messages.
So there are really two replay engines today:
None and is ignored.The remote path is the proof that a push model already works in this codebase — which points at the cleanest fix.
Make TRIGGER_REPLAY (or a new GET_TRAJECTORY_FRAMES at arm-time) return the entire loaded queue as a list of dicts. The orchestrator caches it locally and the 30 Hz worker iterates an in-memory list — zero per-frame RPC.
1 transfer simple
Cost: one ~300 KB message at arm. Looping = re-iterate the local list. Must decide who owns loop-refill/interp once it's client-side, and remote still drains server-side so the orchestrator just skips local iteration there.
Stop pulling entirely. After arm, the follower node drains its own queue on its publish worker and emits frames straight onto action.follower at 30 Hz — exactly what the remote path already does. The orchestrator only sends arm/stop.
0 pull RPCs unifies local + remote
Cost: smoothing/mode-blend currently lives in the orchestrator (smooth_and_publish_action); pushing means moving that onto the node or applying it downstream. Biggest change, best end state.
Keep the pull, but add await request_async() backed by an asyncio.Future resolved via loop.call_soon_threadsafe. Removes the event-loop stall on arm/stop and any future async callers.
still N round-trips smallest diff
Doesn't reduce chattiness — it only stops the blocking wait() from freezing the loop. Orthogonal to A/B; worth doing regardless for the arm/stop calls made from async context.
Aim for Option B, land Option A first.
asyncio.Future-backed request_async fixes that independent of A/B.Whatever path we pick, keep the InProcessBus "everything goes through the bus" profiling story in mind (§5) — it's why the per-frame detour exists in the first place. Option B preserves it (frames still publish on action.follower); Option A bypasses the pull but keeps the publish.