RealROS API reference
Real-hardware environments built on the shared UniROS proxy and utilities. Same gym API as MultiROS, talks to physical robot drivers (typically via MoveIt) instead of Gazebo.
Top-level package
Gym proxy (re-export)
Re-export of the canonical multiprocessing gym proxy.
The implementation lives in uniros._proxy.GymProxy.
RealrosGym is an alias kept for backwards compatibility — both
names refer to the same class object, so isinstance(env,
RealrosGym) matches envs created through any package in the
ecosystem.
Usage:
from realros.core import RealrosGym as gym
env = gym.make("env_name", args)
env.reset()
- realros.core.RealrosGym
alias of
GymProxy
- class realros.core.GymProxy(env_name, *args, **kwargs)[source]
Bases:
objectParent-side proxy for a
gym.Envrunning inside a worker process.Equivalent aliases re-exported by other packages in the ecosystem:
multiros.core.MultirosGymrealros.core.RealrosGymuniros.core.uniros_gym
All four names refer to the same class object.
Usage:
from uniros.core import uniros_gym as gym # or any alias env = gym.make("env_name", **kwargs) env.reset()
- static worker(env_name, conn, *args, **kwargs)[source]
- Parameters:
env_name (str)
conn (Connection)
args (Any)
kwargs (Any)
- Return type:
None
- property unwrapped: GymProxy
Return self as the unwrapped env.
The proxy IS the user-visible env from the parent’s perspective; whatever lives in the worker process is opaque and not accessible across the pipe (gym.Env instances typically hold non-picklable state such as _thread.RLock from rospy’s spinner threads, so sending them through the pipe raises TypeError).
Without this handler, env.unwrapped would route through __getattr__, IPC to the worker, and fail to pickle. SB3’s DummyVecEnv calls id(env.unwrapped) for uniqueness checking during construction, so this property is required for SB3 compatibility.
- classmethod make(env_name, *args, **kwargs)[source]
- step(action)[source]
- reset(seed=None, options=None)[source]
- close()[source]
- Return type:
None
The RealrosGym class in this module is an alias for
uniros._proxy.GymProxy — see the UniROS API page for the
full method reference.
ROS helpers
ROS common
Common helpers for handling ROS from inside the realros framework.
Functions provided:
launch_roscore— launch a roscore with a given or random (non-overlapping) port.get_all_the_ros_masters— get all the currently-known rosmaster ports (diagnostic).add_to_rosmaster_list— add a port to the diagnostic port log.remove_from_rosmaster_list— remove a port from the diagnostic port log.kill_all_host_ros_processes— kill ALL ROS processes on this host.kill_all_host_roslaunch_processes— kill ALL roslaunch processes on this host.kill_all_ros_nodes— kill every node attached to the current rosmaster.kill_ros_node— kill a single named ROS node.ros_kill_master— kill a specific rosmaster by port.clean_ros_logs— purge ROS logs.source_workspace— deprecated no-op.ros_launch_launcher— execute a roslaunch with args.ros_node_launcher— launch a ROS node from a package.ros_load_yaml— fetch and load a YAML file onto the ROS parameter server.load_urdf— load a URDF onto the parameter server.is_roscore_running— check whether a roscore is reachable.change_ros_master— set ROS_MASTER_URI for this process.change_ros_master_multi_device— set ROS_MASTER_URI for cross-machine use.init_robot_state_publisher— start the robot_state_publisher node.remove_all_from_rosmaster_list— truncate the diagnostic port log.
- realros.utils.ros_common.register_managed_process(popen, **selectors)[source]
Register a process spawned by this script for automatic cleanup on Ctrl+C or normal interpreter exit.
- Parameters:
popen – A
subprocess.Popen(typically the xterm wrapper shell)..terminate()will be called on cleanup.**selectors – Optional fallback identifiers used by cleanup when terminating the Popen alone is not enough. Recognised keys are
roscore_port(str | int), which triggerspkill -f "roscore -p <port>"; andkind(str), a free-form label used in log lines.
- Return type:
None
- realros.utils.ros_common.launch_roscore(port=None, set_new_master_vars=True, default_port=False)[source]
Launch a roscore on a free port and (optionally) point this process’s
ROS_MASTER_URIat it.Ports are allocated via
socket.bind(('127.0.0.1', 0))so the kernel picks free ports and serializes between parallel callers.- Parameters:
port (int) – A specific desired port for
ROS_MASTER_URI. If the port is unavailable on this host right now, falls back to a kernel-allocated free port and logs a warning.set_new_master_vars (bool) – change the current
ROS_MASTER_URIenvironment variable to the selected port.default_port (bool) – If True, request port 11311. If 11311 is already in use,
ROS_MASTER_URIis still pointed at it so the caller can attach to an existing roscore.
- Returns:
ROS_MASTER_URIport as a string.- Return type:
- realros.utils.ros_common.get_all_the_ros_masters()[source]
DEPRECATED. Reads the diagnostic port log only.
Port allocation is handled by
launch_roscoreviasocket.bind(0); this helper returns whatever the best-effort diagnostic log contains.- Return type:
- realros.utils.ros_common.add_to_rosmaster_list(ros_port)[source]
DEPRECATED. Appends to the diagnostic port log only.
Port allocation is now handled by
launch_roscoredirectly viasocket.bind(0). Callers do not need to maintain a shared list.
- realros.utils.ros_common.remove_from_rosmaster_list(ros_port)[source]
DEPRECATED. Removes
ros_portfrom the diagnostic log only.Port allocation is now handled by
launch_roscoredirectly viasocket.bind(0). Callers do not need to maintain a shared list.
- realros.utils.ros_common.kill_all_host_ros_processes()[source]
Kill EVERY rosmaster/roslaunch/rosout/robot_state_publisher/nodelet process on this host (not just the ones this script spawned).
Implemented via
killall -9so it is host-scoped: if another user or another script on the same machine is running ROS, this will kill their processes too. Use with care.Renamed from
kill_all_ros_processesto make the host-wide scope explicit. The old name is kept as a DeprecationWarning-emitting alias for backwards compatibility.- Returns:
- True if the kill command ran (does not verify each
process actually died).
- Return type:
- realros.utils.ros_common.kill_all_ros_processes()[source]
DEPRECATED. Use
kill_all_host_ros_processes()instead.The old name was misleading: it sounds like it cleans up only the instances this script spawned, but it actually issues
killall -9against rosmaster/roslaunch/rosout/etc. host-wide.- Return type:
- realros.utils.ros_common.kill_all_host_roslaunch_processes()[source]
Kill EVERY
roslaunchprocess on this host viakillall -9.Host-scoped: not limited to this script’s rosmaster. Renamed from
kill_all_roslaunch_processto make the host-wide scope explicit. The old name is kept as a DeprecationWarning-emitting alias for backwards compatibility.- Returns:
True if the kill command ran.
- Return type:
- realros.utils.ros_common.kill_all_roslaunch_process()[source]
DEPRECATED. Use
kill_all_host_roslaunch_processes()instead.- Return type:
- realros.utils.ros_common.kill_all_ros_nodes(ros_port=None)[source]
Function to kill all running ROS nodes of the specified or current Rosmaster.
- realros.utils.ros_common.kill_ros_node(node_name, ros_port=None)[source]
Function to kill a ROS node of the given or current Rosmaster.
- realros.utils.ros_common.clean_ros_logs()[source]
Function to clean all the ros logs.
- Returns:
True if all the ros logs were closed.
- Return type:
- realros.utils.ros_common.source_workspace(abs_path)[source]
DEPRECATED no-op.
This function used to call
os.system("source devel/setup.bash")against the workspace path. That cannot work for two reasons:os.systemruns the command in/bin/sh(commonlydashon Ubuntu), wheresourceis not a builtin — only.(“dot-source”) is. So the source call fails immediately with “sh: 1: source: not found”.Even if it succeeded, sourcing in a child shell only mutates the child’s environment. The child dies with the subprocess and the calling Python process is unaffected.
To use a ROS workspace from a Python process, source it in the shell that launches Python (i.e.
source devel/setup.bashin the terminal, then runpython).This function is preserved for backwards compatibility but is now a no-op that emits a DeprecationWarning and a rospy logwarn. It will be removed in a future release.
- realros.utils.ros_common.ros_launch_launcher(pkg_name=None, launch_file_name=None, launch_file_abs_path=None, args=None, launch_new_term=True, ros_port=None)[source]
Function to execute a roslaunch with args.
- Parameters:
pkg_name (str) – Name of the package where the launch file is located.
launch_file_name (str) – Name of the launch file.
launch_file_abs_path (str) – Absolute path of the launch file
launch_new_term (bool) – Launch the process in a new terminal (Xterm).
ros_port (str) – The ROS_MASTER_URI port (optional).
- Returns:
True if the launch file was launched and False otherwise.
- Return type:
- realros.utils.ros_common.construct_roslaunch_command(pkg_name, launch_file_name, launch_file_abs_path)[source]
Constructs a roslaunch command using either a package name and launch file name or an absolute path to a launch file.
- realros.utils.ros_common.ros_node_launcher(pkg_name, node_name, launch_master=False, launch_new_term=True, name=None, ns='/', output='log', ros_port=None, args=None)[source]
Function to launch a ROS node from a package. If “launch_master” is “True”, it will also launch a ROSCORE with the given “ros_port” or with a random ROS master port if “ros_port” is not specified.
- Parameters:
pkg_name (str) – Name of the package to launch the node from.
node_name (str) – Name of the node to launch.
launch_master (bool) – If ROSMASTER is not running launch it.
launch_new_term (bool) – Launch the process in a new terminal (Xterm).
name (str) – Name to give the node to be launched.
ns (str) – Namespace to give the node to be launched.
output (str) – log, screen, or None
ros_port (str) – The ROS_MASTER_URI port (optional).
args (List[str]) – List of additional arguments to pass to the rosrun command.
- Returns:
- ROS Master port, and True if the node was launched successfully,
False otherwise.
- Return type:
- realros.utils.ros_common.construct_rosrun_command(pkg_name, node_name, name=None, ns='/', output='log')[source]
Constructs a rosrun command using a package name and node name.
- Parameters:
- Returns:
The constructed rosrun command.
- Return type:
- realros.utils.ros_common.ros_load_yaml(pkg_name=None, file_name=None, file_abs_path=None, ns='/', ros_port=None)[source]
Fetch a YAML file from a package or an abs path and load it into the ROS Parameter Server.
- realros.utils.ros_common.load_urdf(model_path=None, pkg_name=None, file_name=None, folder='/urdf', ns=None, args_xacro=None, param_name=None, ros_port=None)[source]
Function to load a URDF from a ROS package to the parameter server or a string containing the processed URDF data.
- Parameters:
model_path (str) – The path to the URDF file. Defaults to None.
pkg_name (str) – The name of the ROS package containing the URDF file. Defaults to None.
file_name (str) – The name of the URDF file. Defaults to None.
folder (str) – The folder within the ROS package containing the URDF file. Defaults to “/urdf”.
ns (str) – The namespace to use when adding the URDF to the parameter server. Defaults to None.
args_xacro (list) – Additional arguments to pass to xacro when processing the URDF file. Defaults to None.
param_name (str) – The name of the parameter to use when adding the URDF to the parameter server.
ros_port (str) – The ROS_MASTER_URI port (optional). Defaults to None.
- Returns:
- A tuple containing a boolean value indicating whether the loading was
successful and a string containing the processed URDF data. If an error occurred while loading or processing the URDF file, the first element of the tuple will be False and the second element will be None.
- Return type:
- realros.utils.ros_common.is_roscore_running()[source]
Function to check if a roscore is currently running.
- Returns:
True if a roscore is running, False otherwise.
- Return type:
- realros.utils.ros_common.change_ros_master(ros_port)[source]
Function to set the current ROS Master Environment Variable.
- realros.utils.ros_common.change_ros_master_multi_device(remote_ip, local_ip, remote_ros_port='11311')[source]
Function to set the current ROS Master Environment Variable for multi-device communication.
ROS controllers (re-export)
Re-export of the canonical ROS-controller helpers from
uniros.utils.ros_controllers.
Each helper wraps a controller_manager service call (load,
unload, list, switch, start, stop, reset, spawn, unspawn) with a
30-second rospy.wait_for_service timeout so a hung
controller_manager surfaces as a logged failure rather than an
indefinite hang.
Usage:
from realros.utils.ros_controllers import load_ros_controller, ...
Re-export of uniros.utils.ros_controllers. See the UniROS
API page for the full helper reference.
ROS markers (re-export)
Re-export of the canonical ROS marker helpers from
uniros.utils.ros_markers.
Usage:
from realros.utils.ros_markers import RosMarker, RosMarkerArray
Re-export of uniros.utils.ros_markers.
ROS kinematics (re-export)
Re-export of the canonical kinematics helpers from
uniros.utils.ros_kinematics.
Usage:
from realros.utils.ros_kinematics import Kinematics_pyrobot, Kinematics_pykdl
Re-export of uniros.utils.ros_kinematics.
MoveIt integration
This Class is to specify all the common functions related to the handling of ROS Moveit for manipulation tasks.
- It has the following methods (Main functions),
01. set_trajectory_ee: Set a pose target for the end effector of the robot arm. 02. set_trajectory_joints: Set a joint position target for the arm joints. 03. set_gripper_joints: Set a joint position target for the gripper joints. 04. get_robot_pose: Get the current pose of the robot arm. 05. get_robot_rpy: Get the current roll, pitch, and yaw angles of the robot arm. 06. get_gripper_pose: Get the current pose of the gripper. 07. get_gripper_rpy: Get the current roll, pitch, and yaw angles of the gripper. 08. get_joint_angles_robot_arm: Get the current joint angles of the robot arm. 09. get_joint_angles_gripper: Get the current joint angles of the gripper. 10. get_joint_angles: Get the current joint angles of both the robot arm and gripper. 11. check_goal: Check if a goal position is reachable by the robot arm. 12. get_randomJointVals: Get random joint values for the robot arm. 13. get_randomPose: Get a random pose for the robot arm. 14. set_planning_time: Set the maximum time allowed for planning a trajectory. 15. set_goal_position_tolerance: Set the goal position tolerance for the robot arm. 16. set_goal_orientation_tolerance: Set the goal orientation tolerance for the robot arm. 17. set_goal_joint_tolerance: Set the goal joint tolerance for the robot arm. 18. set_max_acceleration_scaling_factor: Set the maximum acceleration scaling factor for the robot arm. 19. set_max_velocity_scaling_factor: Set the maximum velocity scaling factor for the robot arm. 20. set_trajectory_cartesian: Set a cartesian trajectory for the end effector of the robot arm. 21. stop_arm: Stop the robot arm from moving. 22. stop_gripper: Stop the gripper from moving. 23. check_goal_joint_pos: Check if a goal position is reachable by the robot arm.
- class realros.utils.moveit_realros.MoveitRealROS(arm_name, gripper_name=None, robot_description=None, ns=None)[source]
Bases:
objectA class for using MoveIt!
This class provides a set of methods for controlling a robot arm and gripper using MoveIt!. It allows you to set joint angles, execute trajectories, and check if a goal is reachable. It also provides methods for getting the current pose and joint angles of the robot arm and gripper.
To use this class, you must first initialize it with the appropriate parameters for your robot arm and gripper. You can then call the various methods provided by this class to control your robot.
- arm_execute_joint_trajectory(joint_target_values, async_move=False)[source]
Execute a joint trajectory with the robot arm.
- Parameters:
- Returns:
The result of the trajectory execution.
- Return type:
- Raises:
ValueError – If the input joint target values do not match the number of joints of the robot arm.
- gripper_execute_joint_command(target_joint_values, async_move=False)[source]
Execute a joint command with the gripper.
- Parameters:
- Returns:
The result of the command execution.
- Return type:
- Raises:
ValueError – If the input joint target values do not match the number of joints of the gripper.
- robot_pose()[source]
Get the current pose of the robot arm.
- Returns:
The current pose of the robot arm.
- Return type:
PoseStamped
- robot_rpy()[source]
Get the current roll, pitch, and yaw angles of the robot arm.
- Returns:
The current roll, pitch, and yaw angles of the robot arm.
- Return type:
List[float]
- gripper_pose(gripper_link='')[source]
Get the current pose of the gripper.
- Parameters:
gripper_link (str) – The name of the gripper link to get the pose of.
- Returns:
The current pose of the gripper.
- Return type:
PoseStamped
- joint_angles_arm()[source]
Get the current joint angles of the robot arm.
- Returns:
The current joint angles of the robot arm.
- Return type:
List[float]
- joint_angles_gripper()[source]
Get the current joint angles of the gripper.
- Returns:
The current joint angles of the gripper.
- Return type:
List[float]
- joint_angles()[source]
Get the current joint angles of both the robot arm and gripper.
- Returns:
The current joint angles of both the robot arm and gripper.
- Return type:
List[float]
- check_goal_reachable_joint_pos(joint_pos)[source]
Check if a goal position is reachable by the robot arm.
- set_trajectory_ee(position, orientation=None, async_move=False)[source]
Set a pose target for the end effector of the robot arm.
- Parameters:
- Returns:
The result of the trajectory execution.
- Return type:
- set_trajectory_joints(q_positions, async_move=False)[source]
Set a joint position target for the arm joints.
- set_gripper_joints(joint_positions, async_move=False)[source]
Set a joint position target for the gripper joints.
- set_trajectory_cartesian(waypoints, eef_step=0.01, jump_threshold=0.0, avoid_collisions=True)[source]
Set a cartesian trajectory for the end effector of the robot arm.
- Parameters:
waypoints (List[PoseStamped]) – The target waypoints for the end effector.
eef_step (float) – The distance between waypoints in meters (Optional).
jump_threshold (float) – The maximum distance in the configuration space between consecutive points in the resulting path (Optional).
avoid_collisions (bool) – Whether to check for collisions between waypoints or not (Optional).
- Returns:
The result of the trajectory execution.
- Return type:
- get_robot_pose()[source]
Get the current pose of the robot arm.
- Returns:
The current pose of the robot arm.
- Return type:
PoseStamped
- get_robot_rpy()[source]
Get the current roll, pitch, and yaw angles of the robot arm.
- Returns:
The current roll, pitch, and yaw angles of the robot arm.
- Return type:
List[float]
- get_gripper_pose(link='')[source]
Get the current pose of the gripper.
- Parameters:
link (str) – The name of the gripper link to get the pose of.
- Returns:
The current pose of the gripper.
- Return type:
PoseStamped
- get_joint_angles_robot_arm()[source]
Get the current joint angles of the robot arm.
- Returns:
The current joint angles of the robot arm.
- Return type:
List[float]
- get_joint_angles_gripper()[source]
Get the current joint angles of the gripper.
- Returns:
The current joint angles of the gripper.
- Return type:
List[float]
- get_joint_angles()[source]
Get the current joint angles of both the robot arm and gripper.
- Returns:
The current joint angles of both the robot arm and gripper.
- Return type:
List[float]
- get_randomJointVals()[source]
Get random joint values for the robot arm.
- Returns:
Random joint values for the robot arm.
- Return type:
List[float]
- get_randomPose()[source]
Get a random pose for the robot arm.
- Returns:
A random pose for the robot arm.
- Return type:
PoseStamped
- set_planning_time(planning_time)[source]
Set the maximum allowed planning time. Specify the amount of time to be used for motion planning
- Parameters:
planning_time (float) – The maximum allowed planning time in seconds.
- Return type:
None
- set_goal_position_tolerance(position_tolerance)[source]
Set the tolerance for a target end-effector position
- Parameters:
position_tolerance (float) – The tolerance for the goal position in meters.
- Return type:
None
- set_goal_orientation_tolerance(orientation_tolerance)[source]
Set the tolerance for a target end-effector orientation.
- Parameters:
orientation_tolerance (float) – The tolerance for the goal orientation in radians.
- Return type:
None
- set_goal_joint_tolerance(joint_tolerance)[source]
Set the tolerance for a target joint configuration.
- Parameters:
joint_tolerance (float) – The tolerance for the goal joint configuration in radians.
- Return type:
None
- set_max_acceleration_scaling_factor(acceleration_scaling_factor)[source]
Set a scaling factor to reduce the maximum joint accelerations. Allowed values are in (0,1]. The default value is set in the joint_limits.yaml of the moveit_config package.
- Parameters:
acceleration_scaling_factor (float) – The maximum acceleration scaling factor.
- Return type:
None
- set_max_velocity_scaling_factor(velocity_scaling_factor)[source]
Set a scaling factor to reduce the maximum joint velocities. Allowed values are in (0,1]. The default value is set in the joint_limits.yaml of the moveit_config package.
- Parameters:
velocity_scaling_factor (float) – The maximum velocity scaling factor.
- Return type:
None
Base envs
RealBaseEnv
- class realros.envs.RealBaseEnv.RealBaseEnv(load_robot=True, robot_pkg_name=None, robot_launch_file=None, robot_args=None, load_urdf=False, urdf_pkg_name=None, urdf_file_name=None, urdf_folder='/urdf', urdf_xacro_args=None, namespace='/', launch_robot_state_pub=False, robot_state_publisher_max_freq=None, new_robot_state_term=False, load_controllers=False, controllers_file=None, controllers_list=None, reset_controllers=False, reset_controllers_prompt=False, kill_rosmaster=True, clean_logs=False, ros_port=None, seed=None, reset_env_prompt=False, close_env_prompt=False, action_cycle_time=0.0, log_internal_state=False, multi_device_mode=False, remote_ip=None, local_ip=None)[source]
Bases:
EnvA custom gymnasium environment for reinforcement learning using ROS and real robots.
Initialize the RealBaseEnv.
- Parameters:
load_robot (bool) – Load the real robot using a launch file.
robot_pkg_name (str) – The name of the package containing the real robot launch file.
robot_launch_file (str) – The name of the robot launch file.
robot_args (list of str) – Args to pass to the robot launch file.
load_urdf (bool) – Whether to load the URDF of the real robot.
urdf_pkg_name (str) – The name of the package containing the URDF file.
urdf_file_name (str) – The name of the URDF file.
urdf_folder (str) – The folder containing the URDF file.
urdf_xacro_args (List[str]) – A list of arguments to pass to xacro when processing the URDF file.
namespace (str) – The ROS namespace to use.
launch_robot_state_pub (bool) – Whether to launch the robot state publisher.
robot_state_publisher_max_freq (float) – The maximum frequency at which to publish robot state messages.
new_robot_state_term (bool) – Whether to launch the robot state publisher in a new terminal.
load_controllers (bool) – Whether to load controllers for the real robot.
controllers_file (str) – The name of the file containing the controller configuration.
controllers_list (List[str]) – A list of controllers to spawn.
reset_controllers (bool) – Whether to reset the controllers when resetting the environment.
reset_controllers_prompt (bool) – Whether to prompt the user before resetting the controllers.
kill_rosmaster (bool) – Whether to kill the ROS master when closing the environment.
clean_logs (bool) – Whether to clean the ROS logs when closing the environment.
ros_port (str) – The port number to use for the ROS master.
seed (int) – The seed for the random number generator.
reset_env_prompt (bool) – Whether to prompt the user to resetting the environment.
close_env_prompt (bool) – Whether to prompt the user to closing the environment.
action_cycle_time (float) – The time to wait between applying actions.
log_internal_state (bool) – Whether to log the internal state of the environment.
multi_device_mode (bool) – Whether to run the environment in multi-device mode.
remote_ip (str) – The IP address of the remote ROS master (if multi-device mode is enabled).
local_ip (str) – The IP address of the local PC (if multi-device mode is enabled).
- close_env_prompt
Load the real robot using a launch file
- step(action)[source]
Take a step in the environment.
- Parameters:
action (Any) – The action to be applied to the robot.
- Returns:
The observation representing the current state of the environment. reward (float): The reward for taking the given action. terminated (bool): Whether the agent reaches the terminal state. truncated (bool): Whether the episode is truncated due to various reasons. (e.g. reaching the maximum number of steps, or end before the terminal state) info (dict): Additional information about the environment.
- Return type:
observation (Any)
RealGoalEnv
- class realros.envs.RealGoalEnv.RealGoalEnv(load_robot=True, robot_pkg_name=None, robot_launch_file=None, robot_args=None, load_urdf=False, urdf_pkg_name=None, urdf_file_name=None, urdf_folder='/urdf', urdf_xacro_args=None, namespace='/', launch_robot_state_pub=False, robot_state_publisher_max_freq=None, new_robot_state_term=False, load_controllers=False, controllers_file=None, controllers_list=None, reset_controllers=False, reset_controllers_prompt=False, kill_rosmaster=True, clean_logs=False, ros_port=None, seed=None, reset_env_prompt=False, close_env_prompt=False, action_cycle_time=0.0, log_internal_state=False, multi_device_mode=False, remote_ip=None, local_ip=None)[source]
Bases:
GoalEnvA custom gymnasium robotics goal-conditioned environment for reinforcement learning using ROS and real robots.
Initialize the RealGoalEnv environment.
- Parameters:
load_robot (bool) – Load the real robot using a launch file.
robot_pkg_name (str) – The name of the package containing the real robot launch file.
robot_launch_file (str) – The name of the robot launch file.
robot_args (list of str) – Args to pass to the robot launch file.
load_urdf (bool) – Whether to load the URDF of the real robot.
urdf_pkg_name (str) – The name of the package containing the URDF file.
urdf_file_name (str) – The name of the URDF file.
urdf_folder (str) – The folder containing the URDF file.
urdf_xacro_args (List[str]) – A list of arguments to pass to xacro when processing the URDF file.
namespace (str) – The ROS namespace to use.
launch_robot_state_pub (bool) – Whether to launch the robot state publisher.
robot_state_publisher_max_freq (float) – The maximum frequency at which to publish robot state messages.
new_robot_state_term (bool) – Whether to launch the robot state publisher in a new terminal.
load_controllers (bool) – Whether to load controllers for the real robot.
controllers_file (str) – The name of the file containing the controller configuration.
controllers_list (List[str]) – A list of controllers to spawn.
reset_controllers (bool) – Whether to reset the controllers when resetting the environment.
reset_controllers_prompt (bool) – Whether to prompt the user before resetting the controllers.
kill_rosmaster (bool) – Whether to kill the ROS master when closing the environment.
clean_logs (bool) – Whether to clean the ROS logs when closing the environment.
ros_port (str) – The port number to use for the ROS master.
seed (int) – The seed for the random number generator.
reset_env_prompt (bool) – Whether to prompt the user to resetting the environment.
close_env_prompt (bool) – Whether to prompt the user to closing the environment.
action_cycle_time (float) – The time to wait between applying actions.
log_internal_state (bool) – Whether to log the internal state of the environment.
multi_device_mode (bool) – Whether to run the environment in multi-device mode.
remote_ip (str) – The IP address of the remote machine in multi-device mode.
local_ip (str) – The IP address of the local machine in multi-device mode.
- close_env_prompt
Load the real robot using a launch file
- step(action)[source]
Take a step in the environment.
- Parameters:
action (Any) – The action to be applied to the robot.
- Returns:
A dictionary containing the observation, achieved goal, and desired goal. reward (float): The reward for taking the given action. terminated (bool): Whether the agent reaches the terminal state. truncated (bool): Whether the episode is truncated due to various reasons. (e.g. reaching the maximum number of steps, or end before the terminal state) info (dict): Additional information about the environment.
- Return type:
observation (dict)
- reset(seed=None, options=None)[source]
Reset the environment.
- Parameters:
- Returns:
A dictionary containing the initial observation, achieved goal, and desired goal. info (dict): Additional information about the environment. Similar to the info returned by step().
- Return type:
observation (dict)
- compute_terminated(achieved_goal, desired_goal, info)[source]
Function to check if the episode is terminated due to reaching a terminal state.
This method should be implemented by subclasses to return a boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered).
- Parameters:
achieved_goal (Any) – The achieved goal representing the current state of the environment.
desired_goal (Any) – The desired goal representing the target state of the environment.
info (dict) – Additional information for computing the termination condition.
- Returns:
A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered)
- compute_truncated(achieved_goal, desired_goal, info)[source]
Function to check if the episode is truncated due non-terminal reasons.
This method should be implemented by subclasses to return a boolean value indicating whether the episode has been truncated due to reasons other than reaching a terminal state. Truncated states are those that are out of the scope of the Markov Decision Process (MDP). This could include truncation due to reaching a maximum number of steps, or any other non-terminal condition that causes the episode to end early.
- Parameters:
achieved_goal (Any) – The achieved goal representing the current state of the environment.
desired_goal (Any) – The desired goal representing the target state of the environment.
info (dict) – Additional information for computing the truncation condition.
- Returns:
A boolean value indicating whether the episode has been truncated.
- compute_reward(achieved_goal, desired_goal, info)[source]
Compute the reward for achieving a given goal.
This method should be implemented by subclasses to return a scalar reward value representing how well the agent is doing in the current episode. The reward could be based on the distance to a goal, the amount of time taken to reach a goal, or any other metric that can be used to measure how well the agent is doing.
- Parameters:
achieved_goal (Any) – The achieved goal representing the current state of the environment.
desired_goal (Any) – The desired goal representing the target state of the environment.
info (dict) – Additional information about the environment.
- Returns:
The reward for achieving the given goal.
- Return type:
reward (float)
Wrappers
- class realros.wrappers.normalize_action_wrapper.NormalizeActionWrapper(env)[source]
Bases:
WrapperA wrapper for normalizing the action space of an environment.
This wrapper normalizes the actions to be between -1.0 and 1.0. It can be used with environments whose action space is of type Box.
- Parameters:
env (gym.Env) – The environment to wrap.
- Raises:
ValueError – If the action space of the environment is not of type Box.
Wraps an environment to allow a modular transformation of the
step()andreset()methods.- Parameters:
env (Env) – The environment to wrap
- denormalize_action(action)[source]
Normalize the action to the range of the original action space.
- Parameters:
action (np.ndarray) – The action to normalize.
- Returns:
The normalized action.
- Return type:
np.ndarray
- reverse_action(action)[source]
Reverse the normalization of an action.
- Parameters:
action (np.ndarray) – The action to denormalize.
- Returns:
The denormalized action.
- Return type:
np.ndarray
- step(action)[source]
Take a step in the environment using a normalized action.
- Parameters:
action (np.ndarray) – The normalized action to take.
- Returns:
The observation representing the current state of the environment. reward (float): The reward for taking the given action. done (bool): Whether the episode has ended. info (dict): Additional information about the environment.
- Return type:
observation (Any)
- class realros.wrappers.normalize_obs_wrapper.NormalizeObservationWrapper(env, normalize_goal_spaces=False)[source]
Bases:
ObservationWrapperA wrapper for normalizing the observation space of an environment.
This wrapper normalizes the observations to be between -1 and 1. It can handle environments whose observation space is either a Box or a dictionary with keys for ‘observation’, ‘achieved_goal’, and ‘desired_goal’.
- Parameters:
env (gym.Env) – The environment to wrap.
normalize_goal_spaces (bool) – Whether to normalize the achieved_goal and desired_goal spaces.
- Raises:
ValueError – If the observation space of the environment is not supported.
Constructor for the observation wrapper.
- class realros.wrappers.time_limit_wrapper.TimeLimitWrapper(env, max_episode_steps)[source]
Bases:
Wrapper,RecordConstructorArgsThis wrapper will issue a truncated signal if a maximum number of timesteps is exceeded.
If a truncation is not defined inside the environment itself, this is the only place that the truncation signal is issued. Critically, this is different from the terminated signal that originates from the underlying environment as part of the MDP.
Initializes the
TimeLimitWrapperwith an environment and the number of steps after which truncation will occur.- Parameters:
- step(action)[source]
Steps through the environment and if the number of steps elapsed exceeds
max_episode_stepsthen truncate.