What started as "a PID twin in a subprocess" turned into a transport-bus seam underneath every cross-process Robot in atlas. The bus, four implementations, the digital-twin lifecycle, the ROS rewrite, the Quest VR migration, and a robot-package reorg — all from one branch.
A single PubSub ABC lives under servo7/bus/bus.py with four implementations — InProcessBus, ZmqBus, MpBus, UdpBus — and the same RobotServer + RemoteRobotClient pair rides on any of them. RemoteRobotClient grew five over_* classmethods that own the full spawn-and-connect lifecycle for each transport. The PID digital twin is now RemoteRobotClient.over_zmq_local_server("pid", …) in one line. ROS robots moved their synchronous control onto MpBus. Quest VR now sits on UdpBus. The servo7/robot/ package was sorted by kind: hardware/, mock/, remote/.
One ABC, four methods. The lowest-common-denominator interface every transport implements:
```python
# servo7/bus/bus.py
from abc import ABC, abstractmethod
from collections.abc import Callable

class PubSub(ABC):
    @abstractmethod
    def publish(self, topic: str, payload: bytes) -> None: ...

    @abstractmethod
    def subscribe(self, topic: str, callback: Callable[[bytes], None]) -> None: ...

    @abstractmethod
    def get_latest(self, topic: str) -> bytes | None: ...

    @abstractmethod
    def close(self) -> None: ...
```
That's it. Bytes in, bytes out, fire-and-forget, latest-wins cache, explicit lifecycle. InProcessBus doesn't actually need bytes (it stores references), but the ABC forces the contract because the other three transports do need them on the wire.
| Bus | Wire | Use when | Cost |
|---|---|---|---|
| InProcessBus | Python callbacks, no I/O | Server and client live in the same Python process; tests; quick wiring | ~function-call |
| ZmqBus | TCP, ZMQ pub/sub | Cross-host or cross-process; production transport for hardware servers | ~50–100 µs loopback |
| MpBus | mp.Queue pair, pickle | Parent ↔ child mp.Process; ROS isolation; carries pub/sub and synchronous req/serve on one wire | ~10–30 µs same-host |
| UdpBus | UDP datagrams | External producers (Quest VR controllers, foot pedal); lossy, latest-wins streams | ~100 µs loopback, lossy |
MpBus is the odd one out — it implements PubSub (for streaming) and adds request()/serve() methods for synchronous request/response on the same queue pair. Three frame tags share the wire: ("pub", topic, payload), ("req", correlation_id, type, payload), ("res", correlation_id, payload, error). The receiver thread dispatches by tag. request() blocks on a per-correlation-id threading.Event; concurrent requests don't get their replies mixed up.
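The per-correlation-id blocking can be sketched transport-free. Everything here is illustrative — the real MpBus runs `_on_frame` in a receiver thread fed by the queue pair, while this sketch dispatches locally just to show the matching logic:

```python
import itertools
import threading
from collections.abc import Callable

class ReqResSketch:
    """Illustrative correlation-id matching: each in-flight request parks on
    its own Event, so concurrent replies can't get crossed."""

    def __init__(self) -> None:
        self._ids = itertools.count()
        self._pending: dict[int, threading.Event] = {}
        self._replies: dict[int, bytes] = {}
        self._handlers: dict[str, Callable[[bytes], bytes]] = {}

    def serve(self, req_type: str, handler: Callable[[bytes], bytes]) -> None:
        self._handlers[req_type] = handler

    def request(self, req_type: str, payload: bytes, timeout: float = 5.0) -> bytes:
        cid = next(self._ids)
        done = self._pending[cid] = threading.Event()
        # Real MpBus enqueues ("req", cid, req_type, payload) here and the
        # peer answers with a "res" frame; this sketch answers in-line.
        self._on_frame(("res", cid, self._handlers[req_type](payload), None))
        if not done.wait(timeout):
            raise TimeoutError(req_type)
        return self._replies.pop(cid)

    def _on_frame(self, frame) -> None:
        tag, cid, payload, _err = frame
        if tag == "res":
            self._replies[cid] = payload
            self._pending.pop(cid).set()   # wake exactly the matching request()
```

The key property: replies are routed by correlation id, not arrival order, so two threads calling request() concurrently each get their own answer.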
```
              ┌──────────────┐
              │ PubSub (ABC) │
              └──────┬───────┘
     ┌───────────────┼───────────────┬───────────────┐
     │               │               │               │
InProcessBus       ZmqBus          MpBus           UdpBus
(callbacks)        (TCP)        (mp.Queue)      (datagrams)
                               +request/serve
```
Both RobotServer and RemoteRobotClient take a PubSub at construction time. No transport baked in. The two roles still exist — server publishes state at 100 Hz and subscribes to commands; client subscribes to state and publishes commands — but the wire is just whatever bus you handed them.
```python
class RobotServer:
    def __init__(self, robot: Robot, bus: PubSub, _owns_bus: bool = False):
        self.robot = robot
        self.bus = bus
        self.bus.subscribe(TOPIC_COMMAND, self._on_command_bytes)

class RemoteRobotClient(Robot):
    def __init__(self, config, id, bus: PubSub, _owns_bus: bool = False):
        super().__init__(config, id)
        self.bus = bus
        self.bus.subscribe(TOPIC_STATE, self._on_state_bytes)
```
To save callers from wiring up the bus manually, RemoteRobotClient exposes a classmethod per common scenario. Each one owns the subprocess/server lifecycle — disconnect() tears down whatever the constructor brought up.
| Classmethod | What happens on construction | What disconnect() does |
|---|---|---|
| over_zmq(host, ports) | Builds a ZmqBus pointing at an already-running server somewhere on the network | Closes the bus |
| over_zmq_local_server(robot_type, ports) | Spawns python -m servo7.robot.remote.robot_server --robot_type X …, waits for READY <nonce>, connects via ZmqBus | Closes the bus + terminates the subprocess |
| over_in_process(robot) | Builds an InProcessBus, constructs a RobotServer around the given Robot, starts it | Stops the server, closes the bus |
| over_mp_subprocess(robot_type) | Spawns an mp.Process running a RobotServer over MpBus; child constructs the Robot via RobotFactory | Closes the bus, terminates the child, joins it |
| over_zmq (server side) | Same idea on RobotServer: build a ZmqBus bound to the given ports | — |
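The pattern behind every row of that table is the same: the classmethod builds the bus, brings up whatever owns the far side, and records what disconnect() must later unwind. A hedged sketch of that shape with stand-in classes (none of these names are the real ones):

```python
class FakeBus:
    """Stand-in for a PubSub implementation."""
    def __init__(self):
        self.closed = False
    def close(self):
        self.closed = True

class FakeServer:
    """Stand-in for RobotServer wired to the same bus."""
    def __init__(self, bus):
        self.bus, self.running = bus, False
    def start(self):
        self.running = True
    def stop(self):
        self.running = False

class TransportClient:
    """Illustrative: each over_* constructor records the teardown steps it is
    responsible for, so disconnect() unwinds exactly what construction built."""

    def __init__(self, bus, teardown):
        self.bus = bus
        self._teardown = teardown   # callables, run in order on disconnect()

    def disconnect(self):
        for step in self._teardown:
            step()

    @classmethod
    def over_in_process(cls, server_factory):
        bus = FakeBus()
        server = server_factory(bus)
        server.start()
        # stop the server before closing the bus it still subscribes to
        return cls(bus, teardown=[server.stop, bus.close])
```

A caller who constructs the client directly with a bus they built themselves gets an empty teardown list — you own what you wire up yourself.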
The original problem: run a PID-based digital twin in a separate process to keep numpy / future MuJoCo out of atlas's interpreter. The collapse: a digital twin is just another Robot served by the same RobotServer we already have.
servo7/digital_twin/pid_6dof.py::DigitalTwinPID(Robot). P/D integrator with its own 100 Hz step thread. RobotType.PID in the enum; RobotFactory constructs it; config/robots/pid.yaml describes it. Runs inside the subprocess.
From atlas main: RemoteRobotClient.over_zmq_local_server(config, id, robot_type="pid", …). Spawns the subprocess holding the kernel, waits for READY, connects, returns a fully-wired Robot. One call.
```python
# atlas main process
client = RemoteRobotClient.over_zmq_local_server(
    config=pid_config, id="twin", robot_type="pid",
    state_port=5557, command_port=5558,
)
client.connect()                 # waits for first state on the wire
client.set_state(target_action)  # P/D integrator converges
state = client.get_state()       # cached, no wire round-trip
client.disconnect()              # also terminates the subprocess
```
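The spawn-and-wait half of over_zmq_local_server can be sketched with stdlib pieces. The nonce guards against connecting to a stale server left over from an earlier run; the `--ready_nonce` flag and the function name are illustrative, not the real CLI:

```python
import subprocess
import uuid

def spawn_and_wait_ready(cmd: list[str]) -> subprocess.Popen:
    """Illustrative READY <nonce> handshake: the child must echo back the
    exact nonce we passed it before we consider the server up."""
    nonce = uuid.uuid4().hex
    proc = subprocess.Popen(
        cmd + ["--ready_nonce", nonce],
        stdout=subprocess.PIPE,
        text=True,
    )
    line = proc.stdout.readline().strip()   # blocks until the child prints
    if line != f"READY {nonce}":
        proc.terminate()
        raise RuntimeError(f"server never became ready: {line!r}")
    return proc
```

Only after the handshake succeeds does the client build its ZmqBus against the advertised ports, which is why a fresh over_zmq_local_server never races its own server.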
The Robot base class grew a digital_twin: Robot | None slot plus opt-in helpers (_route_set_state, _route_get_state) that subclasses call at the top of their own set_state / get_state. The hardware Robot's enable_digital_twin(on) flag is what flips the routing.
```python
class DummyRobot(Robot):
    def set_state(self, action):
        super().set_state(action)
        routed = self._route_set_state(action)
        if routed is not None:
            return routed  # → hits the twin
        # ... hardware path
```
The set-state path uses dataclasses.replace to retag the action with the twin's robot_type so the twin's own type assertion passes. Subclasses opt in by adding two lines; nothing forces every Robot to migrate.
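The retag step is a one-line dataclasses.replace on the action's type field before forwarding. Field and class names below are illustrative stand-ins for the real action type:

```python
import dataclasses
from enum import Enum, auto

class RobotType(Enum):
    DUMMY = auto()
    PID = auto()

@dataclasses.dataclass(frozen=True)
class Action:
    robot_type: RobotType
    positions: tuple[float, ...]

def retag_for_twin(action: Action, twin_type: RobotType) -> Action:
    """Copy the action, swapping only robot_type so the twin's own type
    assertion passes; every other field is carried through untouched."""
    return dataclasses.replace(action, robot_type=twin_type)
```

Because replace returns a new (frozen) instance, the hardware path still sees the original action with its original type tag.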
The existing ROS pair (ROSRobotProxy in atlas main, a hand-rolled tuple protocol over two mp.Queues to a child process) was renamed and restructured:
- ros_robot_proxy.py → ros_robot_client.py; class ROSRobotProxy → ROSRobotClient
- ros_robot_process.py → ros_robot_server.py; the run_robot_process function became a ROSRobotServer class with handler methods
- launch_ros_robot stays module-level for mp.Process pickling, but now lives in ros_robot_client.py next to the client (no separate ros_robot_launcher.py)

Then the wire migrated: synchronous control (connect, disconnect, emergency_stop) moved onto MpBus.request/serve — correlation IDs, typed errors (MpRemoteError), no more "drain the queue until you see a connect_response" loops. Streaming (state updates, observation metadata) stays on the existing raw mp.Queue pair; moving it onto a PubSub channel is a clean follow-up.
```python
# Server side (inside the child mp.Process):
self.req_res.serve("connect", self._serve_connect)
self.req_res.serve("disconnect", self._serve_disconnect)
self.req_res.serve("emergency_stop", self._serve_emergency_stop)

# Client side (atlas main):
response_bytes = self.req_res.request("connect", b"", timeout=30.0)
response = json.loads(response_bytes)
if not response["success"]:
    ...
```
The 95-line UDPReceiver class inside quest_vr.py is gone. In its place: a UdpBus.bind(...) call and a subscriber callback. The "what does this datagram mean" logic became a free function so it's testable in isolation.
```python
# In QuestVRRobot.connect():
self._bus = UdpBus.bind(
    endpoints={"quest.pose": ("sub", self.udp_port)},
    host=self.udp_host,
)
self._bus.subscribe("quest.pose", self._on_pose_datagram)

# Module-level free function:
def _parse_quest_pose_datagram(data: bytes) -> DualControllerData | None:
    # decode CSV, build ControllerPose for left+right, return DualControllerData
    ...
```
Nothing on the Quest side changed. The wire (UDP packets, POSE,<26 floats>\n at the same port) is unchanged. The Meta Quest app doesn't know — and doesn't need to know — that we restructured our receive side.
The servo7/robot/ folder had grown into a flat list of unrelated concerns (drivers, mocks, transport wrappers, sims). Sorted by what kind of robot:
```
servo7/robot/
├── factory.py               the lookup
├── robot_types.py           the data shapes (RobotState, RobotType, ...)
├── __init__.py              re-exports Robot, RobotState, ...
│
├── hardware/                concrete hardware drivers + ABC
│   ├── base.py              (Robot ABC)
│   ├── base_ros.py
│   ├── piper.py / dual_piper.py
│   ├── so100.py / so100_follower.py / dual_so100.py
│   ├── r1.py / r1t.py
│   └── quest_vr.py
│
├── mock/                    fake robots — for tests, leaders, signal gen
│   ├── mock_6dof.py         (was dummy.py)
│   ├── mock_r1.py           (was mock_ros_robot.py)
│   └── mock_6dof_signal.py  (was signal_robot.py)
│
└── remote/                  transport-wrapped Robots + servers
    ├── robot_server.py      (RobotServer — bus-agnostic)
    ├── robot_client.py      (RemoteRobotClient + 5 over_* classmethods)
    ├── ros_robot_client.py  (ROSRobotClient + launch_ros_robot)
    └── ros_robot_server.py  (ROSRobotServer)

servo7/digital_twin/         sims / kernels — twins of real robots
├── pid_6dof.py              (DigitalTwinPID)
├── r1_mujoco.py             (R1LiteSimRobot — was r1lite_sim.py)
└── mujoco/r1/               MuJoCo env files (was servo7/sim/)
```
From sim-design-discussion: the Fairino digital twin is a Docker container running their FRControl image, speaking Fairino's TCP protocol natively. In the new architecture this is straightforward:
```
        ┌────────────────────────┐
        │  hardware Fairino arm  │
        └───────────┬────────────┘
                    │
   FairinoRobot(host="192.168.x.x", port=20003)
                    │
          set_digital_twin(twin)
                    │
         ┌──────────┴──────────┐
         │                     │
         ▼                     ▼
  FairinoRobot          (any other Robot impl —
  (host="localhost",     e.g. RemoteRobotClient.over_zmq_local_server
   port=20004)           for a PID twin you're sanity-checking against)
         │
         ▼  (Fairino TCP — their SDK)
  docker run fairino/sim:tag
```
The same wire format as the real arm — different host/port. No RobotServer + RemoteRobotClient needed in front of the docker because Fairino's SDK is already a remote service. A future FairinoRobot(Robot) subclass that speaks their TCP, plus a launch_fairino_docker(port) helper, is all the Fairino path needs.
Each bus has its own contract tests (subclass-of-PubSub, pub/sub round-trip, error surfaces, lifecycle). The server/client pair has integration tests over each transport. The digital twin chain has both in-process and subprocess end-to-end tests.
| Test file | # tests | What it pins |
|---|---|---|
| tests/test_in_process_bus.py | 12 | InProcessBus contract (pre-existing) |
| tests/test_zmq_bus.py | 14 | ZmqBus: bind/connect, pub/sub, errors, lifecycle |
| tests/test_mp_bus.py | 12 | MpBus: pub/sub + req/serve; cross-process round-trip via real mp.Process |
| tests/test_udp_bus.py | 9 | UdpBus: datagram receive, get_latest, error surfaces |
| tests/test_robot_server.py | 8 | RobotServer + RemoteRobotClient over ZMQ |
| tests/test_robot_server_inproc.py | 4 | Same pair over InProcessBus |
| tests/test_remote_robot_client_constructors.py | 2 | over_in_process + over_mp_subprocess |
| tests/test_digital_twin_pid.py | 10 | PID kernel: shape, lifecycle, convergence, safety |
| tests/test_digital_twin_e2e.py | 2 | over_zmq_local_server: spawn → connect → converge → shutdown |
| tests/test_robot_digital_twin_routing.py | 7 | Robot.digital_twin slot, flag toggle, action retagging |
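Because the four buses share one ABC, the contract half of those suites can be written once against a bus factory and run against every implementation. A sketch of that shape with a stand-in bus (the real suites parameterize over InProcessBus, ZmqBus, MpBus, and UdpBus factories and add transport-specific cases on top):

```python
class LoopbackBus:
    """Stand-in PubSub so the contract check below is runnable here."""
    def __init__(self):
        self._subs, self._latest = {}, {}
    def publish(self, topic, payload):
        self._latest[topic] = payload
        for cb in self._subs.get(topic, []):
            cb(payload)
    def subscribe(self, topic, callback):
        self._subs.setdefault(topic, []).append(callback)
    def get_latest(self, topic):
        return self._latest.get(topic)
    def close(self):
        pass

def check_pubsub_contract(make_bus):
    """Minimal PubSub contract: round-trip delivery, latest-wins cache,
    None for unseen topics, clean close."""
    bus = make_bus()
    got = []
    bus.subscribe("state", got.append)
    bus.publish("state", b"\x01")
    bus.publish("state", b"\x02")
    assert got == [b"\x01", b"\x02"]
    assert bus.get_latest("state") == b"\x02"
    assert bus.get_latest("absent") is None
    bus.close()
```

Keeping the shared assertions in one place means a fifth transport gets its contract suite for free: write the factory, add it to the parameter list.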
Net: uv run pytest -m "not skip_ci and not skip_long and not hardware" → 439 passing, 48 skipped (the usual integration/credential gates), 71 deselected. Baseline before this branch was 381.
State/observation updates still ride the legacy raw mp.Queue tuple protocol. The natural follow-up: PubSub topics on the same MpBus that already carries the control req/res. Then ros_robot_client + ros_robot_server stop having any direct queue handling at all.
servo7/launcher/ still owns the ssh + devcontainer remote-spawn path for the frontend's "pick a host" flow. RemoteRobotClient.over_remote_subprocess(host, robot_type) would give scripted/headless callers the same convenience that over_zmq_local_server has today — and let the LauncherService delegate the actual spawning instead of duplicating it.
The PubSub ABC types payload as bytes because the cross-process buses need it. In-process callers serialize for the contract even though the bus itself just passes references. A Channel[T] generic — or just relaxing the type on InProcessBus — would let hot paths skip the JSON round-trip entirely.
Files were renamed (mock_6dof.py, pid_6dof.py, …) but class names weren't (still DummyRobot, DigitalTwinPID, R1LiteSimRobot). One sweeping rename pass when the team's ready; everything still works either way.