In local mode, the whole system — Robot A (leader), Robot B (follower), orchestrator, system-state tracker, in-process bus — lives inside a single Python process. Both robots plug into the same machine; there is no network between them.
Each robot is represented by a RobotNode that spawns a small family of worker threads via asyncio.to_thread. The orchestrator registers callbacks on the bus so that state updates from one robot turn into commands for the other. Hardware is generic — the leader might talk over USB-serial, the follower over CAN, or vice versa — but none of that matters for the control flow.
What we want to answer on this page: when the operator moves Robot A, what exactly happens, on which threads, and how long does each step take before Robot B moves?
The single most important piece of the mental model is that the in-process bus has no worker of its own: publish() is just a for loop over the subscribers. The whole method is about ten lines:
```python
class InProcessBus:
    def publish(self, topic: str, payload: bytes) -> None:
        with self._lock:
            self._latest[topic] = payload
            subscribers = list(self._subscribers.get(topic, ()))
        for callback in subscribers:
            try:
                callback(payload)  # synchronous — runs on the publisher's thread
            except Exception as e:
                self.logger.error(...)
```
There is no message queue, no event loop, no dispatcher. publish() grabs the subscriber list, then calls each callback in a Python for loop. Because Python calls functions synchronously, every subscriber runs on whatever thread called publish().
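To make "runs on the publisher's thread" concrete, here is a minimal stand-alone sketch — MiniBus is an invented name, not the project's class — that publishes from a named worker thread and records which thread the subscriber actually ran on:

```python
import threading

class MiniBus:
    """Toy bus with the same synchronous publish() shape as the in-process bus."""
    def __init__(self):
        self._lock = threading.Lock()
        self._latest = {}
        self._subscribers = {}

    def subscribe(self, topic, callback):
        with self._lock:
            self._subscribers.setdefault(topic, []).append(callback)

    def publish(self, topic, payload):
        with self._lock:
            self._latest[topic] = payload
            subscribers = list(self._subscribers.get(topic, ()))
        for callback in subscribers:
            callback(payload)  # synchronous — no queue, no dispatcher thread

bus = MiniBus()
seen = []
bus.subscribe("state.leader", lambda p: seen.append(threading.current_thread().name))

t = threading.Thread(target=lambda: bus.publish("state.leader", b"joints"),
                     name="publish_worker_A")
t.start(); t.join()
print(seen)  # ['publish_worker_A'] — the subscriber ran on the publisher's thread
```

There is no place in this code where a different thread could run the callback: the for loop is the dispatch mechanism.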
The practical consequence: when the leader's publisher calls bus.publish("state.leader", ...), the system-state manager's subscriber runs on the leader's publisher thread. That subscriber fires the orchestrator's registered state-change callback — still on the leader's publisher thread. That callback calls bus.publish("action.follower", ...), which invokes the follower's subscriber — still on the leader's publisher thread. The entire fan-out is a chain of nested function calls on one stack.
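The nesting can be seen directly by recording stack depth at each step. The function names below are invented stand-ins for the real subscribers (system-state manager, orchestrator callback, follower subscriber); each call that the text describes as "still on the leader's publisher thread" shows up as one more frame on the same stack:

```python
import threading
import traceback

calls = []  # (step, thread name, stack depth)

def record(step):
    calls.append((step, threading.current_thread().name,
                  len(traceback.extract_stack())))

def follower_on_command(payload):
    record("follower.apply")        # would write to follower hardware here

def orchestrator_cb(payload):
    record("orch.convert")
    follower_on_command(payload)    # bus.publish("action.follower", ...) inlines to this

def ssm_on_leader_state(payload):
    record("ssm.update")
    orchestrator_cb(payload)        # state-change callback fires inline

t = threading.Thread(target=lambda: ssm_on_leader_state(b"joints"),
                     name="publish_worker_A")
t.start(); t.join()

for step, thread, depth in calls:
    print(step, thread, depth)      # same thread, strictly increasing depth
```

One thread name, three increasing depths: the fan-out is literally nested function calls, not message passing.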
Inside the one process, asyncio runs its event loop on the main thread, and each RobotNode spawns a handful of worker threads. The threads that matter for teleop are just these:
| Thread | Role in teleop |
|---|---|
| `publish_worker_A` | Reads leader joint positions at the leader's publish rate (e.g., 100 Hz) and publishes `state.leader` to the bus. |
| `publish_worker_B` | Reads follower joint positions and publishes `state.follower`. Feeds the orchestrator / recorder's view of the follower, not the command path. |
| `ai_inference_worker` | Dedicated thread that produces commands when the control mode is AI instead of teleop. Idle during teleop. |
| main / asyncio loop | Serves the UI websocket and broadcasts. Not on the hot path. |
Notice what is not in the table: no "bus thread", no "orchestrator thread", no "subscriber thread". Those components exist as code, not as threads. They run inline on whichever publisher woke them up.
Originally, the follower's bus subscriber did not apply commands directly. It parsed and queued them, and a separate rate-paced worker (command_executer) popped the queue on a fixed tick and wrote to hardware. The architecture looked like this:
- update_state("leader", ...) runs inline on the publisher thread, firing the orchestrator's callback;
- the follower's _on_command_received deserializes the command, drops the oldest item if the queue is full, then calls queue.put_nowait(state);
- command_executer's tick fires (every 10 ms) and pops the oldest item.

Steps 1–8 (everything up to follower.cmd_queued in the trace) are consecutive function calls on a single Python stack — no scheduling, no cross-thread handoff. A representative trace from the profiler looked like this:
```text
publish.leader.got_state     → ssm.leader.deserialized        0.018 ms
ssm.leader.deserialized      → sysstate.leader.pre_callback   0.011 ms
sysstate.leader.pre_callback → orch.cb_enter                  0.001 ms
orch.cb_enter                → orch.cb_converted              0.035 ms
orch.cb_converted            → orch.cb_pre_serialize          0.012 ms
orch.cb_pre_serialize        → follower.cmd_parsed            0.014 ms
follower.cmd_parsed          → follower.cmd_queued            0.002 ms
follower.cmd_queued          → follower.cmd_dequeued         40.751 ms  <-- the one slow hop
follower.cmd_dequeued        → follower.cmd_applied           0.214 ms
```
Every hop except one was measured in microseconds. A single hop — cmd_queued → cmd_dequeued — dominated end-to-end latency. That hop was the only real thread boundary in the pipeline.
The queue was a FIFO with maxsize=5, and the consumer popped from the oldest end. Producer and consumer both ticked at 100 Hz. Any small scheduling hiccup let the producer get one slot ahead, and nothing in the protocol recovered — there was no drop-to-newest or drain semantics. Over time the queue settled at roughly 4 in-flight items.
Popping the oldest of 4 items, each enqueued ~10 ms apart, hands the consumer a command that is already ~40 ms stale. The "drop-oldest on full" logic only fired when the queue hit 5, which kept it bounded but did not help freshness — the consumer was always reading the oldest command, not the newest.
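The steady-state arithmetic can be reproduced with a toy simulation (not project code; the structure mirrors the description above): both sides tick at 100 Hz, a hiccup delays the consumer by four ticks, and the FIFO settles at 4 in-flight items with ~40 ms staleness on every read:

```python
from collections import deque

TICK_MS = 10  # both producer and consumer tick at 100 Hz
q = deque()

def producer_tick(t):
    if len(q) == 5:
        q.popleft()  # drop-oldest on full (maxsize=5)
    q.append(t)      # store the command's creation time

staleness_log = []

def consumer_tick(t):
    if q:
        staleness_log.append(t - q.popleft())  # age of the oldest command

# One scheduling hiccup: the consumer starts four ticks late and never catches up.
for t in range(0, 1000, TICK_MS):
    producer_tick(t)
    if t >= 40:
        consumer_tick(t)

print(len(q))             # queue settles at 4 in-flight items
print(staleness_log[-1])  # every consumed command is 40 ms stale
```

Note that the drop-oldest branch never even fires in steady state — the queue hovers at 4, one below the cap, which is exactly why "bounded" did not mean "fresh".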
Obvious shapes of fix exist — a single-slot mailbox, a "drain to newest" consumer, a ring buffer, explicit priority — but each still adds a thread boundary. The better question is whether the queue needs to be there at all.
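For concreteness, here is a sketch of the first of those shapes — a single-slot mailbox. The Mailbox class and its names are invented for illustration; the point is that the producer overwrites and the consumer always reads the newest command, which fixes staleness but still keeps a thread boundary:

```python
import threading

class Mailbox:
    """Single-slot, overwrite-on-put mailbox: consumer always sees the newest value."""
    def __init__(self):
        self._cond = threading.Condition()
        self._value = None
        self._fresh = False

    def put(self, value):
        with self._cond:
            self._value = value   # overwrite — stale commands are never delivered
            self._fresh = True
            self._cond.notify()

    def take(self, timeout=None):
        with self._cond:
            if not self._fresh:
                self._cond.wait(timeout)
            self._fresh = False
            return self._value

box = Mailbox()
for cmd in (b"cmd-1", b"cmd-2", b"cmd-3"):
    box.put(cmd)        # producer outruns the consumer; older commands vanish
newest = box.take()
print(newest)           # only the newest command survives
```

Correct, but the consumer is still a separate thread waiting on a condition variable — the handoff cost that dominated the trace is reduced, not removed.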
The queue existed to smooth producer rate against a paced consumer. But since the producer (teleop leader, or AI inference) already runs at exactly the rate at which commands should be applied, the pacing served no purpose — the queue only added a thread boundary and a steady-state stall.
The change: the follower's _on_command_received now writes directly to hardware on the bus-delivered call.
```python
# before
def _on_command_received(self, payload):
    state = command_parser(payload)
    if self.command_queue.full():
        self.command_queue.get_nowait()  # drop oldest
    self.command_queue.put_nowait(state)

# after
def _on_command_received(self, payload):
    state = RobotState.from_bytes(payload)
    self.robot.set_state(state)  # apply directly on the subscriber thread
```
What went away: the command_queue, the command_parser indirection, and the rate-paced command_executer thread. What stayed: the producers publish at their native rate (leader tick for teleop, inference tick for AI), and the follower applies whatever arrives.

Two properties are worth calling out explicitly, because they look scary until you remember how the in-process bus works:
- When publish_worker_A calls bus.publish("action.follower", ...), the follower subscriber runs inline on that same thread and does the blocking hardware write. The leader's publish tick now pays the follower's write cost. That's fine — the write is sub-millisecond, and the leader was already blocked waiting for the subscriber chain to return anyway.
- Every step runs on publish_worker_A, the leader's publish worker. There is no queue, no second thread, no rate-paced consumer.

The total wall-clock time from "read leader joint" to "hardware command accepted by follower" is on the order of ~1 ms, bounded mostly by the hardware writes on each end.
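The cost accounting is easy to see in isolation. In this sketch (invented names; the sleep stands in for a sub-millisecond hardware write), publish() does not return until the subscriber's write finishes — which is precisely why the leader's tick now pays the follower's write cost:

```python
import time

def follower_write(payload):
    time.sleep(0.0005)  # stand-in for a sub-millisecond hardware write

def publish(topic, payload, subscribers):
    for cb in subscribers:
        cb(payload)     # inline, synchronous — publisher blocks until done

start = time.perf_counter()
publish("action.follower", b"joints", [follower_write])
elapsed_ms = (time.perf_counter() - start) * 1000
print(f"publish returned after {elapsed_ms:.2f} ms")  # >= 0.5 ms: includes the write
```

Under the old design the publisher paid a queue put instead — cheaper per tick, but the command then sat in the queue for ~40 ms before anyone applied it.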
For AI-driven control, the path is structurally identical: the ai_inference_worker publishes action.follower at its native predict rate, and the follower subscriber applies each command inline on the inference thread. No queue, no executer tick, no rate mismatch.