RFC: Pluggable Peripherals

Status Draft
Author Willem Momma
Scope servo7/robot, new servo7/sensors and servo7/actuators, removal of servo7/observation

TL;DR

Goal
Swapping a peripheral on a robot is a launch-time selection, not a code edit.
  1. Physically swap the actuator (or sensor) on the robot.
  2. Relaunch main with the new driver name (CLI flag, or pick it in the launcher panel).
A single robot can carry multiple actuators and multiple sensors at the same time. Different actuators speak different control languages (a parallel gripper takes position, a vacuum takes on/off); the abstraction handles that uniformly.

Introduce two new abstractions, Actuator and Sensor, both held directly as lists on the Robot instance. Drop the ObservationProvider layer; its concrete subclasses collapse into per-device Sensor classes.

The two abstractions are not merged into a common parent. They have different lifecycles (read-only vs read/write) and different command surfaces. A shared base would add inheritance complexity without removing duplication.

First targeted drivers: an OpenCV camera, a Fairino-internal force-torque sensor, and a Festo vacuum gripper. Enough to validate the shape on real hardware against the Fairino FR3 before migrating the rest of the fleet.

Context

The system today

A Robot ABC (servo7/robot/hardware/base.py:21-803) defines the hardware interface: connect, disconnect, _get_state_impl, _set_state_impl. Concrete subclasses live under servo7/robot/hardware/: PiperRobot, FairinoRobot, R1RosRobot, SO100FollowerRobot, plus dual variants and mocks.

Cameras are partially abstracted, but only at a provider level. Every robot owns an optional ObservationProvider (servo7/observation/provider.py:60-141) built from the yaml observation block. Two concrete providers exist: LeRobotObservationProvider (OpenCV via LeRobot, with tenacity-driven reconnect) and ROSObservationProvider (ROS image topics). All cameras owned by a single robot must come from a single source type; there is no per-device camera abstraction and no path to add force-torque, lidar, or IMU into the same observation.

Actuators have no abstraction at all. Every robot bakes its tool path into its _set_state_impl and _get_state_impl:

RobotHardcoded pathLocation
Piperinterface.GripperCtrl(pos, 1000, 0x01, 0)piper.py:191
R1Two JointState publishers; joint indices 6 and 13r1.py:340-345, 686-698
SO100 Follower"gripper.pos" in LeRobot action dict; motor id 6so100_follower.py:255-278
FairinoNo end-effector implemented at allfairino.py

Downstream consumers carry the same shape. recorder/lerobot.py has explicit TODOs at lines 967 and 997: "grippers and base should come from robot config." The robot.config.gripper yaml block exists (see piper.yaml:23-25) but its type field is metadata only; nothing reads it to pick a driver.

Current architecture Robot «abstract» + observation_provider gripper logic baked into each subclass PiperRobot interface.GripperCtrl( pos, 1000, 0x01, 0) native SDK call FairinoRobot no end-effector implemented arm only R1RosRobot 2x JointState pub _create_gripper_message fixed indices 6, 13 SO100FollowerRobot "gripper.pos" key in motor action dict motor id 6 Robot composes one observation provider: ObservationProvider «abstract» LeRobotObservationProvider (OpenCV only) ROSObservationProvider (ROS topics only) all cameras must share one source type; no mixing
Figure 1. Current composition. Each robot subclass owns its gripper code; the provider is source-locked.

The problem

Cobots are physically the same chassis with interchangeable tools. The same Fairino FR3 can carry a Robotiq 2F-85, a Festo vacuum cup, or a magnetic gripper. A workcell might run a parallel gripper on the left arm and a suction cup on the right. A research arm might pick up a force-torque sensor for a contact-rich task that didn't need one yesterday.

This shape is hard to support today, for four reasons:

  1. Swapping a tool requires editing the robot subclass. Putting a Robotiq on a Piper means writing gripper code in piper.py next to GripperCtrl. Putting one on a Fairino means writing it from scratch in fairino.py. Same tool, two subclass edits.
  2. Different tools speak different control languages. A parallel gripper takes a position scalar. A vacuum gripper takes a boolean. There is no shared interface to dispatch any of them, so each subclass invents its own.
  3. Multiple actuators per robot is not expressible. The current shape assumes one gripper per arm.
  4. Sensor diversity is bottlenecked at the provider. A robot that wants RGB + force-torque on the same arm cannot mix them: all cameras must come from one source type, and force-torque has no producer at all even though RobotObservation.force_torque is in the dataclass at robot_types.py:417.

Proposed Solution

Architecture overview

Robot stops owning peripheral implementations directly. At connect time it builds two parallel lists, one of Sensor instances and one of Actuator instances, both from yaml, both via factories. The existing ObservationProvider layer disappears.

Proposed architecture Robot «abstract» + sensors: list[Sensor] (new) + actuators: list[Actuator] (new) + _read_arm_state() (new abstract) + _write_arm_state() (new abstract) peripherals built by factories at connect() PiperRobot arm only (gripper code removed) FairinoRobot arm only (now supports any tool) R1RosRobot arm only (gripper pubs removed) SO100FollowerRobot arm only (motor 6 split out) Robot.connect() composes both lists directly from yaml: Actuators track Robot.actuators: list[Actuator] Actuator «abstract» PiperGripper FestoVacuum Robotiq2F85 (future) RosPositionGripper (future) command(target) + read() -> ActuatorState v1 kinds: parallel_gripper, vacuum Sensors track Robot.sensors: list[Sensor] Sensor «abstract» OpenCVCamera FairinoForceTorque RealSenseCamera (future) RosImageCamera (future) read() -> SensorReading (tagged by kind) v1 kinds: rgb_camera, force_torque
Figure 2. Proposed composition. Symmetric: Robot owns both lists directly, no provider layer.

Why two abstractions and not one

Sensor and Actuator both have connect and disconnect. The shared surface stops there.

SensorActuator
DirectionRead-onlyRead and write
Failure modeDrop a sample, fault-isolatedLatches into robot status, safety-relevant
Routing targetRobotObservation slots by kindActuatorState back into RobotState

A common parent ABC would push abstract methods into one or both that the other does not need. Two narrow hierarchies match the actual shape.

Terminology: in this RFC, "Actuator" means a tool-side peripheral (grippers, vacuum, etc.). The robot's joint motors are part of the arm and stay inside the robot subclass.

Actuator abstraction

Layout

Drivers live flat under servo7/actuators/ next to the ABC. A per-kind subpackage (gripper/, vacuum/) can be extracted later if a kind grows enough shared logic to justify a kind-level base. With two or three drivers the flat layout is fewer files and fewer concepts.

servo7/actuators/
  base.py              # Actuator ABC + ActuatorCommand + ActuatorState + ActuatorKind
  festo_vacuum.py      # Festo vacuum via robot digital IO
  piper_gripper.py     # wraps interface.GripperCtrl
  factory.py           # build_actuator(config, ctx) -> Actuator

ABC

class ActuatorKind(str, Enum):
    PARALLEL_GRIPPER = "parallel_gripper"
    VACUUM           = "vacuum"


@dataclass
class ActuatorCommand:
    position: float | None = None        # parallel grippers
    engaged: bool | None = None          # vacuum
    force_limit: float | None = None     # optional safety


@dataclass
class ActuatorState:
    timestamp: float
    position: float | None = None
    engaged: bool | None = None
    is_holding: bool | None = None       # object-present, when available


class Actuator(ABC):
    def __init__(self, config: "ActuatorConfig") -> None:
        self.name = config.name
        self.config = config
        self.connected = False

    @property
    @abstractmethod
    def kind(self) -> ActuatorKind: ...

    @abstractmethod
    def connect(self, ctx: "RobotContext") -> bool: ...

    @abstractmethod
    def disconnect(self) -> bool: ...

    @abstractmethod
    def read(self) -> ActuatorState: ...

    @abstractmethod
    def command(self, target: ActuatorCommand) -> bool: ...

    @abstractmethod
    def is_safe(self, target: ActuatorCommand) -> bool: ...

Future kinds (adaptive grippers, magnets, tool changers) extend ActuatorKind and add fields to ActuatorCommand / ActuatorState when their first driver lands. Out of scope for v1.

Sensor abstraction

Layout

Same flat shape:

servo7/sensors/
  base.py              # Sensor ABC + SensorReading hierarchy + SensorKind
  camera_opencv.py
  ft_fairino.py        # via Fairino SDK GetForceTorqueRCS
  factory.py           # build_sensor(config, ctx) -> Sensor

The existing servo7/observation/ directory is deleted. The shared shared_memory.py file moves to servo7/utils/ since it's a transport concern, not an observation concern.

ABC

class SensorKind(str, Enum):
    RGB_CAMERA   = "rgb_camera"
    FORCE_TORQUE = "force_torque"


@dataclass
class SensorConfig:
    name: str
    type: str
    params: dict[str, Any]


class Sensor(ABC):
    def __init__(self, config: SensorConfig) -> None:
        self.name = config.name
        self.config = config
        self.connected = False

    @property
    @abstractmethod
    def kind(self) -> SensorKind: ...

    @abstractmethod
    def connect(self, ctx: "RobotContext") -> bool: ...

    @abstractmethod
    def disconnect(self) -> bool: ...

    @abstractmethod
    def read(self) -> "SensorReading": ...


@dataclass
class SensorReading:
    timestamp: float
    kind: SensorKind

@dataclass
class CameraReading(SensorReading):
    image: np.ndarray

@dataclass
class ForceTorqueReading(SensorReading):
    wrench: list[float]        # [Fx, Fy, Fz, Tx, Ty, Tz]
    frame: str = "tcp"

Adding depth, lidar, or IMU is additive: extend SensorKind, add a reading subclass, write a driver. The ABC does not change.

Where the provider's responsibilities go

Robot.get_observation() becomes a small loop that iterates self.sensors, catches per-sensor failures, populates the matching slot on RobotObservation by kind, and stashes failed names in obs.failed_sensors:

def get_observation(self) -> RobotObservation:
    obs = RobotObservation(timestamp=time.time(), robot_id=self.name)
    failed: set[str] = set()
    for sensor in self.sensors:
        try:
            self._route_reading_into_obs(sensor.read(), obs)
        except Exception as e:
            failed.add(sensor.name)
            self.logger.warning(f"sensor {sensor.name} read failed: {e}")
    obs.failed_sensors = failed
    return obs

Per-sensor reconnect (with tenacity, when the driver opts in) lives inside each Sensor.connect(). The base class provides a small helper that subclasses opt into; it's not on the ABC because some sensors (network-mapped F-T, ROS topic) connect instantly and never benefit from backoff.

Shared RobotContext

Both sensors and actuators may need access to the host robot's SDK interface or a shared bus lock. A single typed dataclass serves both:

@dataclass
class RobotContext:
    interface: Any = None                # PiperSDK, FairinoSDK.RPC, lerobot.Robot, ...
    bus_lock: Any = None                 # threading.Lock for shared SDK buses
    logger: logging.Logger | None = None

USB cameras don't touch ctx. A Festo vacuum on a Fairino reads ctx.interface to call SetDO. The dataclass widens as new transport needs appear (a ROS node field for R1-style robots, EtherCAT, etc.); the peripheral ABCs do not.

Robot integration

The base Robot.__init__ adds two lists, populated at connect time:

self.sensors: list[Sensor] = []
self.actuators: list[Actuator] = []

def connect(self) -> bool:
    # ...subclass-specific arm connect happens first (e.g. open SDK port)...
    ctx = self._build_robot_context()

    for cfg in self.config.get("sensors", []):
        sensor = build_sensor(cfg)
        if sensor.connect(ctx):
            self.sensors.append(sensor)

    for cfg in self.config.get("actuators", []):
        actuator = build_actuator(cfg)
        if actuator.connect(ctx):
            self.actuators.append(actuator)

    return True

Read and write paths on Robot become:

def _get_state_impl(self) -> RobotState:
    state = self._read_arm_state()                  # subclass implements
    self._merge_actuator_state(state)               # base class fills gripper_* fields
    return state

def _set_state_impl(self, action: RobotState) -> bool:
    ok_arm = self._write_arm_state(action)          # subclass implements
    ok_act = self._command_actuators(action)        # base class dispatches
    return ok_arm and ok_act

Subclasses now implement _read_arm_state and _write_arm_state: the arm-only halves of today's _get_state_impl and _set_state_impl. The base class owns the actuator merge, the actuator command dispatch, and the sensor read loop.

Config layout and composition

Peripherals live in their own yaml files, organised as a library, and are composed onto a robot at launch time by name. This avoids copy-pasting full sensor or actuator blocks across every robot config when the underlying device is the same.

configs/
  robots/
    fairino_fr3.yaml
    piper.yaml
  actuators/
    festo_vacuum.yaml
    piper_gripper.yaml
  sensors/
    opencv_wrist.yaml
    fairino_ft.yaml

Composition happens at the command line (the launcher panel calls the same composition function under the hood):

scripts/main.py --robot fairino_fr3 --sensors opencv_wrist fairino_ft --actuators festo_vacuum

Swapping a Piper's native gripper for a Festo vacuum is a one-flag change after the physical swap:

# was:
scripts/main.py --robot piper --actuators piper_gripper
# is:
scripts/main.py --robot piper --actuators festo_vacuum

Each peripheral yaml is robot-agnostic. The device knows its own protocol. Any wiring detail that IS robot-specific (an output pin number) lives in the peripheral yaml as a default. Many cases need no wiring detail at all: the Fairino's vendor SDK auto-detects connected grippers and IO; an OpenCV camera connects to /dev/videoN and never talks to the robot at all.

The legacy top-level observation: block in robot yamls is read as a backward-compat shim and translated to the library shape at load time, so existing configs keep working until they get migrated.

Mapping ActuatorState onto RobotState

RobotState keeps gripper_position (single) and gripper_positions (list[2]) for downstream compatibility with the recorder and AI worker. The base class populates them from Actuator.read():

Multi-actuator dispatch beyond index-based (e.g., a per-name actuator_commands dict on RobotState) is a follow-up: the list shape allows it; v1 does not require it.

Frontend control integration

A direct payoff: operators get a per-actuator control widget in the Atlas frontend that is decoupled from arm teleoperation. Today, toggling a Piper's gripper from the UI routes through the same teleop path the arm uses. With actuators as named, kind-tagged peripherals, the orchestrator can dispatch a command to a single actuator on a single robot without touching arm state.

The mechanism reuses the existing orchestrator websocket (servo7/nodes/websocket_server.py, servo7/nodes/message_protocol.py):

No new transport, no separate lifecycle. Detailed payload schemas and rendering land in a follow-up frontend PR once the backend is in place.

Safety and bounds

Actuator.is_safe owns its own range and command-shape validation. The current gripper-name-masking trick in Robot._check_bounds_vectorised (base.py:114-117) goes away: the arm's joint-limit logic only sees arm joints. The base class calls is_safe on each actuator before dispatching.

Concrete driver shape

One sketch to make the ABC concrete:

class FestoVacuumViaRobotIO(Actuator):
    @property
    def kind(self) -> ActuatorKind:
        return ActuatorKind.VACUUM

    def connect(self, ctx: RobotContext) -> bool:
        self._interface = ctx.interface
        self._bus_lock  = ctx.bus_lock
        self._activate  = int(self.config.params["activate_output"])
        self._engaged_cache: bool = False
        self.connected = True
        return True

    def command(self, target: ActuatorCommand) -> bool:
        if target.engaged is None:
            return True
        return self._set_output(bool(target.engaged))

    def read(self) -> ActuatorState:
        return ActuatorState(
            timestamp=time.time(),
            engaged=self._engaged_cache,
            is_holding=self._engaged_cache,
        )

    def is_safe(self, target: ActuatorCommand) -> bool:
        return target.position is None

The transport-specific call (_set_output) wraps whatever the host robot's SDK exposes for tool-side digital IO (for Fairino, SetDO on ctx.interface). A standalone FestoVacuumViaModbus can be added later for Modbus generators without changing the ABC or the yaml shape.

Alternatives Considered

Single Peripheral parent ABC

Both sides have connect / disconnect. The shared surface stops there. Sensor is read-only and fault-isolated. Actuator is read-write with safety state and atomic stop semantics that read-only sensors do not have. A common base would push abstract methods into one or both that the other does not need.

One driver per (robot, tool) pair

Worst option. The matrix explodes: Piper+Festo, Fairino+Festo, R1+Festo, Piper+Robotiq, Fairino+Robotiq, and so on. The chosen design has one driver per tool, parameterised by the RobotContext it gets at connect time. Count stays linear in tool count rather than quadratic.

Non-Goals

Trade-offs

Pros

Cons