Atlas · Servo7 · branch JvL/digital_twin_integration

Digital twin integration — interactive flow

Commands travel along whichever path the current control mode selects. The twin's record queue fills while recording in any non-replay mode; REPLAY drains it back through the bus in FIFO order.

Update 2026-05-16 (later): resurveyed after 9 commits. The soft snap-back was reverted to an immediate snap_twin_to_self() on DT disable. Robot.commanded_action is now a single-slot store (last action), populated inside set_state. STATE_UPDATE now carries four joint vectors per follower (joint_positions, hw_joint_positions, dt_joint_positions, commanded_joint_positions). The frontend renders a new AtlasJointDebugStrip that diffs the state, commanded, and twin vectors per joint.

The flow, stage by stage:

Leaders (action sources)
· TELEOP — Quest / Dummy leader, event-driven (callback)
· AI — AINode.predict_sync(), polled at 30 Hz
· ROBOT_CONTROLLER — controller.get_action(), polled at 30 Hz
· REPLAY (synthetic) — twin.get_action() drains the record queue

Orchestrator — orchestrator_node.py
· holds the control mode; command_dispatch_worker (30 Hz) pulls AI, CONTROLLER, and REPLAY actions; TELEOP arrives event-driven via callback
· all paths converge on smoothing + blending (EMA smoother · ModeBlender), then _publish_command() — the single seam onto the bus topic action.{id}

PubSub bus
· topic "action.{follower_id}", in-process or ZMQ (RemoteRobotClient)

RobotNode (follower)
· robot.set_state(action) — captures Robot.commanded_action, then routes

DT gate (inside set_state)
· if use_digital_twin → twin.set_state(); the hardware stays frozen
· else → _set_state_impl() drives the real arm, and the action is mirrored to the twin

DigitalTwin (DigitalTwinPID) — digital_twin/base.py · pid_6dof.py
· twin.set_state(action); _record(action) if recording and not replay-active
· _set_state_impl() runs a 100 Hz P/D step · _record_queue : deque[RobotState] · replay drains via twin.get_action()

SystemState · STATE_UPDATE @ 5 Hz
· annotate_state() now emits four vectors: joint_positions (route-aware: twin if DT on), hw_joint_positions (real hardware, always), dt_joint_positions (twin pose, always), commanded_joint_positions (last action)
· callbacks fire on update → orchestrator._on_leader_state_change (TELEOP)

Viser viewer — servo7/viz/viewer.py · :8080
· subscribes SystemState["follower"]; the URDF redraws on every state update
· buttons + jog sliders; "Scrub recording" → twin.get_trajectory(); jog → controller.set_joint(i, v)

Frontend (React) — robot-frontend/ · WS :8765
· AtlasOrchestrator panel: SET_CONTROL_MODE · EMERGENCY_STOP · SET_DIGITAL_TWIN
· AtlasJointDebugStrip (new): 3 rows per joint — state · action · dt (amber/cyan on drift)
· iframe → viser viewer :8080 (URDF · jog · recording · scrub)
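To make the four STATE_UPDATE vectors concrete, here is a minimal sketch of the per-follower annotation. The vector names match the list above; the helpers used to read them (hw_pose(), digital_twin, commanded_action, use_digital_twin) follow names used in this doc, but their exact shapes are assumptions, not the real annotate_state() body.

def annotate_state(robot) -> dict:
    # hedged sketch: helper names are assumptions, field names match STATE_UPDATE above
    hw = robot.hw_pose()                                               # real hardware joints, always (assumed helper)
    dt = robot.digital_twin.get_state().joint_positions if robot.digital_twin else None
    cmd = robot.commanded_action.joint_positions if robot.commanded_action else None
    return {
        "joint_positions": dt if robot.use_digital_twin else hw,      # route-aware: twin pose while DT routing is on
        "hw_joint_positions": hw,
        "dt_joint_positions": dt,
        "commanded_joint_positions": cmd,                             # last action captured by set_state()
    }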


How to read this

Where state actually lives (the part that doesn't lie)

Store | Owner | Who writes · who reads
SystemState.states | SystemState | written by every robot's state publish · read by the orchestrator (TELEOP callback), the viewer (URDF), and the STATE_UPDATE broadcast
RobotController._queue | RobotController | pushed by viser jog · drained by the orchestrator at 30 Hz in CONTROLLER mode
DigitalTwin._record_queue | DigitalTwin | FIFO · written by _record() when recording and not replay-active · drained by REPLAY via popleft() · snapshotted by the viewer scrub
Robot.commanded_action | each Robot | NEW · single-slot store (last action) · populated at the top of set_state() before routing · surfaced as commanded_joint_positions in STATE_UPDATE for the joint-debug strip
orchestrator.control_mode | Orchestrator | set by the frontend WS (SET_CONTROL_MODE) · broadcast as MODE_CHANGED
Robot._use_digital_twin | each Robot | toggled by enable_digital_twin() · independent of control_mode · disable does an immediate snap_twin_to_self() (no settle window)
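The last row's disable behaviour, sketched. enable_digital_twin and snap_twin_to_self are the names from the table; the body below is an assumption about how the described behaviour could be written, not the actual base.py code.

def enable_digital_twin(self, on: bool) -> None:
    # routing flag (independent of control_mode)
    self._use_digital_twin = on
    if not on and self.digital_twin is not None:
        # immediate snap, no settle window: the twin jumps to the real arm's
        # current pose so the next mirrored write starts from reality
        self.snap_twin_to_self()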

The one insight that makes the rest make sense

A DigitalTwin is a Robot, and it lives inside its parent Robot.

It inherits from the same Robot ABC as PiperRobot or SO100Robot. Same set_state(action), same get_state(), same RobotState dataclass on the wire. That's why the orchestrator, the system state, and the viewer don't have to care whether they're talking to silicon or to a P/D integrator — they just talk to "a Robot".

The twin isn't a sibling of the real robot — it's a field on the real robot. piper.digital_twin holds an instance of another Robot subclass. The "DT toggle" doesn't switch between two different APIs — it switches which Robot instance the parent's set_state call routes to: the parent's own hardware path, or the twin sitting in self.digital_twin.
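Concretely, attaching and routing might look like the sketch below. DummyRobot and DigitalTwinPID are real classes from the hierarchy in the next section, and the import paths follow the folder layout later in this doc, but the constructor arguments are elided; treat this as an illustration of the composition rather than working boot code.

from servo7.robot.mock.mock_6dof import DummyRobot       # import paths assumed from the folder layout below
from servo7.digital_twin.pid_6dof import DigitalTwinPID

parent = DummyRobot(...)                                  # any concrete Robot (constructor args elided)
parent.set_digital_twin(DigitalTwinPID(...))              # the twin becomes a field: parent.digital_twin

parent.enable_digital_twin(False)  # set_state() drives the parent's own hardware path, mirrored to the twin
parent.enable_digital_twin(True)   # set_state() routes only to the Robot sitting in parent.digital_twin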

Class hierarchy

«abstract base» Robot — servo7/robot/hardware/base.py
· @abstractmethod connect() → bool · @abstractmethod disconnect() → bool
· set_state(action) → bool (subclasses override _set_state_impl)
· get_state() → RobotState (subclasses override _get_state_impl)
· set_digital_twin(twin: "Robot | None") · enable_digital_twin(on: bool)

Hardware (real motors) — servo7/robot/hardware/ + mock/
· PiperRobot — hardware/piper.py
· DualPiperRobot — hardware/dual_piper.py
· SO100Robot · DualSO100Robot — hardware/so100*.py
· SO100FollowerRobot — hardware/so100_follower.py
· QuestVRRobot — hardware/quest_vr.py (used as leader)
· RosRobot (base) — hardware/base_ros.py
  · R1RosRobot — hardware/r1.py
  · R1TRosRobot — hardware/r1t.py
· DummyRobot — mock/mock_6dof.py
· SignalRobot (extends DummyRobot) — mock/mock_6dof_signal.py

Digital twin (sim / record) — servo7/digital_twin/
· «abstract» DigitalTwin — digital_twin/base.py
  · start_recording() / stop_recording() · get_action() → RobotState (popleft)
  · begin_replay() / end_replay() · get_trajectory() — non-destructive
  · _record_queue : deque[RobotState] · _recording · _replay_active (gate)
· DigitalTwinPID — digital_twin/pid_6dof.py · P/D integrator · 100 Hz step thread
· R1LiteSimRobot — digital_twin/r1_mujoco.py · MuJoCo backend (uses digital_twin/mujoco/r1/*)

Remote (network proxy) — servo7/robot/remote/
· RemoteRobotClient — remote/robot_client.py · ZMQ / MP / InProcess · talks to a remote RobotServer
· ROSRobotClient — remote/ros_robot_client.py · talks to a remote ROSRobotServer
· companion servers in remote/: RobotServer · ROSRobotServer

COMPOSITION — the twin lives INSIDE the parent. PiperRobot (real) holds self.digital_twin = … — a DigitalTwinPID (also a Robot) living as a field inside the parent; the inner type can be any DigitalTwin subclass.

FACTORY — RobotFactory.create(config) in servo7/robot/factory.py reads the RobotType enum and instantiates the right class (PIPER · DUAL_PIPER · SO100 · DUAL_SO100 · R1 · R1T · R1_LITE_SIM · QUEST_VR · DUMMY · SIGNAL · PID), then attaches a DigitalTwin via set_digital_twin() when configured.

RUNTIME OWNER — RobotNode in servo7/nodes/robot_node.py owns one Robot per follower, subscribes "action.{robot_id}" on the bus, publishes state changes to SystemState, and is orchestrated by OrchestratorNode (the mode state machine).

Inheritance runs from the Robot ABC down to every concrete class above. Composition is the separate relationship: any concrete Robot can hold another Robot as a field via self.digital_twin — the twin doesn't sit next to the parent, it sits inside it.
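For orientation, the template-method shape implied by the listing above, as a sketch; the real base.py adds the DT routing, mirroring, and commanded_action capture described elsewhere in this doc, which are stubbed out here.

from abc import ABC, abstractmethod

class Robot(ABC):
    # sketch of the contract, not the real servo7/robot/hardware/base.py
    digital_twin: "Robot | None" = None

    @abstractmethod
    def connect(self) -> bool: ...

    @abstractmethod
    def disconnect(self) -> bool: ...

    def set_state(self, action) -> bool:
        # the real body also captures commanded_action and routes through the DT gate
        return self._set_state_impl(action)

    def get_state(self):
        return self._get_state_impl()

    def set_digital_twin(self, twin: "Robot | None") -> None:
        self.digital_twin = twin

    # concrete subclasses override these two instead of set_state / get_state
    def _set_state_impl(self, action) -> bool:
        raise NotImplementedError

    def _get_state_impl(self):
        raise NotImplementedError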

Folder layout

Only the parts relevant to the robot/digital-twin story. The // notes annotate each entry; the key files defining the contract and the pieces new on this branch are called out inline.

servo7/
├── robot/
│   ├── factory.py              // RobotFactory.create(config) — picks the right class from RobotType
│   ├── robot_types.py          // RobotType enum · RobotState · RobotObservation · RobotStatus
│   │
│   ├── hardware/               // real motors / cameras / IK
│   │   ├── base.py             // Robot ABC · DT routing · snap_twin_to_self · commanded_action
│   │   ├── base_ros.py         // shared RosRobot base
│   │   ├── piper.py            // PiperRobot
│   │   ├── dual_piper.py       // DualPiperRobot
│   │   ├── so100.py            // SO100Robot
│   │   ├── dual_so100.py       // DualSO100Robot
│   │   ├── so100_follower.py   // SO100FollowerRobot
│   │   ├── quest_vr.py         // QuestVRRobot — used as leader (input)
│   │   ├── r1.py               // R1RosRobot
│   │   └── r1t.py              // R1TRosRobot — teleop variant
│   │
│   ├── mock/                   // deterministic stand-ins for tests + jogging
│   │   ├── mock_6dof.py        // DummyRobot — the generic 6-DOF mock
│   │   ├── mock_6dof_signal.py // SignalRobot — DummyRobot with deterministic signal trace
│   │   └── mock_r1.py          // MockRobotNode for R1 integration tests
│   │
│   └── remote/                 // network proxies — used when twin/HW run in different processes
│       ├── robot_client.py     // RemoteRobotClient (ZMQ) — looks like a Robot, forwards to RobotServer
│       ├── robot_server.py     // RobotServer — wraps a local Robot, exposes it over ZMQ
│       ├── ros_robot_client.py
│       └── ros_robot_server.py
│
├── digital_twin/               // NEW package on this branch — Robot subclasses that simulate
│   ├── base.py                 // DigitalTwin ABC — adds _record_queue, replay gate, snap()
│   ├── pid_6dof.py             // DigitalTwinPID — P/D 6-DOF integrator at 100Hz
│   ├── r1_mujoco.py            // R1LiteSimRobot — MuJoCo-backed twin
│   └── mujoco/                 // embedded sim env (URDF, scene, scripted policies)
│       └── r1/ …
│
└── nodes/
    ├── orchestrator_node.py    // mode state machine · command dispatch worker · broadcast
    ├── robot_node.py           // owns one Robot per follower · subscribes "action.{id}"
    ├── frontend_node.py        // WS :8765 server · SET_CONTROL_MODE / STATE_UPDATE handlers
    ├── system_state_node.py    // SystemState container + callback registry
    ├── ai_node.py              // AI policy leader (predict_sync)
    ├── recorder_node.py        // dataset episode recorder
    ├── training_node.py        // LoRA / policy training lifecycle
    └── message_protocol.py     // MessageType enum, ControlMode enum, message dataclasses

What a "robot with a twin" looks like at runtime

The twin literally lives inside the parent robot. It's a field, not a sibling. piper.digital_twin holds a DigitalTwinPID instance — which is itself a Robot. The layout below shows the nesting: the outer block is the real Piper, the inner block is the twin it owns.
RobotNode (one per follower — owns ONE Robot, which is…)
└── PiperRobot (real arm) — inherits Robot
    · self.commanded_action : RobotState | None
    · self._use_digital_twin : bool
    · self.digital_twin : Robot | None ← the field that holds the twin
    └── self.digital_twin = DigitalTwinPID(…) — the twin itself, attached as a field
        DigitalTwinPID (sim) · inherits DigitalTwin < Robot
        · self._record_queue : deque[RobotState] · self._recording · self._replay_active
        · self._target / self._joints (P/D state) · 100 Hz step thread converges _joints → _target

piper.set_state(action) — the actual body:
    self.commanded_action = action
    if self.digital_twin is not None:
        self._set_state_digital_twin(action)   # ← always feeds the inner twin
    if not self.use_digital_twin:
        self._set_state_impl(action)           # ← HW only when DT routing is off

Two independent if statements, not an if/else. The twin always sees the write (so its pose tracks the real arm even when DT routing is off — that's how the joint-debug-strip stays meaningful). The hardware path only runs when the routing flag is off; when it's on, the motors stay still and only the inner twin moves. Same Robot contract on both sides — nothing outside this diagram changes either way.
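The same write path as a commented method sketch. The helper names (_set_state_digital_twin, _set_state_impl) are the ones shown above; return handling and locking in the real base.py may differ.

def set_state(self, action) -> bool:
    # 1. single-slot capture, surfaced later as commanded_joint_positions
    self.commanded_action = action
    # 2. the twin always sees the write (mirror), so its pose tracks commands
    #    even while DT routing is off
    if self.digital_twin is not None:
        self._set_state_digital_twin(action)
    # 3. hardware only moves when DT routing is off
    if not self.use_digital_twin:
        return self._set_state_impl(action)
    return True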

Quick recipe — how a new digital twin gets added

  1. New subclass under servo7/digital_twin/ extending DigitalTwin (recording / replay-gate machinery is inherited).
  2. Override the two @abstractmethods: connect() and disconnect().
  3. Override _set_state_impl(action) and _get_state_impl() for the sim's write/read. (Don't override set_state / get_state directly unless you really need to — the DT-routing logic in the base class only kicks in if you let it.)
  4. Add an enum value to RobotType in robot/robot_types.py.
  5. Wire it up in RobotFactory.create() — map the new type to your class.
  6. Configure it on the hardware robot at boot: real_robot.set_digital_twin(new_twin); flip it on with real_robot.enable_digital_twin(True).
  7. Nothing in the orchestrator, system state, or viewer changes.
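Steps 1 to 5, sketched as a template. GravityDropTwin and GRAVITY_DROP are invented example names, the import paths are assumed from the folder layout above, and the RobotState constructor call is an assumption; treat this as a shape to copy, not tested code.

# servo7/digital_twin/gravity_drop.py — hypothetical example module
from servo7.digital_twin.base import DigitalTwin      # step 1: inherit recording / replay-gate machinery
from servo7.robot.robot_types import RobotState

class GravityDropTwin(DigitalTwin):
    def connect(self) -> bool:                         # step 2: the two @abstractmethods
        self._joints = [0.0] * 6
        return True

    def disconnect(self) -> bool:
        return True

    def _set_state_impl(self, action) -> bool:         # step 3: the sim's write path
        self._joints = list(action.joint_positions)    # assumes RobotState carries joint_positions
        return True

    def _get_state_impl(self) -> RobotState:           # step 3: the sim's read path
        return RobotState(joint_positions=self._joints)  # constructor args are an assumption

# step 4: add GRAVITY_DROP to RobotType in robot/robot_types.py
# step 5: map RobotType.GRAVITY_DROP → GravityDropTwin in RobotFactory.create()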

One contract, many transports

The whole point of PubSub: nothing above the bus knows or cares whether bytes travel via callbacks, an mp.Queue, a ZMQ socket, or a UDP datagram. Same four methods (publish / subscribe / get_latest / close) on every implementation. Same client code, swap the bus at construction time.

That's how the same RobotServer + RemoteRobotClient pair runs in-process in tests, in a subprocess for local isolation, and across hosts in production.

«abstract base» PubSub — servo7/bus/bus.py
· publish(topic, payload) · subscribe(topic, callback) · get_latest(topic) · close()

SAME PROCESS — InProcessBus (bus/in_process_bus.py)
· publish → direct callback dispatch · zero serialization, zero IPC
· use: tests, sim-only, integration · latency: ~µs · scope: one Python process

SIBLING PROCESS — MpBus (bus/mp_bus.py)
· backed by an mp.Queue pair · + request() / serve() — sync RPC on the same wire
· use: spawn a robot in a subprocess without OS networking · latency: ~ms · scope: parent + child process

NETWORK · STREAM — ZmqBus (bus/zmq_bus.py)
· PUB / SUB sockets per topic · bind() server-side, connect() client-side · reliable, ordered, queued
· use: production cross-host control, backbone for RemoteRobotClient · latency: a few ms (LAN) · scope: any host that can route

NETWORK · DATAGRAM — UdpBus (bus/udp_bus.py)
· raw UDP sockets per topic · unreliable, lossy, no ordering · low-overhead streaming
· use: Quest VR pose stream (hardware/quest_vr.py) · latency: sub-ms · scope: any host on the LAN
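A sketch of what "swap the bus at construction time" means for calling code; the import path follows the table above, and the robot objects in the comments are placeholders.

from servo7.bus.in_process_bus import InProcessBus     # import path assumed from the table above

def wire_follower(bus, robot, follower_id):
    # identical regardless of transport: only the four-method PubSub contract is used
    bus.subscribe(f"action.{follower_id}", robot.set_state)   # incoming actions drive the Robot

# tests / sim-only:  wire_follower(InProcessBus(), dummy_robot, "follower")
# production:        wire_follower(<a connected ZmqBus>, piper_robot, "follower")   # same call, different transport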

The remote pair — wrapping a Robot, not a RobotNode

Important design choice: RemoteRobotClient and RobotServer wrap a single Robot instance — not a RobotNode.

That means the remoting boundary sits one level below where you might expect. Anything that takes a Robot can take a remote-wrapped one without noticing. The orchestrator, the RobotNode, the SystemState callbacks — none of them know whether the Robot under them is local silicon, a digital twin, or a ZMQ stub pointing at another machine.

The flip side: today there's one RobotNode per follower. The modularity is at the Robot level, not yet at the RobotNode level. See the "Future" panel below.

CLIENT PROCESS (e.g. atlas main)
· OrchestratorNode · RobotNode — treat the Robot below as just "a Robot"
· RemoteRobotClient (a Robot subclass) — robot/remote/robot_client.py
  · set_state(action) → bus.publish(TOPIC_COMMAND, …)
  · on bus(TOPIC_STATE) → cache as self._state
· PubSub bus, client end: ZmqBus.connect(host, ports) / MpBus / InProcessBus — TOPIC_COMMAND out, TOPIC_STATE in

WIRE (whatever the bus is)
· TOPIC_COMMAND carries RobotState actions → server
· TOPIC_STATE carries RobotState frames ← server

SERVER PROCESS (e.g. piper-host)
· PubSub bus, server end: ZmqBus.bind(ports) / MpBus / InProcessBus — TOPIC_COMMAND in, TOPIC_STATE out
· RobotServer — robot/remote/robot_server.py
  · subscribes TOPIC_COMMAND → robot.set_state()
  · polls robot.get_state() → publishes TOPIC_STATE
· Robot (the actual one): PiperRobot · DigitalTwinPID · DummyRobot · anything

Same RobotServer class, same RemoteRobotClient class — the only thing that changes between dev/test/prod is which PubSub implementation gets passed in.
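A minimal sketch of what the server side amounts to, using only the behaviour described above (subscribe to commands, poll and publish state). The topic strings, poll rate, and constructor are assumptions; the real RobotServer lives in robot/remote/robot_server.py.

import time

class MiniRobotServer:
    # hedged sketch of RobotServer's job, not the real robot_server.py
    def __init__(self, robot, bus, state_hz=30.0):
        self.robot, self.bus, self.period = robot, bus, 1.0 / state_hz
        bus.subscribe("TOPIC_COMMAND", self._on_command)      # real topic constants live in servo7

    def _on_command(self, action):
        self.robot.set_state(action)                          # forward wire actions to the wrapped Robot

    def serve_forever(self):
        while True:
            self.bus.publish("TOPIC_STATE", self.robot.get_state())   # stream state back out
            time.sleep(self.period)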

How to instantiate — same client, four transports

RemoteRobotClient.over_zmq(
    robot_type="piper",
    host="piper-host.lan",
    state_port=5555, command_port=5556,
)
# production · server already running on another host · ZmqBus.connect

RemoteRobotClient.over_zmq_local_server(
    robot_config="piper.yaml",
    state_port=5555, command_port=5556,
)
# spawn a robot_server subprocess on localhost, talk to it via ZMQ
# useful when the robot lib needs a different env than atlas

RemoteRobotClient.over_in_process(robot_config="dummy.yaml")
# server runs in the SAME process, talks over InProcessBus
# useful for tests — exercises the proxy code path with zero IPC overhead

RemoteRobotClient.over_mp_subprocess(robot_config="dummy.yaml")
# mp.Process running RobotServer, talk over MpBus (mp.Queue pair)
# useful when you want process isolation without OS sockets

All four return a RemoteRobotClient — which IS a Robot. Pass it to RobotNode or call real_robot.set_digital_twin(client) just like any other Robot.

Where this sits in the abstraction stack

Orchestrator — mode state machine
└── RobotNode — owns ONE Robot (today)
    └── Robot (abstract contract) — .set_state / .get_state
        …and "that Robot" might actually be:
        · PiperRobot — talks to the real arm directly
        · DigitalTwinPID — P/D simulation
        · RemoteRobotClient → bus → RobotServer (another proc/host) → PiperRobot over there, or DigitalTwinPID, or anything else

Future direction

Today: one RobotNode per follower; modularity lives at the Robot level. You can swap silicon for sim, swap local for remote, attach a twin — all without touching anything above RobotNode. But you can't yet teach a single RobotNode to look after multiple robots at once.

Where this is going: the long-term plan is RobotNode as a collection. The communication boundary already exists at the right place (per-Robot, not per-Node), so the future shape is additive — wrap each member of the collection in whatever transport it needs, the rest of the system keeps treating them as Robots.

TODAY
· Orchestrator
  · RobotNode (one robot) → Robot (local / twin / remote-wrapped)
  · RobotNode (one robot) → Robot (local / twin / remote-wrapped)

FUTURE (planned)
· Orchestrator
  · RobotNode (collection)
    · Robot A — local (PiperRobot)
    · Robot B — remote (ZMQ) via RemoteRobotClient
    · Robot C — twin (in-process) via DigitalTwinPID

The FUTURE half hasn't happened yet — that's the planned direction. The point of the comparison is to show that the building blocks (transport-agnostic Robot wrappers) are already in place to support it without changing the surface.