main with the new driver name (CLI flag, or pick it in the launcher panel).
Introduce two new abstractions, Actuator and Sensor,
both held directly as lists on the Robot instance. Drop the
ObservationProvider layer; its concrete subclasses collapse into
per-device Sensor classes.
The two abstractions are not merged into a common parent. They have different lifecycles (read-only vs read/write) and different command surfaces. A shared base would add inheritance complexity without removing duplication.
First targeted drivers: an OpenCV camera, a Fairino-internal force-torque sensor, and a Festo vacuum gripper. Enough to validate the shape on real hardware against the Fairino FR3 before migrating the rest of the fleet.
A Robot ABC (servo7/robot/hardware/base.py:21-803) defines the
hardware interface: connect, disconnect, _get_state_impl,
_set_state_impl. Concrete subclasses live under
servo7/robot/hardware/: PiperRobot, FairinoRobot,
R1RosRobot, SO100FollowerRobot, plus dual variants and mocks.
Cameras are partially abstracted, but only at a provider level. Every
robot owns an optional ObservationProvider
(servo7/observation/provider.py:60-141) built from the yaml
observation block. Two concrete providers exist:
LeRobotObservationProvider (OpenCV via LeRobot, with tenacity-driven
reconnect) and ROSObservationProvider (ROS image topics). All cameras
owned by a single robot must come from a single source type; there is no
per-device camera abstraction and no path to add force-torque, lidar, or IMU
into the same observation.
Actuators have no abstraction at all. Every robot bakes its tool path into
its _set_state_impl and _get_state_impl:
| Robot | Hardcoded path | Location |
|---|---|---|
| Piper | interface.GripperCtrl(pos, 1000, 0x01, 0) | piper.py:191 |
| R1 | Two JointState publishers; joint indices 6 and 13 | r1.py:340-345, 686-698 |
| SO100 Follower | "gripper.pos" in LeRobot action dict; motor id 6 | so100_follower.py:255-278 |
| Fairino | No end-effector implemented at all | fairino.py |
Downstream consumers carry the same shape. recorder/lerobot.py has explicit
TODOs at lines 967 and 997: "grippers and base should come from robot config."
The robot.config.gripper yaml block exists (see piper.yaml:23-25)
but its type field is metadata only; nothing reads it to pick a driver.
Cobots are physically the same chassis with interchangeable tools. The same Fairino FR3 can carry a Robotiq 2F-85, a Festo vacuum cup, or a magnetic gripper. A workcell might run a parallel gripper on the left arm and a suction cup on the right. A research arm might pick up a force-torque sensor for a contact-rich task that didn't need one yesterday.
This shape is hard to support today, for four reasons:
piper.py next to
GripperCtrl. Putting one on a Fairino means writing it from scratch
in fairino.py. Same tool, two subclass edits.
RobotObservation.force_torque is in the dataclass at
robot_types.py:417.
Robot stops owning peripheral implementations directly. At
connect time it builds two parallel lists, one of Sensor
instances and one of Actuator instances, both from yaml, both via
factories. The existing ObservationProvider layer disappears.
Sensor and Actuator both have connect and
disconnect. The shared surface stops there.
| Sensor | Actuator | |
|---|---|---|
| Direction | Read-only | Read and write |
| Failure mode | Drop a sample, fault-isolated | Latches into robot status, safety-relevant |
| Routing target | RobotObservation slots by kind | ActuatorState back into RobotState |
A common parent ABC would push abstract methods into one or both that the other does not need. Two narrow hierarchies match the actual shape.
Terminology: in this RFC, "Actuator" means a tool-side peripheral (grippers, vacuum, etc.). The robot's joint motors are part of the arm and stay inside the robot subclass.
Drivers live flat under servo7/actuators/ next to the ABC. A
per-kind subpackage (gripper/, vacuum/) can be extracted
later if a kind grows enough shared logic to justify a kind-level base. With
two or three drivers the flat layout is fewer files and fewer concepts.
servo7/actuators/
base.py # Actuator ABC + ActuatorCommand + ActuatorState + ActuatorKind
festo_vacuum.py # Festo vacuum via robot digital IO
piper_gripper.py # wraps interface.GripperCtrl
factory.py # build_actuator(config, ctx) -> Actuator
class ActuatorKind(str, Enum):
PARALLEL_GRIPPER = "parallel_gripper"
VACUUM = "vacuum"
@dataclass
class ActuatorCommand:
position: float | None = None # parallel grippers
engaged: bool | None = None # vacuum
force_limit: float | None = None # optional safety
@dataclass
class ActuatorState:
timestamp: float
position: float | None = None
engaged: bool | None = None
is_holding: bool | None = None # object-present, when available
class Actuator(ABC):
def __init__(self, config: "ActuatorConfig") -> None:
self.name = config.name
self.config = config
self.connected = False
@property
@abstractmethod
def kind(self) -> ActuatorKind: ...
@abstractmethod
def connect(self, ctx: "RobotContext") -> bool: ...
@abstractmethod
def disconnect(self) -> bool: ...
@abstractmethod
def read(self) -> ActuatorState: ...
@abstractmethod
def command(self, target: ActuatorCommand) -> bool: ...
@abstractmethod
def is_safe(self, target: ActuatorCommand) -> bool: ...
Future kinds (adaptive grippers, magnets, tool changers) extend
ActuatorKind and add fields to ActuatorCommand /
ActuatorState when their first driver lands. Out of scope for v1.
Same flat shape:
servo7/sensors/
base.py # Sensor ABC + SensorReading hierarchy + SensorKind
camera_opencv.py
ft_fairino.py # via Fairino SDK GetForceTorqueRCS
factory.py # build_sensor(config, ctx) -> Sensor
The existing servo7/observation/ directory is deleted. The shared
shared_memory.py file moves to servo7/utils/ since it's a
transport concern, not an observation concern.
class SensorKind(str, Enum):
RGB_CAMERA = "rgb_camera"
FORCE_TORQUE = "force_torque"
@dataclass
class SensorConfig:
name: str
type: str
params: dict[str, Any]
class Sensor(ABC):
def __init__(self, config: SensorConfig) -> None:
self.name = config.name
self.config = config
self.connected = False
@property
@abstractmethod
def kind(self) -> SensorKind: ...
@abstractmethod
def connect(self, ctx: "RobotContext") -> bool: ...
@abstractmethod
def disconnect(self) -> bool: ...
@abstractmethod
def read(self) -> "SensorReading": ...
@dataclass
class SensorReading:
timestamp: float
kind: SensorKind
@dataclass
class CameraReading(SensorReading):
image: np.ndarray
@dataclass
class ForceTorqueReading(SensorReading):
wrench: list[float] # [Fx, Fy, Fz, Tx, Ty, Tz]
frame: str = "tcp"
Adding depth, lidar, or IMU is additive: extend SensorKind, add a
reading subclass, write a driver. The ABC does not change.
Robot.get_observation() becomes a small loop that iterates
self.sensors, catches per-sensor failures, populates the matching
slot on RobotObservation by kind, and stashes failed names in
obs.failed_sensors:
def get_observation(self) -> RobotObservation:
obs = RobotObservation(timestamp=time.time(), robot_id=self.name)
failed: set[str] = set()
for sensor in self.sensors:
try:
self._route_reading_into_obs(sensor.read(), obs)
except Exception as e:
failed.add(sensor.name)
self.logger.warning(f"sensor {sensor.name} read failed: {e}")
obs.failed_sensors = failed
return obs
Per-sensor reconnect (with tenacity, when the driver opts in) lives inside
each Sensor.connect(). The base class provides a small helper that
subclasses opt into; it's not on the ABC because some sensors (network-mapped
F-T, ROS topic) connect instantly and never benefit from backoff.
Both sensors and actuators may need access to the host robot's SDK interface or a shared bus lock. A single typed dataclass serves both:
@dataclass
class RobotContext:
interface: Any = None # PiperSDK, FairinoSDK.RPC, lerobot.Robot, ...
bus_lock: Any = None # threading.Lock for shared SDK buses
logger: logging.Logger | None = None
USB cameras don't touch ctx. A Festo vacuum on a Fairino reads
ctx.interface to call SetDO. The dataclass widens as new
transport needs appear (a ROS node field for R1-style robots, EtherCAT, etc.);
the peripheral ABCs do not.
The base Robot.__init__ adds two lists, populated at connect time:
self.sensors: list[Sensor] = []
self.actuators: list[Actuator] = []
def connect(self) -> bool:
# ...subclass-specific arm connect happens first (e.g. open SDK port)...
ctx = self._build_robot_context()
for cfg in self.config.get("sensors", []):
sensor = build_sensor(cfg)
if sensor.connect(ctx):
self.sensors.append(sensor)
for cfg in self.config.get("actuators", []):
actuator = build_actuator(cfg)
if actuator.connect(ctx):
self.actuators.append(actuator)
return True
Read and write paths on Robot become:
def _get_state_impl(self) -> RobotState:
state = self._read_arm_state() # subclass implements
self._merge_actuator_state(state) # base class fills gripper_* fields
return state
def _set_state_impl(self, action: RobotState) -> bool:
ok_arm = self._write_arm_state(action) # subclass implements
ok_act = self._command_actuators(action) # base class dispatches
return ok_arm and ok_act
Subclasses now implement _read_arm_state and _write_arm_state:
the arm-only halves of today's _get_state_impl and
_set_state_impl. The base class owns the actuator merge, the actuator
command dispatch, and the sensor read loop.
Peripherals live in their own yaml files, organised as a library, and are composed onto a robot at launch time by name. This avoids copy-pasting full sensor or actuator blocks across every robot config when the underlying device is the same.
configs/
robots/
fairino_fr3.yaml
piper.yaml
actuators/
festo_vacuum.yaml
piper_gripper.yaml
sensors/
opencv_wrist.yaml
fairino_ft.yaml
Composition happens at the command line (the launcher panel calls the same composition function under the hood):
scripts/main.py --robot fairino_fr3 --sensors opencv_wrist fairino_ft --actuators festo_vacuum
Swapping a Piper's native gripper for a Festo vacuum is a one-flag change after the physical swap:
# was:
scripts/main.py --robot piper --actuators piper_gripper
# is:
scripts/main.py --robot piper --actuators festo_vacuum
Each peripheral yaml is robot-agnostic. The device knows its own protocol. Any
wiring detail that IS robot-specific (an output pin number) lives in the
peripheral yaml as a default. Many cases need no wiring detail at all: the
Fairino's vendor SDK auto-detects connected grippers and IO; an OpenCV camera
connects to /dev/videoN and never talks to the robot at all.
The legacy top-level observation: block in robot yamls is read as a
backward-compat shim and translated to the library shape at load time, so
existing configs keep working until they get migrated.
RobotState keeps gripper_position (single) and
gripper_positions (list[2]) for downstream compatibility with
the recorder and AI worker. The base class populates them from
Actuator.read():
gripper_position is filled from
actuators[0].read(). Parallel grippers map position
directly. Vacuum maps engaged to a normalised scalar (1.0
engaged, 0.0 disengaged).
gripper_positions[i] is filled from
actuators[i].read() on the same scheme; gripper_position
stays None.
actuators[i].read() directly via Robot.actuators. This
is the path that closes the recorder TODOs at
recorder/lerobot.py:967, 997.
Multi-actuator dispatch beyond index-based (e.g., a per-name
actuator_commands dict on RobotState) is a follow-up: the
list shape allows it; v1 does not require it.
A direct payoff: operators get a per-actuator control widget in the Atlas frontend that is decoupled from arm teleoperation. Today, toggling a Piper's gripper from the UI routes through the same teleop path the arm uses. With actuators as named, kind-tagged peripherals, the orchestrator can dispatch a command to a single actuator on a single robot without touching arm state.
The mechanism reuses the existing orchestrator websocket
(servo7/nodes/websocket_server.py,
servo7/nodes/message_protocol.py):
ROBOT_METADATA broadcast grows an
actuators field listing each actuator's name, kind, and
optional range/unit hints. The frontend renders one widget per actuator,
dispatched on ActuatorKind (slider for
PARALLEL_GRIPPER, toggle for VACUUM).
SET_ACTUATOR_COMMAND message type carries
{robot_id, actuator_name, command}. The orchestrator handler
builds an ActuatorCommand and routes it to the named actuator
on the target robot.
No new transport, no separate lifecycle. Detailed payload schemas and rendering land in a follow-up frontend PR once the backend is in place.
Actuator.is_safe owns its own range and command-shape validation.
The current gripper-name-masking trick in
Robot._check_bounds_vectorised (base.py:114-117) goes
away: the arm's joint-limit logic only sees arm joints. The base class calls
is_safe on each actuator before dispatching.
One sketch to make the ABC concrete:
class FestoVacuumViaRobotIO(Actuator):
@property
def kind(self) -> ActuatorKind:
return ActuatorKind.VACUUM
def connect(self, ctx: RobotContext) -> bool:
self._interface = ctx.interface
self._bus_lock = ctx.bus_lock
self._activate = int(self.config.params["activate_output"])
self._engaged_cache: bool = False
self.connected = True
return True
def command(self, target: ActuatorCommand) -> bool:
if target.engaged is None:
return True
return self._set_output(bool(target.engaged))
def read(self) -> ActuatorState:
return ActuatorState(
timestamp=time.time(),
engaged=self._engaged_cache,
is_holding=self._engaged_cache,
)
def is_safe(self, target: ActuatorCommand) -> bool:
return target.position is None
The transport-specific call (_set_output) wraps whatever the host
robot's SDK exposes for tool-side digital IO (for Fairino, SetDO
on ctx.interface). A standalone FestoVacuumViaModbus
can be added later for Modbus generators without changing the ABC or the yaml
shape.
Peripheral parent ABC
Both sides have connect / disconnect. The shared surface stops there.
Sensor is read-only and fault-isolated. Actuator is read-write with safety
state and atomic stop semantics that read-only sensors do not have. A common
base would push abstract methods into one or both that the other does not need.
Worst option. The matrix explodes: Piper+Festo, Fairino+Festo, R1+Festo,
Piper+Robotiq, Fairino+Robotiq, and so on. The chosen design has one driver
per tool, parameterised by the RobotContext it gets at connect time.
Count stays linear in tool count rather than quadratic.
DigitalTwin path (servo7/digital_twin/).
Sensor and Actuator under a common parent ABC.
Robot. Same factory pattern, same RobotContext, same
dispatch shape.
ActuatorCommand with optional fields and per-driver
is_safe.
servo7/observation/provider.py +
provider subclasses get deleted, not just rearranged. Closes long-standing
TODOs at recorder/lerobot.py:967, 997 and unlocks F-T sensing.
Robot base class owns more code (sensor read loop, actuator
dispatch, state merge). Bugs in the base class ripple wider.
ActuatorCommand with optional fields needs per-driver
validation. A vacuum that gets position=0.5 in its command
must reject it explicitly via is_safe; the type system will
not catch it.
_get_state_impl -> _read_arm_state
+ base merge) is a non-trivial refactor across every robot in
servo7/robot/hardware/. Each one has to be re-validated
against real hardware (mitigated by the incremental migration in
non-goals).
gripper_position to
actuators[0] works for the common case but hides assumptions.
A dual-arm config that reorders actuators silently breaks recorder
checkpoints.