Source code for rl_training_validation.utils.env_safety

#!/usr/bin/env python3
"""
Environment availability + real-robot safety helpers for the training
and validation scripts in this repository.

Pure registry-based: an env is "available" iff it's registered in the
Gymnasium registry (after ``import rl_environments``). No separate
implementation-status table — the registry IS the source of truth.

Goal-env detection is by id suffix (``...Goal*Sim-v0/v1`` or
``...Goal*Real-v0/v1``); real-env detection is by id suffix
(``...Real-v0/v1``).

Nothing in this module touches Gazebo or any ROS topic.
"""
from __future__ import annotations

import argparse
import os
import warnings
from typing import Iterable, List, Tuple


# ---------------------------------------------------------------------------
# Gym import (deferred so non-ROS code paths can still import this module)
# ---------------------------------------------------------------------------

def _import_gym():
    try:
        import gymnasium as gym
        import rl_environments  # noqa: F401  triggers registration
    except ModuleNotFoundError as e:
        raise SystemExit(
            f"[rl_training_validation] Cannot import gymnasium / rl_environments: {e}. "
            "Make sure rl_environments is installed (catkin or pip) and that your "
            "workspace's setup.bash is sourced."
        )
    return gym


# ---------------------------------------------------------------------------
# Env-id classification
# ---------------------------------------------------------------------------

# Our registry uses bare ids (no UniROS- prefix). Anything with one of these
# robot prefixes is considered "ours" for listing / cross-check purposes.
_ROBOT_PREFIXES = ("RX200", "NED2", "VX300S", "UR5e")


[docs] def is_registered(env_id: str) -> bool: gym = _import_gym() return env_id in gym.envs.registry
# is_implemented kept as an alias so existing callers don't need to change. is_implemented = is_registered
[docs] def is_real(env_id: str) -> bool: return env_id.endswith("Real-v0") or env_id.endswith("Real-v1")
[docs] def is_goal_env(env_id: str) -> bool: # Matches ...GoalSim and ...GoalReal across v0 / v1 (so a future # bumped version doesn't silently downgrade to non-goal handling). return "Goal" in env_id and ( env_id.endswith("Sim-v0") or env_id.endswith("Sim-v1") or env_id.endswith("Real-v0") or env_id.endswith("Real-v1") )
[docs] def list_implemented() -> List[str]: """List the registered task-env ids (one per documented task). Filters out the abstract robot-base registrations (e.g. ``RX200RobotEnv-v0``, ``NED2RobotGoalBaseSimEnv-v0``, ``VX300SRobotGoalBaseRealEnv-v0``). Those are class-only entries used to share an env class across multiple task envs — they are not ``gym.make``-able tasks and should not appear in user-facing "available envs" listings. """ gym = _import_gym() return sorted( s for s in gym.envs.registry if any(s.startswith(p) for p in _ROBOT_PREFIXES) and "Robot" not in s )
[docs] def list_unimplemented() -> List[str]: # No status table → nothing is "unimplemented but registered". Kept for # backward-compat with callers that expect both lists. return []
# --------------------------------------------------------------------------- # Real-robot motion gate # --------------------------------------------------------------------------- ALLOW_REAL_ROBOT_FLAG_ENV = "ALLOW_REAL_ROBOT_MOTION" ALLOW_REAL_ROBOT_FLAG_PARAM = "/allow_real_robot_motion"
[docs] def add_real_motion_cli(parser: argparse.ArgumentParser) -> None: """Add a ``--allow-real-robot-motion`` flag to ``parser``.""" parser.add_argument( "--allow-real-robot-motion", action="store_true", default=False, help=( "Required to construct any *Real-v0 env in this script. " "Setting this flag also exports ALLOW_REAL_ROBOT_MOTION=1 " "in the current process so downstream code can read consent " "from a single source; the env var is a propagation of the " "same gate, not an independent channel." ), )
# --------------------------------------------------------------------------- # Cube-tracker CLI plumbing for push / pnp real scripts # ---------------------------------------------------------------------------
[docs] def add_cube_tracker_cli(parser: argparse.ArgumentParser) -> None: """Add --cube-tracker / --cube-tracker-camera / --cube-tracker-target-frame. Default behaviour is unchanged: vision pipeline is external. Pass ``--cube-tracker auto`` to have the env roslaunch ``rl_envs_cube_tracker/<camera>.launch`` automatically. """ parser.add_argument( "--cube-tracker", choices=("none", "auto"), default="none", help=( "Vision pipeline source. 'none' (default): assume an external " "publisher on --cube-pose-topic. 'auto': env roslaunches " "rl_envs_cube_tracker on env construction." ), ) parser.add_argument( "--cube-tracker-camera", choices=("kinect2", "zed2", "d405"), default="kinect2", help="Which camera launch file rl_envs_cube_tracker uses (only with --cube-tracker auto).", ) parser.add_argument( "--cube-tracker-target-frame", default="", help=( "If non-empty, rl_envs_cube_tracker TF-transforms /cube_pose into " "this frame (e.g. rx200/base_link). Requires extrinsic calibration; " "see rl_envs_cube_tracker/config/extrinsics/README.md." ), )
[docs] def apply_cube_tracker_kwargs(env_kwargs: dict, args: argparse.Namespace) -> dict: """Merge cube-tracker CLI args into ``env_kwargs``. Returns it for chaining.""" env_kwargs["auto_launch_cube_tracker"] = (args.cube_tracker == "auto") env_kwargs["cube_tracker_camera"] = args.cube_tracker_camera env_kwargs["cube_tracker_target_frame"] = args.cube_tracker_target_frame return env_kwargs
# --------------------------------------------------------------------------- # Wrist-camera CLI plumbing (NED2 sim + real) # ---------------------------------------------------------------------------
[docs] def add_wrist_camera_cli(parser: argparse.ArgumentParser) -> None: """Add ``--wrist-camera`` flag (default off). NED2 envs accept ``use_wrist_camera: bool`` as a kwarg. Default off so there's no extra Gazebo / vision-node load when the user doesn't need it. """ parser.add_argument( "--wrist-camera", action="store_true", default=False, help=( "Enable the Niryo Ned2 wrist camera subscriber. Sim subscribes " "to /gazebo_camera/image_raw (raw); real subscribes to " "/niryo_robot_vision/compressed_video_stream (compressed). " "Decoded frame is exposed as self.cv_image_wrist on the env." ), )
[docs] def apply_wrist_camera_kwargs(env_kwargs: dict, args: argparse.Namespace) -> dict: """Merge ``--wrist-camera`` into ``env_kwargs``. Returns it for chaining.""" env_kwargs["use_wrist_camera"] = bool(args.wrist_camera) return env_kwargs
# --------------------------------------------------------------------------- # Goal-pose CLI plumbing for push real (optional physical AprilTag for the # push target — push only; jitter from hand-held tags would destabilise # reach / pnp policies, where the goal is in the air). # ---------------------------------------------------------------------------
[docs] def add_goal_pose_cli(parser: argparse.ArgumentParser) -> None: """Add ``--goal-pose-topic`` for push real validation. Empty by default → the env keeps using random / hard-coded goals. When set (e.g. ``/goal_pose``), the env subscribes; on each reset the latest pose (if fresh) overrides the random / hard-coded goal. Use with ``--cube-tracker auto`` to also auto-launch the second AprilTag adapter for the goal tag (id 1 by default). """ parser.add_argument( "--goal-pose-topic", default="", help=( "Topic publishing the physical-goal PoseStamped (e.g. /goal_pose). " "Empty (default) leaves the env on its random / hard-coded goal. " "Push real only — reach / pnp use sampled in-air goals where " "hand-held tag jitter destabilises the policy." ), )
[docs] def apply_goal_pose_kwargs(env_kwargs: dict, args: argparse.Namespace) -> dict: """Merge ``--goal-pose-topic`` into ``env_kwargs``. Returns it for chaining.""" env_kwargs["goal_pose_topic"] = str(args.goal_pose_topic) return env_kwargs
# --------------------------------------------------------------------------- # Seed-aware path helpers # ---------------------------------------------------------------------------
[docs] def with_seed_suffix(path: str, seed: int) -> str: """Append ``seed_<N>/`` to a save / log path. Keeps runs trained with different seeds in separate directories so a later run does not silently overwrite (or load from) a previous seed's checkpoints. The trailing slash is preserved when present in the input path. """ if path.endswith("/"): return f"{path}seed_{seed}/" return f"{path}/seed_{seed}/"
# --------------------------------------------------------------------------- # Combined "is this env safe to construct now?" check # ---------------------------------------------------------------------------
[docs] def check_env_constructable(env_id: str, allow_real_flag: bool = False) -> None: """ Raise SystemExit if ``env_id`` is unregistered or a real env without explicit consent. """ if not is_registered(env_id): gym = _import_gym() sample = ", ".join( sorted(s for s in gym.envs.registry if any(s.startswith(p) for p in _ROBOT_PREFIXES))[:6] ) raise SystemExit( f"[rl_training_validation] '{env_id}' is not registered in the " f"Gymnasium registry. Available ids include: {sample} ... " "Run scripts/list_available_envs.py for the full list." ) enforce_real_motion_consent(env_id, allow_real_flag)
# --------------------------------------------------------------------------- # Goal / HER plumbing helpers # ---------------------------------------------------------------------------
[docs] def filter_to_implemented(env_ids: Iterable[str]) -> Tuple[List[str], List[str]]: """Split a list of env ids into ``(registered, missing)``.""" registered, missing = [], [] for eid in env_ids: (registered if is_registered(eid) else missing).append(eid) return registered, missing
[docs] def assert_goal_env(env_id: str) -> None: """Raise SystemExit if ``env_id`` is not a goal-conditioned env.""" if not is_goal_env(env_id): raise SystemExit( f"[rl_training_validation] {env_id} is not a goal-conditioned env. " "HER replay buffers only support goal envs ({...Goal...} ids). " "Use a non-HER algorithm or switch to the Goal variant." )
[docs] def assert_non_goal_env(env_id: str) -> None: """Raise SystemExit if ``env_id`` is a goal-conditioned env.""" if is_goal_env(env_id): raise SystemExit( f"[rl_training_validation] {env_id} is a goal-conditioned env. " "Use a HER-compatible algorithm (e.g. SAC_GOAL / TD3_GOAL) or " "switch to the non-Goal variant." )
# parse_env_id kept for backward-compat with scripts that expect a 4-tuple.
[docs] def parse_env_id(env_id: str): """Return ``(robot, mode, task, is_goal)`` or None. Parses bare ids like ``RX200PushGoalSim-v0`` / ``NED2ReacherReal-v0``. Also accepts the ``Zed2``-flavoured RX200 ids (e.g. ``RX200Zed2PnPGoalSim-v0``); the implicit (no-prefix) kinect variants fall through the empty-prefix branch so callers don't need to special-case the default sensor. Verified to round-trip every currently-registered task id without returning ``None``. """ if not env_id.endswith("-v0"): return None body = env_id[: -len("-v0")] for prefix in sorted(_ROBOT_PREFIXES, key=len, reverse=True): if body.startswith(prefix): robot_lc = {"RX200": "rx200", "NED2": "ned2", "VX300S": "vx300s", "UR5e": "ur5e"}[prefix] rest = body[len(prefix):] break else: return None if rest.endswith("Real"): mode = "real" rest = rest[: -len("Real")] elif rest.endswith("Sim"): mode = "sim" rest = rest[: -len("Sim")] else: return None is_goal = rest.endswith("Goal") if is_goal: rest = rest[: -len("Goal")] # task: "Reacher" (RX200/NED2) or "Push"/"PnP"/... task_map = {"Reacher": "reach", "Push": "push", "PnP": "pnp"} # Also accept sensor-flavoured prefixes like "Zed2Reacher". for sensor_prefix in ("Zed2", ""): if rest.startswith(sensor_prefix): tail = rest[len(sensor_prefix):] if tail in task_map: return robot_lc, mode, task_map[tail], is_goal return None