RealROS API reference

Real-hardware environments built on the shared UniROS proxy and utilities. Same gym API as MultiROS, talks to physical robot drivers (typically via MoveIt) instead of Gazebo.

Top-level package

Gym proxy (re-export)

Re-export of the canonical multiprocessing gym proxy.

The implementation lives in uniros._proxy.GymProxy. RealrosGym is an alias kept for backwards compatibility — both names refer to the same class object, so isinstance(env, RealrosGym) matches envs created through any package in the ecosystem.

Usage:

from realros.core import RealrosGym as gym
env = gym.make("env_name", args)
env.reset()
realros.core.RealrosGym

alias of GymProxy

class realros.core.GymProxy(env_name, *args, **kwargs)[source]

Bases: object

Parent-side proxy for a gym.Env running inside a worker process.

Equivalent aliases re-exported by other packages in the ecosystem:

  • multiros.core.MultirosGym

  • realros.core.RealrosGym

  • uniros.core.uniros_gym

All four names refer to the same class object.

Usage:

from uniros.core import uniros_gym as gym  # or any alias
env = gym.make("env_name", **kwargs)
env.reset()
Parameters:
static worker(env_name, conn, *args, **kwargs)[source]
Parameters:
Return type:

None

property unwrapped: GymProxy

Return self as the unwrapped env.

The proxy IS the user-visible env from the parent’s perspective; whatever lives in the worker process is opaque and not accessible across the pipe (gym.Env instances typically hold non-picklable state such as _thread.RLock from rospy’s spinner threads, so sending them through the pipe raises TypeError).

Without this handler, env.unwrapped would route through __getattr__, IPC to the worker, and fail to pickle. SB3’s DummyVecEnv calls id(env.unwrapped) for uniqueness checking during construction, so this property is required for SB3 compatibility.

classmethod make(env_name, *args, **kwargs)[source]
Parameters:
Return type:

GymProxy

step(action)[source]
Parameters:

action (Any)

Return type:

Tuple[Any, float, bool, bool, Mapping[str, Any]]

reset(seed=None, options=None)[source]
Parameters:
Return type:

Tuple[Any, Mapping[str, Any]]

close()[source]
Return type:

None

The RealrosGym class in this module is an alias for uniros._proxy.GymProxy — see the UniROS API page for the full method reference.

ROS helpers

ROS common

Common helpers for handling ROS from inside the realros framework.

Functions provided:

  • launch_roscore — launch a roscore with a given or random (non-overlapping) port.

  • get_all_the_ros_masters — get all the currently-known rosmaster ports (diagnostic).

  • add_to_rosmaster_list — add a port to the diagnostic port log.

  • remove_from_rosmaster_list — remove a port from the diagnostic port log.

  • kill_all_host_ros_processes — kill ALL ROS processes on this host.

  • kill_all_host_roslaunch_processes — kill ALL roslaunch processes on this host.

  • kill_all_ros_nodes — kill every node attached to the current rosmaster.

  • kill_ros_node — kill a single named ROS node.

  • ros_kill_master — kill a specific rosmaster by port.

  • clean_ros_logs — purge ROS logs.

  • source_workspace — deprecated no-op.

  • ros_launch_launcher — execute a roslaunch with args.

  • ros_node_launcher — launch a ROS node from a package.

  • ros_load_yaml — fetch and load a YAML file onto the ROS parameter server.

  • load_urdf — load a URDF onto the parameter server.

  • is_roscore_running — check whether a roscore is reachable.

  • change_ros_master — set ROS_MASTER_URI for this process.

  • change_ros_master_multi_device — set ROS_MASTER_URI for cross-machine use.

  • init_robot_state_publisher — start the robot_state_publisher node.

  • remove_all_from_rosmaster_list — truncate the diagnostic port log.

realros.utils.ros_common.register_managed_process(popen, **selectors)[source]

Register a process spawned by this script for automatic cleanup on Ctrl+C or normal interpreter exit.

Parameters:
  • popen – A subprocess.Popen (typically the xterm wrapper shell). .terminate() will be called on cleanup.

  • **selectors – Optional fallback identifiers used by cleanup when terminating the Popen alone is not enough. Recognised keys are roscore_port (str | int), which triggers pkill -f "roscore -p <port>"; and kind (str), a free-form label used in log lines.

Return type:

None

realros.utils.ros_common.launch_roscore(port=None, set_new_master_vars=True, default_port=False)[source]

Launch a roscore on a free port and (optionally) point this process’s ROS_MASTER_URI at it.

Ports are allocated via socket.bind(('127.0.0.1', 0)) so the kernel picks free ports and serializes between parallel callers.

Parameters:
  • port (int) – A specific desired port for ROS_MASTER_URI. If the port is unavailable on this host right now, falls back to a kernel-allocated free port and logs a warning.

  • set_new_master_vars (bool) – change the current ROS_MASTER_URI environment variable to the selected port.

  • default_port (bool) – If True, request port 11311. If 11311 is already in use, ROS_MASTER_URI is still pointed at it so the caller can attach to an existing roscore.

Returns:

ROS_MASTER_URI port as a string.

Return type:

str

realros.utils.ros_common.get_all_the_ros_masters()[source]

DEPRECATED. Reads the diagnostic port log only.

Port allocation is handled by launch_roscore via socket.bind(0); this helper returns whatever the best-effort diagnostic log contains.

Return type:

str

realros.utils.ros_common.add_to_rosmaster_list(ros_port)[source]

DEPRECATED. Appends to the diagnostic port log only.

Port allocation is now handled by launch_roscore directly via socket.bind(0). Callers do not need to maintain a shared list.

Parameters:

ros_port (str)

Return type:

bool

realros.utils.ros_common.remove_from_rosmaster_list(ros_port)[source]

DEPRECATED. Removes ros_port from the diagnostic log only.

Port allocation is now handled by launch_roscore directly via socket.bind(0). Callers do not need to maintain a shared list.

Parameters:

ros_port (str)

Return type:

bool

realros.utils.ros_common.kill_all_host_ros_processes()[source]

Kill EVERY rosmaster/roslaunch/rosout/robot_state_publisher/nodelet process on this host (not just the ones this script spawned).

Implemented via killall -9 so it is host-scoped: if another user or another script on the same machine is running ROS, this will kill their processes too. Use with care.

Renamed from kill_all_ros_processes to make the host-wide scope explicit. The old name is kept as a DeprecationWarning-emitting alias for backwards compatibility.

Returns:

True if the kill command ran (does not verify each

process actually died).

Return type:

bool

realros.utils.ros_common.kill_all_ros_processes()[source]

DEPRECATED. Use kill_all_host_ros_processes() instead.

The old name was misleading: it sounds like it cleans up only the instances this script spawned, but it actually issues killall -9 against rosmaster/roslaunch/rosout/etc. host-wide.

Return type:

bool

realros.utils.ros_common.kill_all_host_roslaunch_processes()[source]

Kill EVERY roslaunch process on this host via killall -9.

Host-scoped: not limited to this script’s rosmaster. Renamed from kill_all_roslaunch_process to make the host-wide scope explicit. The old name is kept as a DeprecationWarning-emitting alias for backwards compatibility.

Returns:

True if the kill command ran.

Return type:

bool

realros.utils.ros_common.kill_all_roslaunch_process()[source]

DEPRECATED. Use kill_all_host_roslaunch_processes() instead.

Return type:

bool

realros.utils.ros_common.kill_all_ros_nodes(ros_port=None)[source]

Function to kill all running ROS nodes of the specified or current Rosmaster.

Parameters:

ros_port (str) – The ROS_MASTER_URI port (optional).

Returns:

True if all nodes were killed, False otherwise.

Return type:

bool

realros.utils.ros_common.kill_ros_node(node_name, ros_port=None)[source]

Function to kill a ROS node of the given or current Rosmaster.

Parameters:
  • node_name (str) – Name of the node to kill.

  • ros_port (str) – The ROS_MASTER_URI port (optional).

Returns:

True if the node was killed, False otherwise.

Return type:

bool

realros.utils.ros_common.ros_kill_master(ros_port)[source]

Function to kill a ROS master.

Parameters:

ros_port (str) – The ROS_MASTER_URI port.

Returns:

True if the master was killed, False otherwise.

Return type:

bool

realros.utils.ros_common.clean_ros_logs()[source]

Function to clean all the ros logs.

Returns:

True if all the ros logs were closed.

Return type:

bool

realros.utils.ros_common.source_workspace(abs_path)[source]

DEPRECATED no-op.

This function used to call os.system("source devel/setup.bash") against the workspace path. That cannot work for two reasons:

  1. os.system runs the command in /bin/sh (commonly dash on Ubuntu), where source is not a builtin — only . (“dot-source”) is. So the source call fails immediately with “sh: 1: source: not found”.

  2. Even if it succeeded, sourcing in a child shell only mutates the child’s environment. The child dies with the subprocess and the calling Python process is unaffected.

To use a ROS workspace from a Python process, source it in the shell that launches Python (i.e. source devel/setup.bash in the terminal, then run python).

This function is preserved for backwards compatibility but is now a no-op that emits a DeprecationWarning and a rospy logwarn. It will be removed in a future release.

Parameters:

abs_path (str) – Unused. Kept for signature compatibility.

Returns:

Always False.

Return type:

bool

realros.utils.ros_common.ros_launch_launcher(pkg_name=None, launch_file_name=None, launch_file_abs_path=None, args=None, launch_new_term=True, ros_port=None)[source]

Function to execute a roslaunch with args.

Parameters:
  • pkg_name (str) – Name of the package where the launch file is located.

  • launch_file_name (str) – Name of the launch file.

  • launch_file_abs_path (str) – Absolute path of the launch file

  • args (list of str) – Args to pass to the launch file.

  • launch_new_term (bool) – Launch the process in a new terminal (Xterm).

  • ros_port (str) – The ROS_MASTER_URI port (optional).

Returns:

True if the launch file was launched and False otherwise.

Return type:

bool

realros.utils.ros_common.construct_roslaunch_command(pkg_name, launch_file_name, launch_file_abs_path)[source]

Constructs a roslaunch command using either a package name and launch file name or an absolute path to a launch file.

Parameters:
  • pkg_name (str) – Name of the package where the launch file is located.

  • launch_file_name (str) – Name of the launch file.

  • launch_file_abs_path (str) – Absolute path of the launch file

Returns:

The constructed roslaunch command or None if no valid inputs were provided.

Return type:

str

realros.utils.ros_common.ros_node_launcher(pkg_name, node_name, launch_master=False, launch_new_term=True, name=None, ns='/', output='log', ros_port=None, args=None)[source]

Function to launch a ROS node from a package. If “launch_master” is “True”, it will also launch a ROSCORE with the given “ros_port” or with a random ROS master port if “ros_port” is not specified.

Parameters:
  • pkg_name (str) – Name of the package to launch the node from.

  • node_name (str) – Name of the node to launch.

  • launch_master (bool) – If ROSMASTER is not running launch it.

  • launch_new_term (bool) – Launch the process in a new terminal (Xterm).

  • name (str) – Name to give the node to be launched.

  • ns (str) – Namespace to give the node to be launched.

  • output (str) – log, screen, or None

  • ros_port (str) – The ROS_MASTER_URI port (optional).

  • args (List[str]) – List of additional arguments to pass to the rosrun command.

Returns:

ROS Master port, and True if the node was launched successfully,

False otherwise.

Return type:

tuple[str, bool]

realros.utils.ros_common.construct_rosrun_command(pkg_name, node_name, name=None, ns='/', output='log')[source]

Constructs a rosrun command using a package name and node name.

Parameters:
  • pkg_name (str) – Name of the package to launch the node from.

  • node_name (str) – Name of the node to launch.

  • name (str) – Name to give the node to be launched.

  • ns (str) – Namespace to give the node to be launched.

  • output (str) – log, screen, or None

Returns:

The constructed rosrun command.

Return type:

str

realros.utils.ros_common.check_package_exists(pkg_name)[source]

Checks if a given package exists.

Parameters:

pkg_name (str) – Name of the package.

Returns:

True if the package exists and False otherwise.

Return type:

bool

realros.utils.ros_common.ros_load_yaml(pkg_name=None, file_name=None, file_abs_path=None, ns='/', ros_port=None)[source]

Fetch a YAML file from a package or an abs path and load it into the ROS Parameter Server.

Parameters:
  • pkg_name (str) – name of package.

  • file_name (str) – name of file.

  • file_abs_path (str) – Absolute path of the YAML file.

  • ns (str) – namespace to load parameters into.

  • ros_port (str) – The ROS_MASTER_URI port (optional).

Returns:

True if file was loaded, false otherwise.

Return type:

bool

realros.utils.ros_common.load_urdf(model_path=None, pkg_name=None, file_name=None, folder='/urdf', ns=None, args_xacro=None, param_name=None, ros_port=None)[source]

Function to load a URDF from a ROS package to the parameter server or a string containing the processed URDF data.

Parameters:
  • model_path (str) – The path to the URDF file. Defaults to None.

  • pkg_name (str) – The name of the ROS package containing the URDF file. Defaults to None.

  • file_name (str) – The name of the URDF file. Defaults to None.

  • folder (str) – The folder within the ROS package containing the URDF file. Defaults to “/urdf”.

  • ns (str) – The namespace to use when adding the URDF to the parameter server. Defaults to None.

  • args_xacro (list) – Additional arguments to pass to xacro when processing the URDF file. Defaults to None.

  • param_name (str) – The name of the parameter to use when adding the URDF to the parameter server.

  • ros_port (str) – The ROS_MASTER_URI port (optional). Defaults to None.

Returns:

A tuple containing a boolean value indicating whether the loading was

successful and a string containing the processed URDF data. If an error occurred while loading or processing the URDF file, the first element of the tuple will be False and the second element will be None.

Return type:

Tuple[bool, Union[str, None]]

realros.utils.ros_common.is_roscore_running()[source]

Function to check if a roscore is currently running.

Returns:

True if a roscore is running, False otherwise.

Return type:

bool

realros.utils.ros_common.change_ros_master(ros_port)[source]

Function to set the current ROS Master Environment Variable.

Parameters:

ros_port (str) – The ROS_MASTER_URI port.

Returns:

True if ROS Master Environment Variables are set to new values.

Return type:

bool

realros.utils.ros_common.change_ros_master_multi_device(remote_ip, local_ip, remote_ros_port='11311')[source]

Function to set the current ROS Master Environment Variable for multi-device communication.

Parameters:
  • remote_ip (str) – The remote IP address.

  • local_ip (str) – The local IP address.

  • remote_ros_port (str) – The remote ROS_MASTER_URI port. Defaults ros port 11311.

Returns:

True if ROS Master Environment Variables are set to new values.

Return type:

bool

realros.utils.ros_common.init_robot_state_publisher(ns='/', max_pub_freq=None, launch_new_term=False)[source]

Function to initialize the robot state publisher.

Parameters:
  • ns (str) – Namespace to give the node to be launched.

  • max_pub_freq (float) – Maximum frequency at which to publish the robot state.

  • launch_new_term (bool) – Launch the process in a new terminal (Xterm).

Returns:

True if the node was launched successfully, False otherwise.

Return type:

bool

realros.utils.ros_common.remove_all_from_rosmaster_list()[source]

DEPRECATED. Truncates the diagnostic port log only.

Port allocation is now handled by launch_roscore directly via socket.bind(0). Callers do not need to maintain a shared list.

Return type:

bool

ROS controllers (re-export)

Re-export of the canonical ROS-controller helpers from uniros.utils.ros_controllers.

Each helper wraps a controller_manager service call (load, unload, list, switch, start, stop, reset, spawn, unspawn) with a 30-second rospy.wait_for_service timeout so a hung controller_manager surfaces as a logged failure rather than an indefinite hang.

Usage:

from realros.utils.ros_controllers import load_ros_controller, ...

Re-export of uniros.utils.ros_controllers. See the UniROS API page for the full helper reference.

ROS markers (re-export)

Re-export of the canonical ROS marker helpers from uniros.utils.ros_markers.

Usage:

from realros.utils.ros_markers import RosMarker, RosMarkerArray

Re-export of uniros.utils.ros_markers.

ROS kinematics (re-export)

Re-export of the canonical kinematics helpers from uniros.utils.ros_kinematics.

Usage:

from realros.utils.ros_kinematics import Kinematics_pyrobot, Kinematics_pykdl

Re-export of uniros.utils.ros_kinematics.

MoveIt integration

This Class is to specify all the common functions related to the handling of ROS Moveit for manipulation tasks.

It has the following methods (Main functions),

01. set_trajectory_ee: Set a pose target for the end effector of the robot arm. 02. set_trajectory_joints: Set a joint position target for the arm joints. 03. set_gripper_joints: Set a joint position target for the gripper joints. 04. get_robot_pose: Get the current pose of the robot arm. 05. get_robot_rpy: Get the current roll, pitch, and yaw angles of the robot arm. 06. get_gripper_pose: Get the current pose of the gripper. 07. get_gripper_rpy: Get the current roll, pitch, and yaw angles of the gripper. 08. get_joint_angles_robot_arm: Get the current joint angles of the robot arm. 09. get_joint_angles_gripper: Get the current joint angles of the gripper. 10. get_joint_angles: Get the current joint angles of both the robot arm and gripper. 11. check_goal: Check if a goal position is reachable by the robot arm. 12. get_randomJointVals: Get random joint values for the robot arm. 13. get_randomPose: Get a random pose for the robot arm. 14. set_planning_time: Set the maximum time allowed for planning a trajectory. 15. set_goal_position_tolerance: Set the goal position tolerance for the robot arm. 16. set_goal_orientation_tolerance: Set the goal orientation tolerance for the robot arm. 17. set_goal_joint_tolerance: Set the goal joint tolerance for the robot arm. 18. set_max_acceleration_scaling_factor: Set the maximum acceleration scaling factor for the robot arm. 19. set_max_velocity_scaling_factor: Set the maximum velocity scaling factor for the robot arm. 20. set_trajectory_cartesian: Set a cartesian trajectory for the end effector of the robot arm. 21. stop_arm: Stop the robot arm from moving. 22. stop_gripper: Stop the gripper from moving. 23. check_goal_joint_pos: Check if a goal position is reachable by the robot arm.

class realros.utils.moveit_realros.MoveitRealROS(arm_name, gripper_name=None, robot_description=None, ns=None)[source]

Bases: object

A class for using MoveIt!

This class provides a set of methods for controlling a robot arm and gripper using MoveIt!. It allows you to set joint angles, execute trajectories, and check if a goal is reachable. It also provides methods for getting the current pose and joint angles of the robot arm and gripper.

To use this class, you must first initialize it with the appropriate parameters for your robot arm and gripper. You can then call the various methods provided by this class to control your robot.

Parameters:
  • arm_name (str)

  • gripper_name (str | None)

  • robot_description (str | None)

  • ns (str | None)

arm_execute_pose(pose, async_move=False)[source]

Execute a pose with the robot arm.

Parameters:
  • pose (Pose) – The target pose for the robot arm.

  • async_move (bool) – Whether to execute the trajectory asynchronously or not (Optional).

Returns:

The result of the trajectory execution.

Return type:

bool

arm_execute_joint_trajectory(joint_target_values, async_move=False)[source]

Execute a joint trajectory with the robot arm.

Parameters:
  • joint_target_values (List[float]) – The target joint values for the robot arm.

  • async_move (bool) – Whether to execute the trajectory asynchronously or not (Optional).

Returns:

The result of the trajectory execution.

Return type:

bool

Raises:

ValueError – If the input joint target values do not match the number of joints of the robot arm.

gripper_execute_joint_command(target_joint_values, async_move=False)[source]

Execute a joint command with the gripper.

Parameters:
  • target_joint_values (List[float]) – The target joint values for the gripper.

  • async_move (bool) – Whether to execute the trajectory asynchronously or not (Optional).

Returns:

The result of the command execution.

Return type:

bool

Raises:

ValueError – If the input joint target values do not match the number of joints of the gripper.

arm_execute_trajectory(async_move=False)[source]

Execute a trajectory with the robot arm.

Returns:

The result of the trajectory execution.

Return type:

bool

Parameters:

async_move (bool)

stop_arm()[source]

Stop the robot arm from moving.

Return type:

None

stop_gripper()[source]

Stop the gripper from moving.

Return type:

None

robot_pose()[source]

Get the current pose of the robot arm.

Returns:

The current pose of the robot arm.

Return type:

PoseStamped

robot_rpy()[source]

Get the current roll, pitch, and yaw angles of the robot arm.

Returns:

The current roll, pitch, and yaw angles of the robot arm.

Return type:

List[float]

gripper_pose(gripper_link='')[source]

Get the current pose of the gripper.

Parameters:

gripper_link (str) – The name of the gripper link to get the pose of.

Returns:

The current pose of the gripper.

Return type:

PoseStamped

gripper_rpy(gripper_link='')[source]

Get the current roll, pitch, and yaw angles of the gripper.

Parameters:

gripper_link (str) – The name of the gripper link to get the angles of.

Returns:

The current roll, pitch, and yaw angles of the gripper.

Return type:

List[float]

joint_angles_arm()[source]

Get the current joint angles of the robot arm.

Returns:

The current joint angles of the robot arm.

Return type:

List[float]

joint_angles_gripper()[source]

Get the current joint angles of the gripper.

Returns:

The current joint angles of the gripper.

Return type:

List[float]

joint_angles()[source]

Get the current joint angles of both the robot arm and gripper.

Returns:

The current joint angles of both the robot arm and gripper.

Return type:

List[float]

is_goal_reachable(goal)[source]

Check if a goal position is reachable by the robot arm.

Parameters:

goal (Union[List[float], np.ndarray]) – The target position for the robot arm.

Returns:

Whether the goal position is reachable or not.

Return type:

bool

check_goal_reachable_joint_pos(joint_pos)[source]

Check if a goal position is reachable by the robot arm.

Parameters:

joint_pos (Union[List[float], np.ndarray]) – joint positions.

Returns:

Whether the goal position is reachable or not.

Return type:

bool

set_trajectory_ee(position, orientation=None, async_move=False)[source]

Set a pose target for the end effector of the robot arm.

Parameters:
  • position (Union[List[float], np.ndarray]) – The target position for the end effector.

  • orientation (Union[List[float], np.ndarray]) – The target orientation for the end effector (Optional).

  • async_move (bool) – Whether to execute the trajectory asynchronously or not (Optional).

Returns:

The result of the trajectory execution.

Return type:

bool

set_trajectory_joints(q_positions, async_move=False)[source]

Set a joint position target for the arm joints.

Parameters:
  • q_positions (Union[List[float], np.ndarray]) – The target joint positions for the robot arm.

  • async_move (bool) – Whether to execute the trajectory asynchronously or not (Optional).

Returns:

The result of the trajectory execution.

Return type:

bool

set_gripper_joints(joint_positions, async_move=False)[source]

Set a joint position target for the gripper joints.

Parameters:
  • joint_positions (Union[List[float], np.ndarray]) – The target joint positions for the gripper.

  • async_move (bool) – Whether to execute the trajectory asynchronously or not (Optional).

Returns:

The result of the command execution.

Return type:

bool

set_trajectory_cartesian(waypoints, eef_step=0.01, jump_threshold=0.0, avoid_collisions=True)[source]

Set a cartesian trajectory for the end effector of the robot arm.

Parameters:
  • waypoints (List[PoseStamped]) – The target waypoints for the end effector.

  • eef_step (float) – The distance between waypoints in meters (Optional).

  • jump_threshold (float) – The maximum distance in the configuration space between consecutive points in the resulting path (Optional).

  • avoid_collisions (bool) – Whether to check for collisions between waypoints or not (Optional).

Returns:

The result of the trajectory execution.

Return type:

bool

get_robot_pose()[source]

Get the current pose of the robot arm.

Returns:

The current pose of the robot arm.

Return type:

PoseStamped

get_robot_rpy()[source]

Get the current roll, pitch, and yaw angles of the robot arm.

Returns:

The current roll, pitch, and yaw angles of the robot arm.

Return type:

List[float]

get_gripper_pose(link='')[source]

Get the current pose of the gripper.

Parameters:

link (str) – The name of the gripper link to get the pose of.

Returns:

The current pose of the gripper.

Return type:

PoseStamped

get_gripper_rpy(link='')[source]

Get the current roll, pitch, and yaw angles of the gripper.

Parameters:

link (str) – The name of the gripper link to get the angles of.

Returns:

The current roll, pitch, and yaw angles of the gripper.

Return type:

List[float]

get_joint_angles_robot_arm()[source]

Get the current joint angles of the robot arm.

Returns:

The current joint angles of the robot arm.

Return type:

List[float]

get_joint_angles_gripper()[source]

Get the current joint angles of the gripper.

Returns:

The current joint angles of the gripper.

Return type:

List[float]

get_joint_angles()[source]

Get the current joint angles of both the robot arm and gripper.

Returns:

The current joint angles of both the robot arm and gripper.

Return type:

List[float]

check_goal(goal)[source]

Check if a goal position is reachable by the robot arm.

Parameters:

goal (Union[List[float], np.ndarray]) – The target position for the robot arm.

Returns:

Whether the goal position is reachable or not.

Return type:

bool

check_goal_joint_pos(joint_pos)[source]

Check if a goal position is reachable by the robot arm.

Parameters:

joint_pos (Union[List[float], np.ndarray]) – The target position for the robot arm.

Returns:

Whether the goal position is reachable or not.

Return type:

bool

get_randomJointVals()[source]

Get random joint values for the robot arm.

Returns:

Random joint values for the robot arm.

Return type:

List[float]

get_randomPose()[source]

Get a random pose for the robot arm.

Returns:

A random pose for the robot arm.

Return type:

PoseStamped

set_planning_time(planning_time)[source]

Set the maximum allowed planning time. Specify the amount of time to be used for motion planning

Parameters:

planning_time (float) – The maximum allowed planning time in seconds.

Return type:

None

set_goal_position_tolerance(position_tolerance)[source]

Set the tolerance for a target end-effector position

Parameters:

position_tolerance (float) – The tolerance for the goal position in meters.

Return type:

None

set_goal_orientation_tolerance(orientation_tolerance)[source]

Set the tolerance for a target end-effector orientation.

Parameters:

orientation_tolerance (float) – The tolerance for the goal orientation in radians.

Return type:

None

set_goal_joint_tolerance(joint_tolerance)[source]

Set the tolerance for a target joint configuration.

Parameters:

joint_tolerance (float) – The tolerance for the goal joint configuration in radians.

Return type:

None

set_max_acceleration_scaling_factor(acceleration_scaling_factor)[source]

Set a scaling factor to reduce the maximum joint accelerations. Allowed values are in (0,1]. The default value is set in the joint_limits.yaml of the moveit_config package.

Parameters:

acceleration_scaling_factor (float) – The maximum acceleration scaling factor.

Return type:

None

set_max_velocity_scaling_factor(velocity_scaling_factor)[source]

Set a scaling factor to reduce the maximum joint velocities. Allowed values are in (0,1]. The default value is set in the joint_limits.yaml of the moveit_config package.

Parameters:

velocity_scaling_factor (float) – The maximum velocity scaling factor.

Return type:

None

Base envs

RealBaseEnv

class realros.envs.RealBaseEnv.RealBaseEnv(load_robot=True, robot_pkg_name=None, robot_launch_file=None, robot_args=None, load_urdf=False, urdf_pkg_name=None, urdf_file_name=None, urdf_folder='/urdf', urdf_xacro_args=None, namespace='/', launch_robot_state_pub=False, robot_state_publisher_max_freq=None, new_robot_state_term=False, load_controllers=False, controllers_file=None, controllers_list=None, reset_controllers=False, reset_controllers_prompt=False, kill_rosmaster=True, clean_logs=False, ros_port=None, seed=None, reset_env_prompt=False, close_env_prompt=False, action_cycle_time=0.0, log_internal_state=False, multi_device_mode=False, remote_ip=None, local_ip=None)[source]

Bases: Env

A custom gymnasium environment for reinforcement learning using ROS and real robots.

Initialize the RealBaseEnv.

Parameters:
  • load_robot (bool) – Load the real robot using a launch file.

  • robot_pkg_name (str) – The name of the package containing the real robot launch file.

  • robot_launch_file (str) – The name of the robot launch file.

  • robot_args (list of str) – Args to pass to the robot launch file.

  • load_urdf (bool) – Whether to load the URDF of the real robot.

  • urdf_pkg_name (str) – The name of the package containing the URDF file.

  • urdf_file_name (str) – The name of the URDF file.

  • urdf_folder (str) – The folder containing the URDF file.

  • urdf_xacro_args (List[str]) – A list of arguments to pass to xacro when processing the URDF file.

  • namespace (str) – The ROS namespace to use.

  • launch_robot_state_pub (bool) – Whether to launch the robot state publisher.

  • robot_state_publisher_max_freq (float) – The maximum frequency at which to publish robot state messages.

  • new_robot_state_term (bool) – Whether to launch the robot state publisher in a new terminal.

  • load_controllers (bool) – Whether to load controllers for the real robot.

  • controllers_file (str) – The name of the file containing the controller configuration.

  • controllers_list (List[str]) – A list of controllers to spawn.

  • reset_controllers (bool) – Whether to reset the controllers when resetting the environment.

  • reset_controllers_prompt (bool) – Whether to prompt the user before resetting the controllers.

  • kill_rosmaster (bool) – Whether to kill the ROS master when closing the environment.

  • clean_logs (bool) – Whether to clean the ROS logs when closing the environment.

  • ros_port (str) – The port number to use for the ROS master.

  • seed (int) – The seed for the random number generator.

  • reset_env_prompt (bool) – Whether to prompt the user to resetting the environment.

  • close_env_prompt (bool) – Whether to prompt the user to closing the environment.

  • action_cycle_time (float) – The time to wait between applying actions.

  • log_internal_state (bool) – Whether to log the internal state of the environment.

  • multi_device_mode (bool) – Whether to run the environment in multi-device mode.

  • remote_ip (str) – The IP address of the remote ROS master (if multi-device mode is enabled).

  • local_ip (str) – The IP address of the local PC (if multi-device mode is enabled).

close_env_prompt

Load the real robot using a launch file

step(action)[source]

Take a step in the environment.

Parameters:

action (Any) – The action to be applied to the robot.

Returns:

The observation representing the current state of the environment. reward (float): The reward for taking the given action. terminated (bool): Whether the agent reaches the terminal state. truncated (bool): Whether the episode is truncated due to various reasons. (e.g. reaching the maximum number of steps, or end before the terminal state) info (dict): Additional information about the environment.

Return type:

observation (Any)

reset(seed=None, options=None)[source]

Reset the environment.

Parameters:
  • seed (int) – The seed for the random number generator.

  • options (dict) – Additional information for resetting the environment.

Returns:

The initial observation representing the state of the environment. info (dict): Additional information about the environment. Similar to the info returned by step().

Return type:

observation (Any)

close()[source]

Close the environment.

Return type:

None

RealGoalEnv

class realros.envs.RealGoalEnv.RealGoalEnv(load_robot=True, robot_pkg_name=None, robot_launch_file=None, robot_args=None, load_urdf=False, urdf_pkg_name=None, urdf_file_name=None, urdf_folder='/urdf', urdf_xacro_args=None, namespace='/', launch_robot_state_pub=False, robot_state_publisher_max_freq=None, new_robot_state_term=False, load_controllers=False, controllers_file=None, controllers_list=None, reset_controllers=False, reset_controllers_prompt=False, kill_rosmaster=True, clean_logs=False, ros_port=None, seed=None, reset_env_prompt=False, close_env_prompt=False, action_cycle_time=0.0, log_internal_state=False, multi_device_mode=False, remote_ip=None, local_ip=None)[source]

Bases: GoalEnv

A custom gymnasium robotics goal-conditioned environment for reinforcement learning using ROS and real robots.

Initialize the RealGoalEnv environment.

Parameters:
  • load_robot (bool) – Load the real robot using a launch file.

  • robot_pkg_name (str) – The name of the package containing the real robot launch file.

  • robot_launch_file (str) – The name of the robot launch file.

  • robot_args (list of str) – Args to pass to the robot launch file.

  • load_urdf (bool) – Whether to load the URDF of the real robot.

  • urdf_pkg_name (str) – The name of the package containing the URDF file.

  • urdf_file_name (str) – The name of the URDF file.

  • urdf_folder (str) – The folder containing the URDF file.

  • urdf_xacro_args (List[str]) – A list of arguments to pass to xacro when processing the URDF file.

  • namespace (str) – The ROS namespace to use.

  • launch_robot_state_pub (bool) – Whether to launch the robot state publisher.

  • robot_state_publisher_max_freq (float) – The maximum frequency at which to publish robot state messages.

  • new_robot_state_term (bool) – Whether to launch the robot state publisher in a new terminal.

  • load_controllers (bool) – Whether to load controllers for the real robot.

  • controllers_file (str) – The name of the file containing the controller configuration.

  • controllers_list (List[str]) – A list of controllers to spawn.

  • reset_controllers (bool) – Whether to reset the controllers when resetting the environment.

  • reset_controllers_prompt (bool) – Whether to prompt the user before resetting the controllers.

  • kill_rosmaster (bool) – Whether to kill the ROS master when closing the environment.

  • clean_logs (bool) – Whether to clean the ROS logs when closing the environment.

  • ros_port (str) – The port number to use for the ROS master.

  • seed (int) – The seed for the random number generator.

  • reset_env_prompt (bool) – Whether to prompt the user to resetting the environment.

  • close_env_prompt (bool) – Whether to prompt the user to closing the environment.

  • action_cycle_time (float) – The time to wait between applying actions.

  • log_internal_state (bool) – Whether to log the internal state of the environment.

  • multi_device_mode (bool) – Whether to run the environment in multi-device mode.

  • remote_ip (str) – The IP address of the remote machine in multi-device mode.

  • local_ip (str) – The IP address of the local machine in multi-device mode.

close_env_prompt

Load the real robot using a launch file

step(action)[source]

Take a step in the environment.

Parameters:

action (Any) – The action to be applied to the robot.

Returns:

A dictionary containing the observation, achieved goal, and desired goal. reward (float): The reward for taking the given action. terminated (bool): Whether the agent reaches the terminal state. truncated (bool): Whether the episode is truncated due to various reasons. (e.g. reaching the maximum number of steps, or end before the terminal state) info (dict): Additional information about the environment.

Return type:

observation (dict)

reset(seed=None, options=None)[source]

Reset the environment.

Parameters:
  • seed (int) – The seed for the random number generator.

  • options (dict) – Additional options for resetting the environment.

Returns:

A dictionary containing the initial observation, achieved goal, and desired goal. info (dict): Additional information about the environment. Similar to the info returned by step().

Return type:

observation (dict)

close()[source]

Close the environment.

Return type:

None

compute_terminated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is terminated due to reaching a terminal state.

This method should be implemented by subclasses to return a boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered).

Parameters:
  • achieved_goal (Any) – The achieved goal representing the current state of the environment.

  • desired_goal (Any) – The desired goal representing the target state of the environment.

  • info (dict) – Additional information for computing the termination condition.

Returns:

A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered)

compute_truncated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is truncated due non-terminal reasons.

This method should be implemented by subclasses to return a boolean value indicating whether the episode has been truncated due to reasons other than reaching a terminal state. Truncated states are those that are out of the scope of the Markov Decision Process (MDP). This could include truncation due to reaching a maximum number of steps, or any other non-terminal condition that causes the episode to end early.

Parameters:
  • achieved_goal (Any) – The achieved goal representing the current state of the environment.

  • desired_goal (Any) – The desired goal representing the target state of the environment.

  • info (dict) – Additional information for computing the truncation condition.

Returns:

A boolean value indicating whether the episode has been truncated.

compute_reward(achieved_goal, desired_goal, info)[source]

Compute the reward for achieving a given goal.

This method should be implemented by subclasses to return a scalar reward value representing how well the agent is doing in the current episode. The reward could be based on the distance to a goal, the amount of time taken to reach a goal, or any other metric that can be used to measure how well the agent is doing.

Parameters:
  • achieved_goal (Any) – The achieved goal representing the current state of the environment.

  • desired_goal (Any) – The desired goal representing the target state of the environment.

  • info (dict) – Additional information about the environment.

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

Wrappers

class realros.wrappers.normalize_action_wrapper.NormalizeActionWrapper(env)[source]

Bases: Wrapper

A wrapper for normalizing the action space of an environment.

This wrapper normalizes the actions to be between -1.0 and 1.0. It can be used with environments whose action space is of type Box.

Parameters:

env (gym.Env) – The environment to wrap.

Raises:

ValueError – If the action space of the environment is not of type Box.

Wraps an environment to allow a modular transformation of the step() and reset() methods.

Parameters:

env (Env) – The environment to wrap

denormalize_action(action)[source]

Normalize the action to the range of the original action space.

Parameters:

action (np.ndarray) – The action to normalize.

Returns:

The normalized action.

Return type:

np.ndarray

reverse_action(action)[source]

Reverse the normalization of an action.

Parameters:

action (np.ndarray) – The action to denormalize.

Returns:

The denormalized action.

Return type:

np.ndarray

step(action)[source]

Take a step in the environment using a normalized action.

Parameters:

action (np.ndarray) – The normalized action to take.

Returns:

The observation representing the current state of the environment. reward (float): The reward for taking the given action. done (bool): Whether the episode has ended. info (dict): Additional information about the environment.

Return type:

observation (Any)

class realros.wrappers.normalize_obs_wrapper.NormalizeObservationWrapper(env, normalize_goal_spaces=False)[source]

Bases: ObservationWrapper

A wrapper for normalizing the observation space of an environment.

This wrapper normalizes the observations to be between -1 and 1. It can handle environments whose observation space is either a Box or a dictionary with keys for ‘observation’, ‘achieved_goal’, and ‘desired_goal’.

Parameters:
  • env (gym.Env) – The environment to wrap.

  • normalize_goal_spaces (bool) – Whether to normalize the achieved_goal and desired_goal spaces.

Raises:

ValueError – If the observation space of the environment is not supported.

Constructor for the observation wrapper.

observation(observation)[source]

Returns a modified observation.

Parameters:

observation (ndarray | Dict[str, ndarray]) – The env observation

Returns:

The modified observation

Return type:

ndarray | Dict[str, ndarray]

class realros.wrappers.time_limit_wrapper.TimeLimitWrapper(env, max_episode_steps)[source]

Bases: Wrapper, RecordConstructorArgs

This wrapper will issue a truncated signal if a maximum number of timesteps is exceeded.

If a truncation is not defined inside the environment itself, this is the only place that the truncation signal is issued. Critically, this is different from the terminated signal that originates from the underlying environment as part of the MDP.

Initializes the TimeLimitWrapper with an environment and the number of steps after which truncation will occur.

Parameters:
  • env (Env) – The environment to apply the wrapper

  • max_episode_steps (int) – An optional max episode steps (if None, env.spec.max_episode_steps is used)

step(action)[source]

Steps through the environment and if the number of steps elapsed exceeds max_episode_steps then truncate.

Parameters:

action (Any) – The environment step action

Returns:

The environment step (observation, reward, terminated, truncated, info) with truncated=True if the number of steps elapsed >= max episode steps

Return type:

Tuple[Any, float, bool, bool, Dict[str, Any]]

reset(**kwargs)[source]

Reset the environment and zero the elapsed-step counter.

Parameters:

**kwargs (Any) – forwarded to the wrapped environment’s reset.

Returns:

The reset environment’s initial observation (and info, on Gymnasium API envs).

Return type:

Tuple[Any, Dict[str, Any]]

property spec: EnvSpec | None

Modifies the environment spec to include the max_episode_steps=self._max_episode_steps.

realros.wrappers.time_limit_wrapper.get_env_params(env)[source]
Parameters:

env (Env)

Return type:

Dict[str, int]