Atlas · Servo7

Trajectory Replay & the Per-Action RPC

How recorded trajectories are stored, armed, and drained — and why the orchestrator currently fetches the replay one frame at a time over a blocking bus RPC at 30 Hz. End-to-end walkthrough, the request/response mechanism that backs it, the overhead it costs, and three concrete ways to fix it.

Draft · 2026-06-02 · Companion to bus & digital twin and digital-twin flow

Contents

0 · TL;DR 1 · The data: a frame, the queue, the library 2 · Ownership: who holds the trajectory 3 · The replay lifecycle 4 · How request() works (corr + reply_to + Event) 5 · The in-process nuance 6 · The overhead, counted 7 · The remote special case 8 · Three ways to fix it 9 · Recommendation

0 · TL;DR

A recorded trajectory is a list of RobotState frames living in a deque inside a TrajectoryManager, which is owned by the follower's DigitalTwin — which is owned by the follower RobotNode. The orchestrator has no direct handle on it. So to play it back, the orchestrator's 30 Hz dispatch worker calls command_router.request(follower, GET_REPLAY_ACTION) once per frame — a blocking request/response over the pub/sub bus. A 300-frame trajectory = 301 RPC round-trips, each serializing one RobotState to JSON and back, each blocking the calling thread.

The intuition in the question is right: it would be cheaper to ship the whole trajectory to the orchestrator once at arm-time and iterate locally — or to stop pulling entirely and let the node push frames itself. §8 lays out both, plus the smaller async fix. The catch is the remote case (§7), where the node already drains its own queue server-side — any fix has to keep that path working.

1 · The data: a frame, the queue, the library

A single "replay action" is a RobotState

Every frame in a trajectory is a full RobotState dataclass servo7/robot/robot_types.py — the same object that flows everywhere else in the system. The replay-relevant fields:

@dataclass
class RobotState:
    timestamp: float
    joint_positions:  list[float] | None    # the core payload — N joint angles
    joint_velocities: list[float] | None
    joint_efforts:    list[float] | None
    gripper_position: float | None
    end_effector_pose: list[float] | None   # [x,y,z, qx,qy,qz,qw]
    robot_type: RobotType | None             # retagged on the way out (see §3)
    units: str = "radians"
    trace_id: str = ...                      # profiling
    hops: list[list] = ...                   # per-hop timestamps
    extras: dict[str, Any] = ...
    # ... + to_dict() / from_dict() for the wire

Serialized to JSON, one frame is ~1–2 KB. It already round-trips through to_dict()/from_dict(), so the whole trajectory is trivially serializable.

The queue and the library

The TrajectoryManager servo7/control/trajectory_manager.py keeps two things:

FieldTypeRole
_record_queuedeque[RobotState]The live playback queue. Replay drains it one popleft() at a time. Recording appends to it.
_trajectoriesdict[str, list[RobotState]]The named library of saved trajectories. "Load" copies a named entry (plus interpolated bridges) into _record_queue.
_replay_activeboolThe gate. Only set by an explicit Play (begin_replay()); get_action() returns None unless it's true.
_sequence_templatelist[RobotState]Loop buffer — when a looping sequence drains, the queue refills from this.

The single drain primitive — note it pops exactly one frame and is lock-guarded:

def get_action(self) -> "RobotState | None":
    with self._record_lock:
        if not self._record_queue:
            if self._loop_sequence and self._sequence_template:
                self._record_queue.extend(self._sequence_template)   # refill on loop
                ...
            else:
                return None
        return self._record_queue.popleft()

Interpolation (the smooth bridge from the current pose to the first frame, and between chained trajectories) is computed once at load time by _build_layout() and baked into _record_queue as ordinary frames. So by the time replay drains, every element is just a frame to pop — no per-tick math on the robot side.

2 · Ownership: who holds the trajectory

This is the whole reason the RPC exists. The trajectory lives several layers deep inside the follower node, and the orchestrator sits in a different object graph:

        ORCHESTRATOR NODE                         FOLLOWER ROBOT NODE
        ─────────────────                         ───────────────────
   OrchestratorNode                          RobotNode
     └─ command_router : CommandRouter          └─ robot : DigitalTwinPID   (a Robot)
            │                                          └─ trajectories : TrajectoryManager
            │   no direct reference                          ├─ _record_queue : deque[RobotState]
            │   ───────✗──────────►                          ├─ _trajectories : dict[str,list]
            │                                                └─ _replay_active : bool
            │
            └──────────  bus (PubSub)  ──────────────► _on_robot_command()  ──► handler ──► twin
                 cmd.follower / reply.orch
The orchestrator can only reach the queue by sending a command over the bus. There is no shared pointer — by design, so the exact same code works when the follower is in another process or another host (§7).

3 · The replay lifecycle

Three phases: arm (once), drain (per frame, the hot path), stop (once).

Phase 1 — Arm (one RPC)

OrchestratorNode.trigger_replay() orchestrator_node.py:1064 sends one TRIGGER_REPLAY request. On the node, _trigger_replay() loads the named trajectory/sequence into _record_queue, calls begin_replay(), and returns the loaded frame count. The orchestrator then flips its own control_mode = TRAJECTORIES and starts a mode blend.

Phase 2 — Drain (the hot path: one RPC per frame)

A dedicated sync thread, command_dispatch_worker() orchestrator_node.py:396, spins at a fixed 30 Hz. In TRAJECTORIES mode each tick does:

# orchestrator_node.py — command_dispatch_worker(), 30 Hz loop
elif self.control_mode == ControlMode.TRAJECTORIES:
    action = self._pop_replay_action()           # <-- blocking RPC, EVERY tick
    if action is not None:
        self.smooth_and_publish_action(action, source="controller")

# _pop_replay_action(), orchestrator_node.py:695
def _pop_replay_action(self) -> "RobotState | None":
    raw = self.command_router.request(self.follower_robot_id,
                                      RobotCommand.GET_REPLAY_ACTION)   # round-trip
    if raw is None:
        return None
    return RobotState.from_dict(raw)

On the node side the handler pops one frame and retags its robot_type so the follower's converter passes it through unchanged:

# robot_node.py:446 — _get_replay_action()
twin = self._twin()
if twin is None or not getattr(twin, "_replay_active", False):
    return None
action = twin.get_action()         # popleft() one frame
if action is None:
    return None
d = action.to_dict()
d["robot_type"] = self.robot.type.value
return d
   ORCH dispatch thread (30 Hz)                       FOLLOWER node
   ────────────────────────────                       ─────────────
   tick ──► _pop_replay_action()
              request(GET_REPLAY_ACTION) ──cmd.follower──► _on_robot_command
                                                              _get_replay_action()
                                                                twin.get_action() → frame
                                                                frame.to_dict()
                          reply.orch ◄────────────────────── publish {corr, value}
              RobotState.from_dict(raw)
            smooth_and_publish_action ──action.follower──► _on_command_received
   wait 33 ms                                                robot.set_state(...)
   tick ──► (repeat, once per frame, for the whole trajectory)
Every frame: one serialize on the node, one deserialize on the orchestrator, one blocking wait. Then the popped frame is re-serialized and published again on action.follower to actually move the robot.

Phase 3 — Stop (one RPC)

stop_replay()STOP_REPLAY_stop_replay() calls end_replay() and optionally clear_recording(). Sibling arming commands ride the same channel: GOTO_TRAJECTORY (smooth move to start/end), SNAP_TWIN_ANCHOR, GET_TRAJECTORY_FRAMES (scrub).

4 · How request() works — corr + reply_to + Event

The bus only does fire-and-forget pub/sub. CommandRouter.request() command_router.py:125 fakes a synchronous RPC on top of it with two tricks: a correlation id stamped on the message and echoed on the reply, and a reply_to topic the node publishes the answer on. A per-request threading.Event blocks the caller until the answer lands.

def request(self, robot_id, action, params=None, timeout=2.0) -> Any:
    self._ensure_reply_subscription()                       # subscribe reply.orch once
    corr = next(self._corr)                                 # unique id: 1,2,3,...
    slot = {"event": threading.Event(), "value": None}      # this request's mailbox
    with self._pending_lock:
        self._pending[corr] = slot                          # register by corr
    self.bus.publish(command_topic(robot_id), dumps({
        "action": act, "params": params or {},
        "reply_to": self._reply_topic, "corr": corr }))     # ask
    slot["event"].wait(timeout)                             # <-- BLOCKS here (≤ 2s)
    with self._pending_lock:
        self._pending.pop(corr, None)
    return slot["value"]                                    # answer, or None on timeout

def _on_reply(self, raw):                                   # fires on reply.orch
    msg = loads(raw)
    slot = self._pending.get(msg.get("corr"))               # find the right mailbox
    if slot is not None:
        slot["value"] = msg.get("value")                    # drop the answer in
        slot["event"].set()                                 # ring the doorbell

slot

A per-request mailbox: {event, value}. Stored in _pending[corr] so the reply handler (running later, maybe on another thread) can find the exact caller that's waiting. value starts None — which is also what you get on timeout.

threading.Event

A flag + waiting room. wait(timeout) parks the caller; set() (rung by _on_reply) wakes it. The timeout means a node that never answers can't hang you forever. Handles both orderings — see §5.

5 · The in-process nuance (important before "fixing the threading")

With the default InProcessBus, publish() invokes subscriber callbacks synchronously, on the caller's own thread in_process_bus.py:30. So a "request" is really one deep synchronous call stack — the robot handler runs inside the publish on line 142, sets the event, and only then does control reach the wait() on line 148.

   ONE THREAD, ONE CALL STACK (InProcessBus)

   request ─► bus.publish("cmd.follower") ─► _on_robot_command ─► get_action()
                                                                 ─► bus.publish("reply.orch")
                                                                       ─► _on_reply ─► event.set()  ★
   request (cont.) ─► event.wait(2.0)   ← event ALREADY set at ★, returns instantly
                   ─► return value
In-process, the Event/Lock/_pending machinery does almost nothing — the answer is in the box before you wait. The threading only earns its keep on ZmqBus / MpBus / UdpBus, where _on_reply genuinely fires on a different receiver thread and wait() actually sleeps.

Two consequences worth holding onto:

6 · The overhead, counted

TrajectoryFramesRPC round-tripsJSON (de)serializationsWall time @30Hz
Short pick~9091~3603 s
Typical~300301~120010 s
Looping sequenceunboundedunboundeduntil stopped
Per frame the payload is serialized twice on the orchestrator's behalf: once for the GET_REPLAY_ACTION reply, once again when smooth_and_publish_action republishes it on action.follower. The replay pull is the redundant half.

None of this is latency-critical — the frames were captured at 30 Hz and replay just paces them back out at 30 Hz, so a few hundred µs of serialization per tick is comfortably inside the 33 ms budget. The objection is architectural, not a measured stall: a chatty N-round-trip pull where one transfer (or zero) would do, with the queue's natural owner reduced to answering "give me the next one" 300 times.

7 · The remote special case (the constraint any fix must respect)

When the follower is a remote robot, RobotNode.robot is a RemoteRobotClient proxy whose twin is _RemoteDigitalTwin robot/remote/robot_client.py:650. Its get_action() returns None on purpose — the remote server drains its own queue locally and streams frames out. The orchestrator's per-tick pull is already a no-op in that mode; arming commands (load_*_to_queue, begin_replay) forward as ZMQ sideband messages.

So there are really two replay engines today:

The remote path is the proof that a push model already works in this codebase — which points at the cleanest fix.

8 · Three ways to fix it

Option A — ship it once

Make TRIGGER_REPLAY (or a new GET_TRAJECTORY_FRAMES at arm-time) return the entire loaded queue as a list of dicts. The orchestrator caches it locally and the 30 Hz worker iterates an in-memory list — zero per-frame RPC.

1 transfer simple

Cost: one ~300 KB message at arm. Looping = re-iterate the local list. Must decide who owns loop-refill/interp once it's client-side, and remote still drains server-side so the orchestrator just skips local iteration there.

Option B — node pushes

Stop pulling entirely. After arm, the follower node drains its own queue on its publish worker and emits frames straight onto action.follower at 30 Hz — exactly what the remote path already does. The orchestrator only sends arm/stop.

0 pull RPCs unifies local + remote

Cost: smoothing/mode-blend currently lives in the orchestrator (smooth_and_publish_action); pushing means moving that onto the node or applying it downstream. Biggest change, best end state.

Option C — async the RPC

Keep the pull, but add await request_async() backed by an asyncio.Future resolved via loop.call_soon_threadsafe. Removes the event-loop stall on arm/stop and any future async callers.

still N round-trips smallest diff

Doesn't reduce chattiness — it only stops the blocking wait() from freezing the loop. Orthogonal to A/B; worth doing regardless for the arm/stop calls made from async context.

9 · Recommendation

Aim for Option B, land Option A first.

  1. A as the quick win: have arm return the full frame list; iterate locally. Kills the 300× round-trips with a contained diff and no change to where smoothing lives. Immediately makes local replay behave like one transfer.
  2. B as the real target: a push-drain on the node unifies local and remote replay into one engine — the orchestrator just arms and stops. This is where the architecture wants to be; the remote path already proves it works.
  3. C regardless: the arm/stop/snap calls are issued from the async orchestrator and block the loop up to the 2 s timeout. An asyncio.Future-backed request_async fixes that independent of A/B.

Whatever path we pick, keep the InProcessBus "everything goes through the bus" profiling story in mind (§5) — it's why the per-frame detour exists in the first place. Option B preserves it (frames still publish on action.follower); Option A bypasses the pull but keeps the publish.