๐Ÿงญ

Setup

One process, two robots, one teleop loop
architecture teleoperation

In local mode, the whole system — Robot A (leader), Robot B (follower), orchestrator, system-state tracker, in-process bus — lives inside a single Python process. Both robots plug into the same machine; there is no network between them.

Each robot is represented by a RobotNode that spawns a small family of worker threads via asyncio.to_thread. The orchestrator registers callbacks on the bus so that state updates from one robot turn into commands for the other. Hardware is generic — the leader might talk over USB-serial, the follower over CAN, or vice versa — but none of that matters for the control flow.

What we want to answer on this page: when the operator moves Robot A, what exactly happens, on which threads, and how long does each step take before Robot B moves?

Local mode is the simplest possible deployment — everything shares one process and one memory space. That simplicity is what makes the following observation possible at all: the entire teleop pipeline runs on one thread, not the three or four you'd expect from reading the architecture diagram.
๐Ÿ“ฌ

The InProcessBus Is a Function Call

Not a thread, not a queue — a for loop over subscribers
concurrency mental model

The single most important piece of the mental model is that the in-process bus has no worker of its own. Its publish() is about ten lines:

class InProcessBus:
    def publish(self, topic: str, payload: bytes) -> None:
        with self._lock:
            self._latest[topic] = payload
            subscribers = list(self._subscribers.get(topic, ()))

        for callback in subscribers:
            try:
                callback(payload)            # synchronous — runs on the publisher's thread
            except Exception as e:
                self.logger.error(...)

There is no message queue, no event loop, no dispatcher. publish() grabs the subscriber list, then calls each callback in a Python for loop. Because Python calls functions synchronously, every subscriber runs on whatever thread called publish().

The practical consequence: when the leader's publisher calls bus.publish("state.leader", ...), the system-state manager's subscriber runs on the leader's publisher thread. That subscriber fires the orchestrator's registered state-change callback — still on the leader's publisher thread. That callback calls bus.publish("action.follower", ...), which invokes the follower's subscriber — still on the leader's publisher thread. The entire fan-out is a chain of nested function calls on one stack.

If you have ever drawn an architecture diagram with boxes labelled "leader publisher", "bus", "orchestrator subscriber", "follower subscriber" and assumed each is a thread, you are wrong by three threads. In local mode, bus subscribers are not threads — they are callbacks invoked on the publisher's thread.
๐Ÿงต

The Threads Involved

What actually runs on CPU cores
concurrency

Inside the one process, asyncio runs its event loop on the main thread, and each RobotNode spawns a handful of worker threads. The threads that matter for teleop are just these:

Thread Role in teleop
publish_worker_A Reads leader joint positions at the leader's publish rate (e.g., 100 Hz) and publishes state.leader to the bus.
publish_worker_B Reads follower joint positions and publishes state.follower. Feeds the orchestrator / recorder's view of the follower, not the command path.
ai_inference_worker Dedicated thread that produces commands when the control mode is AI instead of teleop. Idle during teleop.
main / asyncio loop Serves the UI websocket and broadcasts. Not on the hot path.

Notice what is not in the table: no "bus thread", no "orchestrator thread", no "subscriber thread". Those components exist as code, not as threads. They run inline on whichever publisher woke them up.

๐Ÿ•ฐ๏ธ

The Old Path โ€” With a Command Queue

How it used to work, and where the 40 ms went
before latency

Originally, the follower's bus subscriber did not apply commands directly. It parsed and queued them, and a separate rate-paced worker (command_executer) popped the queue on a fixed tick and wrote to hardware. The architecture looked like this:

#
publish_worker_A (leader)
QUEUE
command_executer (follower)
1
leader.robot.get_state()
Read joints from leader hardware.
~0.3–1 ms
2
bus.publish("state.leader", raw)
Iterates subscribers synchronously. SystemStateManager is one of them.
3
ssm.leader.deserialized
SystemStateManager's subscriber deserializes and calls update_state("leader", ...).
4
orch.cb_enter
Orchestrator's state-change callback runs. Early-returns unless in teleop mode.
5
orch.cb_converted
Joint-space converter maps the leader's pose onto the follower's (reindex, scale, offset).
6
orch.cb_pre_serialize
EMA smoother applied. Payload serialized to bytes.
7
bus.publish("action.follower", raw)
Second bus publish. Still on the leader's thread.
8
follower.cmd_parsed follower.cmd_queued
Follower's _on_command_received deserializes, drops the oldest if full, then queue.put_nowait(state).
9
leader thread unwinds and loops »
command sits in FIFO (maxsize = 5)
10
follower.cmd_dequeued
command_executer's tick fires (every 10 ms), pops the oldest item.
~40 ms observed p50
11
follower.robot.set_state(state)
Blocking hardware write.
~0.2–1 ms
12
follower.cmd_applied
End-to-end trace record terminates here.
leader publisher thread thread boundary (queue) follower executer thread

Steps 1–8 are consecutive function calls on a single Python stack — no scheduling, no cross-thread handoff. A representative trace from the profiler looked like this:

publish.leader.got_state       → ssm.leader.deserialized        0.018 ms
ssm.leader.deserialized        → sysstate.leader.pre_callback   0.011 ms
sysstate.leader.pre_callback   → orch.cb_enter                  0.001 ms
orch.cb_enter                  → orch.cb_converted              0.035 ms
orch.cb_converted              → orch.cb_pre_serialize          0.012 ms
orch.cb_pre_serialize          → follower.cmd_parsed            0.014 ms
follower.cmd_parsed            → follower.cmd_queued            0.002 ms
follower.cmd_queued            → follower.cmd_dequeued         40.751 ms   <-- the one slow hop
follower.cmd_dequeued          → follower.cmd_applied           0.214 ms

Every hop except one was measured in microseconds. A single hop — cmd_queuedcmd_dequeued — dominated end-to-end latency. That hop was the only real thread boundary in the pipeline.

๐Ÿงฎ

Why 40 ms at Steady State

Equal-rate producer + FIFO consumer = stale by a queue-length
backpressure queueing

The queue was a FIFO with maxsize=5, and the consumer popped from the oldest end. Producer and consumer both ticked at 100 Hz. Any small scheduling hiccup let the producer get one slot ahead, and nothing in the protocol recovered — there was no drop-to-newest or drain semantics. Over time the queue settled at roughly 4 in-flight items.

Popping the oldest of 4 items, each enqueued ~10 ms apart, hands the consumer a command that is already ~40 ms stale. The "drop-oldest on full" logic only fired when the queue hit 5, which kept it bounded but did not help freshness — the consumer was always reading the oldest command, not the newest.

A FIFO between two equal-rate producers and consumers doesn't just add latency equal to one tick — it adds latency proportional to however deep the queue runs in steady state. With a 10 ms tick and a queue depth of 4, that's 40 ms of stale-command delivery, every cycle, forever.

Obvious shapes of fix exist — a single-slot mailbox, a "drain to newest" consumer, a ring buffer, explicit priority — but each still adds a thread boundary. The better question is whether the queue needs to be there at all.

โœ‚๏ธ

The Fix โ€” Drop the Queue

Apply the command inline on the subscriber thread
after simplification

The queue existed to smooth producer rate against a paced consumer. But since the producer (teleop leader, or AI inference) already runs at exactly the rate at which commands should be applied, the pacing served no purpose — the queue only added a thread boundary and a steady-state stall.

The change: the follower's _on_command_received now writes directly to hardware on the bus-delivered call.

# before
def _on_command_received(self, payload):
    state = command_parser(payload)
    if self.command_queue.full():
        self.command_queue.get_nowait()   # drop oldest
    self.command_queue.put_nowait(state)

# after
def _on_command_received(self, payload):
    state = RobotState.from_bytes(payload)
    self.robot.set_state(state)           # apply directly on the subscriber thread
What came out: the command_queue, the command_parser indirection, and the rate-paced command_executer thread. What stayed: the producers publish at their native rate (leader tick for teleop, inference tick for AI), and the follower applies whatever arrives.

Two properties are worth calling out explicitly, because they look scary until you remember how the in-process bus works:

  1. The hardware write happens on the publisher's thread. When the leader's publish_worker_A calls bus.publish("action.follower", ...), the follower subscriber runs inline on that same thread and does the blocking hardware write. The leader's publish tick now pays the follower's write cost. That's fine — the write is sub-millisecond, and the leader was already blocked waiting for the subscriber chain to return anyway.
  2. There is no built-in rate limit. If the producer ever produces faster than hardware can consume, hardware writes back up. In practice the producer ticks (teleop leader, AI inference) are slower than the hardware bus, so this is self-limiting. If that ever changes, rate limiting belongs at the producer, not as a separate queue-and-tick on the consumer.
๐Ÿš€

The New Path

One thread, one Python stack, end to end
after
#
publish_worker_A (leader publisher)
1
leader.robot.get_state()
Read joints from leader hardware.
2
bus.publish("state.leader", raw)
Iterate subscribers. SystemStateManager callback runs inline.
3
update_state("leader", ...)
SystemState stores the value, fires the orchestrator's state-change callback.
4
orchestrator._on_leader_state_change(...)
Convert leader pose → follower pose. Apply EMA smoother. Serialize.
5
bus.publish("action.follower", raw)
Second publish. Follower's subscriber runs inline.
6
follower.robot.set_state(state)
Hardware write. Returns. Stack unwinds all the way back to step 1's caller.
~0.2–1 ms

Every step runs on publish_worker_A, the leader's publish worker. There is no queue, no second thread, no rate-paced consumer. The total wall-clock time from "read leader joint" to "hardware command accepted by follower" is on the order of ~1 ms, bounded mostly by the hardware writes on each end.

For AI-driven control, the path is structurally identical: the ai_inference_worker publishes action.follower at its native predict rate, and the follower subscriber applies each command inline on the inference thread. No queue, no executer tick, no rate mismatch.

Before

  • Two threads + FIFO queue between them
  • Subscriber parses & enqueues
  • Rate-paced command_executer dequeues every 10 ms
  • Steady-state queue depth ~4 ⇒ commands delivered ~40 ms stale
  • Extra code: command_parser, command_executer, queue plumbing

After

  • One thread, one stack, end to end
  • Subscriber applies command directly to hardware
  • Producers publish at their native rate (leader tick, AI predict)
  • End-to-end latency ~1 ms, bounded by hardware writes
  • Deleted: queue, parser, executer thread
The original design treated the bus as a message-passing system that needed buffers and rate-matching on each end. But the in-process bus is just a function call — once you see that, the queue is obviously a thread boundary pretending to solve a problem it actually creates. The best fix for a boundary that costs 40 ms and provides nothing is to not have the boundary.
๐ŸŽฏ

Takeaways

Portable lessons for any publisher/subscriber architecture
principles
  • Know what your bus actually does. An in-process bus that calls subscribers inline is a very different performance object from a networked bus or a queue-backed bus. Draw the call graph, not just the topology — you'll discover most "nodes" aren't threads at all.
  • A FIFO between equal-rate producer and consumer adds latency, not elasticity. It settles at some non-zero depth, and the consumer drinks from the oldest end. If both sides run at 100 Hz, you get “queue depth × tick period” of free staleness, every cycle.
  • Rate-pacing the consumer when the producer already paces itself is a bug. The only legitimate job of a consumer-side queue is to absorb producer bursts or consumer stalls. If neither is happening, the queue is dead weight.
  • The cheapest thread is the one you didn't spawn. Every thread boundary is a place where latency, bugs, and ordering questions live. Inline the work when you can; boundary it when you must.
Remote mode (cross-machine leader/orchestrator/follower) reintroduces real thread boundaries at the network layer — a networked bus has its own receive thread and its own serialize/deserialize cost. But the same principle holds inside each process: subscribers run on the receiver thread. Boundaries should exist where the physics require them, not where the code happens to have grown them.