Atlas · Servo7

Atlas Bus & Digital Twin — implementation report

What started as "a PID twin in a subprocess" turned into a transport-bus seam underneath every cross-process Robot in atlas. The bus, four implementations, the digital-twin lifecycle, the ROS rewrite, the Quest VR migration, and a robot-package reorg — all from one branch.

Draft · 2026-05-14 · Companion to sim-design-discussion · Tests: 381 → 439 passing (+58)

0. TL;DR

A single PubSub ABC under servo7/bus/bus.py with four implementations — InProcessBus, ZmqBus, MpBus, UdpBus — and the same RobotServer + RemoteRobotClient pair rides on any of them. RemoteRobotClient grew five over_* classmethods that own the full spawn-and-connect lifecycle for each transport. PID digital twin is now RemoteRobotClient.over_zmq_local_server("pid", …) in one line. ROS robots moved their synchronous control onto MpBus. Quest VR now sits on UdpBus. The servo7/robot/ package was sorted by kind: hardware/, mock/, remote/.

1. The bus contract

One ABC, four methods. The lowest-common-denominator interface every transport implements:

# servo7/bus/bus.py

class PubSub(ABC):
    @abstractmethod
    def publish(self, topic: str, payload: bytes) -> None: ...
    @abstractmethod
    def subscribe(self, topic: str, callback: Callable[[bytes], None]) -> None: ...
    @abstractmethod
    def get_latest(self, topic: str) -> bytes | None: ...
    @abstractmethod
    def close(self) -> None: ...

That's it. Bytes in, bytes out, fire-and-forget, latest-wins cache, explicit lifecycle. InProcessBus doesn't actually need bytes (it stores references), but the ABC forces the contract because the other three transports do need them on the wire.

2. The four implementations

| Bus | Wire | Use when | Cost |
|---|---|---|---|
| InProcessBus | Python callbacks, no I/O | Server and client live in the same Python process; tests; quick wiring | ~function-call |
| ZmqBus | TCP, ZMQ pub/sub | Cross-host or cross-process; production transport for hardware servers | ~50–100 µs loopback |
| MpBus | mp.Queue pair, pickle | Parent ↔ child mp.Process; ROS isolation; carries pub/sub and synchronous req/serve on one wire | ~10–30 µs same-host |
| UdpBus | UDP datagrams | External producers (Quest VR controllers, foot pedal); lossy, latest-wins streams | ~100 µs loopback, lossy |

MpBus is the odd one out — it implements PubSub (for streaming) and adds request()/serve() methods for synchronous request/response on the same queue pair. Three frame tags share the wire: ("pub", topic, payload), ("req", correlation_id, type, payload), ("res", correlation_id, payload, error). The receiver thread dispatches by tag. request() blocks on a per-correlation-id threading.Event; concurrent requests don't get their replies mixed up.

                ┌────────────────────┐
                │   PubSub  (ABC)    │
                └─────────┬──────────┘
            ┌─────────────┼─────────────┬──────────────────┐
       InProcessBus   ZmqBus         MpBus              UdpBus
       (callbacks)    (TCP)        (mp.Queue)         (datagrams)
                                  +request/serve
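The correlation-id machinery can be sketched in one class. This is a toy single-process model: queue.Queue stands in for MpBus's mp.Queue pair, and a plain RuntimeError stands in for the typed MpRemoteError; the frame tags mirror the ones above.

```python
import itertools
import queue
import threading
from typing import Callable


class ReqRes:
    """Toy req/res endpoint: ("req", cid, type, payload) / ("res", cid, payload, error)."""

    def __init__(self, tx: queue.Queue, rx: queue.Queue) -> None:
        self._tx, self._rx = tx, rx
        self._next_cid = itertools.count()
        self._pending: dict[int, tuple[threading.Event, list]] = {}
        self._handlers: dict[str, Callable[[bytes], bytes]] = {}
        threading.Thread(target=self._recv_loop, daemon=True).start()

    def serve(self, req_type: str, handler: Callable[[bytes], bytes]) -> None:
        self._handlers[req_type] = handler

    def request(self, req_type: str, payload: bytes, timeout: float = 5.0) -> bytes:
        cid = next(self._next_cid)
        ev, slot = threading.Event(), []
        self._pending[cid] = (ev, slot)          # per-correlation-id Event
        self._tx.put(("req", cid, req_type, payload))
        if not ev.wait(timeout):
            self._pending.pop(cid, None)
            raise TimeoutError(req_type)
        body, error = slot
        if error is not None:
            raise RuntimeError(error)            # stand-in for MpRemoteError
        return body

    def _recv_loop(self) -> None:
        while True:
            frame = self._rx.get()
            if frame[0] == "req":                # dispatch to a serve() handler
                _, cid, req_type, payload = frame
                try:
                    self._tx.put(("res", cid, self._handlers[req_type](payload), None))
                except Exception as exc:
                    self._tx.put(("res", cid, b"", str(exc)))
            elif frame[0] == "res":              # wake the matching request()
                _, cid, body, error = frame
                pending = self._pending.pop(cid, None)
                if pending is not None:
                    ev, slot = pending
                    slot[:] = [body, error]
                    ev.set()
```

Wiring two endpoints with the queues crossed gives a working round trip, and because each in-flight request owns its own Event keyed by correlation id, concurrent requests cannot steal each other's replies.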

3. RobotServer + RemoteRobotClient — bus-agnostic

Both classes take a PubSub at construction time. No transport baked in. The two roles still exist — server publishes state at 100 Hz and subscribes to commands; client subscribes to state and publishes commands — but the wire is just whatever bus you handed them.

class RobotServer:
    def __init__(self, robot: Robot, bus: PubSub, _owns_bus: bool = False):
        self.robot = robot
        self.bus = bus
        self.bus.subscribe(TOPIC_COMMAND, self._on_command_bytes)

class RemoteRobotClient(Robot):
    def __init__(self, config, id, bus: PubSub, _owns_bus: bool = False):
        super().__init__(config, id)
        self.bus = bus
        self.bus.subscribe(TOPIC_STATE, self._on_state_bytes)

The five over_* classmethods

To save callers from wiring up the bus manually, RemoteRobotClient exposes a classmethod per common scenario. Each one owns the subprocess/server lifecycle — disconnect() tears down whatever the constructor brought up.

| Classmethod | What happens on construction | What disconnect() does |
|---|---|---|
| over_zmq(host, ports) | Builds a ZmqBus pointing at an already-running server somewhere on the network | Closes the bus |
| over_zmq_local_server(robot_type, ports) | Spawns python -m servo7.robot.remote.robot_server --robot_type X …, waits for READY <nonce>, connects via ZmqBus | Closes the bus + terminates the subprocess |
| over_in_process(robot) | Builds an InProcessBus, constructs a RobotServer around the given Robot, starts it | Stops the server, closes the bus |
| over_mp_subprocess(robot_type) | Spawns an mp.Process running a RobotServer over MpBus; child constructs the Robot via RobotFactory | Closes the bus, terminates the child, joins it |
| over_zmq (server side) | Same idea on RobotServer: build a ZmqBus bound to the given ports | |
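The spawn-and-wait half of over_zmq_local_server's lifecycle can be sketched in a few lines. Only the "block until the child prints READY <nonce>" idea comes from the report; the toy child script, the argv-passed nonce, and the helper names here are assumptions.

```python
import secrets
import subprocess
import sys


def wait_for_ready(proc: subprocess.Popen, nonce: str, max_lines: int = 100) -> None:
    """Block until the child echoes the nonce, proving the server is up."""
    assert proc.stdout is not None
    for _ in range(max_lines):
        line = proc.stdout.readline()
        if not line:
            raise RuntimeError("child exited before signalling READY")
        if line.strip() == f"READY {nonce}":
            return                      # safe to connect now
    raise RuntimeError("too much output before READY")


# Toy child that stands in for `python -m servo7.robot.remote.robot_server`:
# it just echoes the nonce it was handed and exits.
CHILD = "import sys; print('READY', sys.argv[1], flush=True)"


def spawn_toy_server() -> tuple[subprocess.Popen, str]:
    nonce = secrets.token_hex(8)
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD, nonce],
        stdout=subprocess.PIPE, text=True,
    )
    return proc, nonce
```

The per-spawn nonce matters: the parent cannot mistake a stale READY line from a previous run (or unrelated stdout noise) for this child's handshake.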

4. Digital twin — a Robot you spawn-and-connect in one call

The original problem: run a PID-based digital twin in a separate process to keep numpy / future MuJoCo out of atlas's interpreter. The collapse: a digital twin is just another Robot served by the same RobotServer we already have.

The kernel

servo7/digital_twin/pid_6dof.py::DigitalTwinPID(Robot). P/D integrator with its own 100 Hz step thread. RobotType.PID in the enum; RobotFactory constructs it; config/robots/pid.yaml describes it. Runs inside the subprocess.
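The kernel's P/D step can be sketched like this. Gains, dt, and the plain-list state are illustrative; the real DigitalTwinPID has its own 100 Hz step thread, safety checks, and config-driven parameters.

```python
class Pd6Dof:
    """Illustrative 6-DoF P/D integrator: P on position error, D as velocity damping."""

    def __init__(self, kp: float = 20.0, kd: float = 4.0, dt: float = 0.01):
        self.kp, self.kd, self.dt = kp, kd, dt   # dt = 0.01 s -> 100 Hz
        self.pos = [0.0] * 6
        self.vel = [0.0] * 6
        self.target = [0.0] * 6

    def step(self) -> None:
        for i in range(6):
            err = self.target[i] - self.pos[i]
            acc = self.kp * err - self.kd * self.vel[i]
            self.vel[i] += acc * self.dt         # Euler integration
            self.pos[i] += self.vel[i] * self.dt
```

Set a target, step repeatedly, and the state converges: that is the whole behavior the end-to-end tests pin from the client side of the wire.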

The handle

From atlas main: RemoteRobotClient.over_zmq_local_server(config, id, robot_type="pid", …). Spawns the subprocess holding the kernel, waits for READY, connects, returns a fully-wired Robot. One call.

# atlas main process
client = RemoteRobotClient.over_zmq_local_server(
    config=pid_config, id="twin", robot_type="pid",
    state_port=5557, command_port=5558,
)
client.connect()                       # waits for first state on the wire
client.set_state(target_action)        # P/D integrator converges
state = client.get_state()             # cached, no wire round-trip
client.disconnect()                    # also terminates the subprocess

Routing slot on the hardware robot

The Robot base class grew a digital_twin: Robot | None slot plus opt-in helpers (_route_set_state, _route_get_state) that subclasses call at the top of their own set_state / get_state. The hardware Robot's enable_digital_twin(on) flag is what flips the routing.

class DummyRobot(Robot):
    def set_state(self, action):
        super().set_state(action)
        routed = self._route_set_state(action)
        if routed is not None:
            return routed                 # → hits the twin
        # ... hardware path

The set-state path uses dataclasses.replace to retag the action with the twin's robot_type so the twin's own type assertion passes. Subclasses opt in by adding two lines; nothing forces every Robot to migrate.
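A runnable sketch of the slot-plus-retag pattern, with illustrative dataclass shapes and a stand-in for _route_set_state (the real action fields and helper signatures differ):

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Action:
    """Illustrative action shape; the real dataclass has more fields."""
    robot_type: str
    joints: tuple


class TwinRouter:
    """Sketch of the digital_twin slot + opt-in routing on the Robot base."""

    def __init__(self, robot_type: str):
        self.robot_type = robot_type
        self.digital_twin = None          # the Robot | None slot
        self._twin_enabled = False

    def enable_digital_twin(self, on: bool) -> None:
        self._twin_enabled = on

    def _route_set_state(self, action: Action):
        if not (self._twin_enabled and self.digital_twin):
            return None                   # caller falls through to hardware
        # Retag via dataclasses.replace so the twin's type assertion passes.
        retagged = replace(action, robot_type=self.digital_twin.robot_type)
        return self.digital_twin.set_state(retagged)
```

Returning None when routing is off is what lets subclasses opt in with two lines: call the helper, and only continue to the hardware path when it declined.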

5. ROS migration — pub/sub + req/serve on one wire

The existing ROS pair (ROSRobotProxy in atlas main, plus a hand-rolled tuple protocol over two mp.Queues to a child process) was renamed and restructured into ROSRobotClient and ROSRobotServer under servo7/robot/remote/ (the layout in section 7).

Then the wire migrated: synchronous control (connect, disconnect, emergency_stop) moved onto MpBus.request/serve — correlation IDs, typed errors (MpRemoteError), no more "drain the queue until you see a connect_response" loops. Streaming (state updates, observation metadata) stays on the existing raw mp.Queue pair; moving it onto a PubSub channel is a clean follow-up.

# Server side (inside child mp.Process):
self.req_res.serve("connect", self._serve_connect)
self.req_res.serve("disconnect", self._serve_disconnect)
self.req_res.serve("emergency_stop", self._serve_emergency_stop)

# Client side (atlas main):
response_bytes = self.req_res.request("connect", b"", timeout=30.0)
response = json.loads(response_bytes)
if not response["success"]:
    ...

6. Quest VR — UdpBus, parsing as a free function

The 95-line UDPReceiver class inside quest_vr.py is gone. In its place: a UdpBus.bind(...) call and a subscriber callback. The "what does this datagram mean" logic became a free function so it's testable in isolation.

# In QuestVRRobot.connect():
self._bus = UdpBus.bind(
    endpoints={"quest.pose": ("sub", self.udp_port)},
    host=self.udp_host,
)
self._bus.subscribe("quest.pose", self._on_pose_datagram)

# Module-level free function:
def _parse_quest_pose_datagram(data: bytes) -> DualControllerData | None:
    # decode CSV, build ControllerPose for left+right, return DualControllerData

Nothing on the Quest side changed: the wire (UDP packets, POSE,<26 floats>\n to the same port) is identical. The Meta Quest app doesn't know — and doesn't need to know — that we restructured our receive side.

7. Robot package layout

The servo7/robot/ folder had grown into a flat list of unrelated concerns (drivers, mocks, transport wrappers, sims). Sorted by what kind of robot:

servo7/robot/
├── factory.py              the lookup
├── robot_types.py          the data shapes (RobotState, RobotType, ...)
├── __init__.py             re-exports Robot, RobotState, ...
│
├── hardware/               concrete hardware drivers + ABC
│   ├── base.py             (Robot ABC)
│   ├── base_ros.py
│   ├── piper.py / dual_piper.py
│   ├── so100.py / so100_follower.py / dual_so100.py
│   ├── r1.py / r1t.py
│   └── quest_vr.py
│
├── mock/                   fake robots — for tests, leaders, signal gen
│   ├── mock_6dof.py        (was dummy.py)
│   ├── mock_r1.py          (was mock_ros_robot.py)
│   └── mock_6dof_signal.py (was signal_robot.py)
│
└── remote/                 transport-wrapped Robots + servers
    ├── robot_server.py     (RobotServer — bus-agnostic)
    ├── robot_client.py     (RemoteRobotClient + 5 over_* classmethods)
    ├── ros_robot_client.py (ROSRobotClient + launch_ros_robot)
    └── ros_robot_server.py (ROSRobotServer)

servo7/digital_twin/        sims / kernels — twins of real robots
├── pid_6dof.py             (DigitalTwinPID)
├── r1_mujoco.py            (R1LiteSimRobot — was r1lite_sim.py)
└── mujoco/r1/              MuJoCo env files (was servo7/sim/)

8. Where Fairino fits

From sim-design-discussion: the Fairino digital twin is a Docker container running their FRControl image, speaking Fairino's TCP protocol natively. In the new architecture this is straightforward:

                    ┌────────────────────────┐
                    │  hardware Fairino arm  │
                    └────────────┬───────────┘
                                 │
                  FairinoRobot(host="192.168.x.x", port=20003)
                                 │
                       set_digital_twin(twin)
                                 │
                       ┌─────────┴─────────┐
                       │                   │
                       ▼                   ▼
            FairinoRobot               (any other Robot impl —
            (host="localhost",          e.g. RemoteRobotClient.over_zmq_local_server
             port=20004)                 for a PID twin you're sanity-checking against)
                       │
                       ▼  (Fairino TCP — their SDK)
              docker run fairino/sim:tag

The same wire format as the real arm — different host/port. No RobotServer + RemoteRobotClient needed in front of the docker because Fairino's SDK is already a remote service. A future FairinoRobot(Robot) subclass that speaks their TCP, plus a launch_fairino_docker(port) helper, is all the Fairino path needs.

9. Test coverage

Each bus has its own contract tests (subclass-of-PubSub, pub/sub round-trip, error surfaces, lifecycle). The server/client pair has integration tests over each transport. The digital twin chain has both in-process and subprocess end-to-end tests.

| Test file | # tests | What it pins |
|---|---|---|
| tests/test_in_process_bus.py | 12 | InProcessBus contract (pre-existing) |
| tests/test_zmq_bus.py | 14 | ZmqBus: bind/connect, pub/sub, errors, lifecycle |
| tests/test_mp_bus.py | 12 | MpBus: pub/sub + req/serve; cross-process round-trip via real mp.Process |
| tests/test_udp_bus.py | 9 | UdpBus: datagram receive, get_latest, error surfaces |
| tests/test_robot_server.py | 8 | RobotServer + RemoteRobotClient over ZMQ |
| tests/test_robot_server_inproc.py | 4 | Same pair over InProcessBus |
| tests/test_remote_robot_client_constructors.py | 2 | over_in_process + over_mp_subprocess |
| tests/test_digital_twin_pid.py | 10 | PID kernel: shape, lifecycle, convergence, safety |
| tests/test_digital_twin_e2e.py | 2 | over_zmq_local_server: spawn → connect → converge → shutdown |
| tests/test_robot_digital_twin_routing.py | 7 | Robot.digital_twin slot, flag toggle, action retagging |

Net: uv run pytest -m "not skip_ci and not skip_long and not hardware" → 439 passing, 48 skipped (the usual integration/credential gates), 71 deselected. Baseline before this branch was 381.

10. What's still on the to-do list

ROS streaming → PubSub

State/observation updates still ride the legacy raw mp.Queue tuple protocol. The natural follow-up: PubSub topics on the same MpBus that already carries the control req/res. Then ros_robot_client + ros_robot_server stop having any direct queue handling at all.

Frontend launcher integration

servo7/launcher/ still owns the ssh + devcontainer remote-spawn path for the frontend's "pick a host" flow. RemoteRobotClient.over_remote_subprocess(host, robot_type) would give scripted/headless callers the same convenience that over_zmq_local_server has today — and let the LauncherService delegate the actual spawning instead of duplicating it.

InProcessBus is still bytes-typed

The PubSub ABC types payload as bytes because the cross-process buses need it. In-process callers serialize for the contract even though the bus itself just passes references. A Channel[T] generic — or just relaxing the type on InProcessBus — would let hot paths skip the JSON round-trip entirely.

Class renames left for a follow-up

Files renamed (mock_6dof.py, pid_6dof.py, …) but class names didn't (still DummyRobot, DigitalTwinPID, R1LiteSimRobot). One sweeping rename pass when the team's ready; everything still works either way.