rl_environments API reference

Pre-built gymnasium environments for the robots and tasks the ecosystem currently supports. The package is organised as rl_environments/{robot}/{sim,real}/{robot_envs,task_envs}/....

For the user-facing list of working environments see Environments.

Note

This page covers the robot envs and reach task envs across all four supported robots. The push and pick-and-place task-env modules are intentionally omitted while their docstrings are tidied up — they’re registered and importable; only the API-reference rendering is deferred. See Environments for the user-facing env reference, which does cover push and PnP.

Top-level package

Gymnasium registrations for the rl_environments package.

Convention: one id per task env file. Configuration variants (RGB vs depth observations, joint vs EE action space, vision sensor combos, real-time vs MDP pause-step-resume mode) are exposed as __init__ kwargs on the task env classes — pass them at construction time:

env = gym.make("RX200ReacherSim-v0",
               gazebo_gui=True,
               ee_action_type=True,
               rgb_obs_only=True, normal_obs_only=False,
               reward_type="Sparse",
               realtime_mode=True)

Iteration history: 2026-05-17 slim dropped 33 kwargs-only aliases (46 → 13 ids). 2026-05-19 collapsed RX200 reach v0/v1/v2/v3 → single v0 and push v0/v1 → single v0; slide envs removed entirely. The polished per-link-FK-safe + close-race-guarded code (formerly v3 reach / v1 push) is now the canonical v0.

RX200 — Simulation (Gazebo)

Robot envs

class rl_environments.rx200.sim.robot_envs.rx200_robot_sim.RX200RobotEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=False, use_kinect=False)[source]

Bases: GazeboBaseEnv

Superclass for all RX200 Robot environments.

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the robot. /rx200/gripper_controller/command: Send the joint positions to the robot.

Parameters:
get_model_pose(model_name='red_cube', rpy=True)[source]

Get the pose of an object in Gazebo

Parameters:
  • model_name – name of the object whose pose is to be retrieved

  • rpy – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the pose is retrieved successful position: position of the object as a numpy array orientation: orientation of the object as a numpy array (rpy or quaternion)

Return type:

success

spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]

Spawn a cube in Gazebo

Parameters:
  • model_pos_x – x-coordinate of the cube

  • model_pos_y – y-coordinate of the cube

Returns:

True if the cube is spawned successfully

Return type:

done

remove_cube_in_gazebo()[source]

Remove the cube from Gazebo

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints using moveit.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm using moveit.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

class rl_environments.rx200.sim.robot_envs.rx200_robot_goal_sim.RX200RobotGoalEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=False, use_kinect=False)[source]

Bases: GazeboGoalEnv

Superclass for all RX200 Robot Goal environments.

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the robot. /rx200/gripper_controller/command: Send the joint positions to the robot.

Parameters:
get_model_pose(model_name='red_cube', rpy=True)[source]

Get the pose of an object in Gazebo.

Parameters:
  • model_name – name of the object whose pose is to be retrieved

  • rpy – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the Gazebo lookup succeeded position: position of the object as a numpy float32 array, or None on failure orientation: orientation as a numpy float32 array (rpy or quaternion), or None on failure

Return type:

success

spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]

Spawn a cube in Gazebo

Parameters:
  • model_pos_x – x-coordinate of the cube

  • model_pos_y – y-coordinate of the cube

Returns:

True if the cube is spawned successfully

Return type:

done

remove_cube_in_gazebo()[source]

Remove the cube from Gazebo

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints using moveit.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm using moveit.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

Vision-augmented robot envs

class rl_environments.rx200.sim.robot_envs.rx200_robot_sim_zed2.RX200RobotEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=False, use_zed2=False)[source]

Bases: GazeboBaseEnv

Superclass for all RX200 Robot environments.
  • Uses a ZED2 camera for RGB and Depth images

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /rx200/zed2/depth/depth_registered: Depth image from ZED2 camera /rx200/zed2/left/image_rect_color: RGB image from ZED2 camera

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the arm controller /rx200/gripper_controller/command: Send the joint positions to the gripper controller

Parameters:
get_model_pose(model_name='red_cube', rpy=True)[source]

Get the pose of an object in Gazebo.

Parameters:
  • model_name – name of the object whose pose is to be retrieved

  • rpy – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the Gazebo lookup succeeded position: position of the object as a numpy float32 array, or None on failure orientation: orientation as a numpy float32 array (rpy or quaternion), or None on failure

Return type:

success

spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]

Spawn a cube in Gazebo

Parameters:
  • model_pos_x – x-coordinate of the cube

  • model_pos_y – y-coordinate of the cube

Returns:

True if the cube is spawned successfully

Return type:

done

remove_cube_in_gazebo()[source]

Remove the cube from Gazebo

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints using moveit.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm using moveit.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor

class rl_environments.rx200.sim.robot_envs.rx200_robot_goal_sim_zed2.RX200RobotGoalEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=False, use_zed2=False)[source]

Bases: GazeboGoalEnv

Superclass for all RX200 Robot Goal environments.
  • Uses a ZED2 camera for RGB and Depth images

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /rx200/zed2/depth/depth_registered: Depth image from the ZED2 camera /rx200/zed2/left/image_rect_color: RGB image from the ZED2 camera

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the robot. /rx200/gripper_controller/command: Send the joint positions to the robot.

Parameters:
get_model_pose(model_name='red_cube', rpy=True)[source]

Get the pose of an object in Gazebo.

Parameters:
  • model_name – name of the object whose pose is to be retrieved

  • rpy – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the Gazebo lookup succeeded position: position of the object as a numpy float32 array, or None on failure orientation: orientation as a numpy float32 array (rpy or quaternion), or None on failure

Return type:

success

spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]

Spawn a cube in Gazebo

Parameters:
  • model_pos_x – x-coordinate of the cube

  • model_pos_y – y-coordinate of the cube

Returns:

True if the cube is spawned successfully

Return type:

done

remove_cube_in_gazebo()[source]

Remove the cube from Gazebo

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints using moveit.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm using moveit.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor

Reach task envs (Kinect v2 vision)

class rl_environments.rx200.sim.task_envs.reach.rx200_kinect_reach_sim.RX200ReacherEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=False, gazebo_gui=False, seed=None, reward_type='Dense', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=10, action_cycle_time=0.1, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False, extra_smoothing=False, realtime_mode=True)[source]

Bases: RX200RobotEnv

This Task env is for a simple Reach Task with the RX200 robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (5 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (28 obs or rgb/depth image or a combination)

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) - default 10 Hz (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • realtime_mode: If True (default), runs the UniROS real-time loop — physics is never paused, a rospy.Timer at environment_loop_rate updates obs/reward/done, and step() reads the latest cached values. This matches the real env, so policies transfer / concurrent sim+real learning Just Works. If False, runs the standard MDP loop — Gazebo physics is paused around each _set_action, the action is executed synchronously, the agent waits action_cycle_time for the trajectory, then a fresh obs/reward/done is sampled. The non-realtime mode is for clean RL-algorithm benchmarking where you want every sample to correspond exactly to the post-action world state.

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to load the table model or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • extra_smoothing: Whether to use extra smoothing for actions or not.

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the robot. /rx200/gripper_controller/command: Send the joint positions to the robot.

Parameters:
  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • seed (int)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

  • extra_smoothing (bool)

  • realtime_mode (bool)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 8 05. Previous action - 5 or 3 (joint or ee) 06. Joint velocities - 8

total: (3x2) + 1 + (5 or 3) + (8x2) = 28 or 26

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward()[source]

Function to get a reward from the environment.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

and as always, negative rewards for each step, non-execution and actions not within joint limits

Returns:

A scalar reward value representing how well the agent is doing in the current episode.

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

class rl_environments.rx200.sim.task_envs.reach.rx200_kinect_reach_goal_sim.RX200ReacherGoalEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=False, gazebo_gui=False, seed=None, reward_type='sparse', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=10.0, action_cycle_time=0.8, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, debug=False, log_internal_state=False, action_speed=0.5, simple_dense_reward=True, realtime_mode=True)[source]

Bases: RX200RobotGoalEnv

This Task env is for a simple Reach Task with the RX200 robot. This env is made to work in a real-time environment.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (5 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (28 obs or rgb/depth image or a combination)

  • Desired Goal - Goal we are trying to reach

  • Achieved Goal - Position of the EE

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) default is 10 Hz - (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • realtime_mode: If True (default), runs the UniROS real-time loop — physics is never paused, a rospy.Timer at environment_loop_rate updates obs/reward/done, and step() reads the latest cached values. This matches the real env, so policies transfer / concurrent sim+real learning Just Works. If False, runs the standard MDP loop — Gazebo physics is paused around each _set_action, the action is executed synchronously, the agent waits action_cycle_time for the trajectory, then a fresh obs/reward/done is sampled. The non-realtime mode is for clean RL-algorithm benchmarking where you want every sample to correspond exactly to the post-action world state.

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to load the table model or not.

  • debug: Whether to print debug messages or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the robot. /rx200/gripper_controller/command: Send the joint positions to the robot.

Parameters:
  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • seed (int)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • debug (bool)

  • log_internal_state (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • realtime_mode (bool)

compute_reward(achieved_goal, desired_goal, info)[source]

Compute the reward for achieving a given goal.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict|list) – Additional information about the environment. A list for SB3 HER case.

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

compute_terminated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is terminated.

Pure function of (achieved_goal, desired_goal): returns True iff the achieved EE position is within reach_tolerance of the desired goal. Supports both the scalar live-step call (ag/dg shape (3,)) and the batched HER-style call (shape (N, 3)) via axis=-1. Does not read or mutate any cached env-loop state; the env loop / step path still populates info['is_success'] independently.

Parameters:
  • achieved_goal – EE position (single or batched).

  • desired_goal – Reach goal (single or batched).

  • info – Additional information about the environment.

Returns:

A boolean (scalar or numpy array) — True where the goal is within reach_tolerance.

compute_truncated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is truncated.

Truncation is driven by the registered max_episode_steps / TimeLimitWrapper outside this env, so this is constantly False. Kept pure (no cached-state read, no self.info mutation) so a batched call returns a consistent result.

Parameters:
  • achieved_goal – EE position (single or batched).

  • desired_goal – Reach goal (single or batched).

  • info – Additional information about the environment.

Returns:

A boolean (scalar or numpy array) — always False.

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 8 05. Previous action - 5 or 3 (joint or ee) 06. Joint velocities - 8

total: (3x2) + 1 + (5 or 3) + (8x2) = 28 or 26

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward(achieved_goal=None, desired_goal=None, info=None)[source]

Compute the reward for achieving a given goal.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

And as always negative rewards for each step, non-execution and actions not within joint limits

Parameters:
  • achieved_goal – EE position (optional)

  • desired_goal – Reach Goal (optional)

  • info (dict) – Additional information about the environment. (Optional)

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

Reach task envs (ZED 2 vision)

class rl_environments.rx200.sim.task_envs.reach.rx200_zed2_reach_sim.RX200ReacherEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=False, gazebo_gui=False, seed=None, reward_type='Dense', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=10, action_cycle_time=0.1, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False, realtime_mode=True)[source]

Bases: RX200RobotEnv

This Task env is for a simple Reach Task with the RX200 robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (5 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (28 obs or rgb/depth image or a combination)

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) - default 10 Hz (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • realtime_mode: If True (default), runs the UniROS real-time loop — physics is never paused, a rospy.Timer at environment_loop_rate updates obs/reward/done, and step() reads the latest cached values. This matches the real env, so policies transfer / concurrent sim+real learning Just Works. If False, runs the standard MDP loop — Gazebo physics is paused around each _set_action, the action is executed synchronously, the agent waits action_cycle_time for the trajectory, then a fresh obs/reward/done is sampled. The non-realtime mode is for clean RL-algorithm benchmarking where you want every sample to correspond exactly to the post-action world state.

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to load the table model or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /rx200/zed2/depth/depth_registered: Depth image from ZED2 camera /rx200/zed2/left/image_rect_color: RGB image from ZED2 camera

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the arm controller /rx200/gripper_controller/command: Send the joint positions to the gripper controller

Parameters:
  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • seed (int)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

  • realtime_mode (bool)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 8 05. Previous action - 5 or 3 (joint or ee) 06. Joint velocities - 8

total: (3x2) + 1 + (5 or 3) + (8x2) = 28 or 26

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward()[source]

Function to get a reward from the environment.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

and as always, negative rewards for each step, non-execution and actions not within joint limits

Returns:

A scalar reward value representing how well the agent is doing in the current episode.

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

class rl_environments.rx200.sim.task_envs.reach.rx200_zed2_reach_goal_sim.RX200ReacherGoalEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=False, gazebo_gui=False, seed=None, reward_type='sparse', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=10, action_cycle_time=0.8, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, debug=False, log_internal_state=False, action_speed=0.5, simple_dense_reward=True, realtime_mode=True)[source]

Bases: RX200RobotGoalEnv

This Task env is for a simple Reach Task with the RX200 robot. This env is made to work in a real-time environment.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (5 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (28 obs or rgb/depth image or a combination)

  • Desired Goal - Goal we are trying to reach

  • Achieved Goal - Position of the EE

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) default is 10 Hz - (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • realtime_mode: If True (default), runs the UniROS real-time loop — physics is never paused, a rospy.Timer at environment_loop_rate updates obs/reward/done, and step() reads the latest cached values. This matches the real env, so policies transfer / concurrent sim+real learning Just Works. If False, runs the standard MDP loop — Gazebo physics is paused around each _set_action, the action is executed synchronously, the agent waits action_cycle_time for the trajectory, then a fresh obs/reward/done is sampled. The non-realtime mode is for clean RL-algorithm benchmarking where you want every sample to correspond exactly to the post-action world state.

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to load the table model or not.

  • debug: Whether to print debug messages or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /rx200/zed2/depth/depth_registered: Depth image from the ZED2 camera /rx200/zed2/left/image_rect_color: RGB image from the ZED2 camera

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the robot. /rx200/gripper_controller/command: Send the joint positions to the robot.

Parameters:
  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • seed (int)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • debug (bool)

  • log_internal_state (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • realtime_mode (bool)

compute_reward(achieved_goal, desired_goal, info)[source]

Compute the reward for achieving a given goal.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict|list) – Additional information about the environment. A list for SB3 HER case.

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

compute_terminated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is terminated.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

compute_truncated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is truncated.

Mainly hard coded here since we are using a wrapper that sets the max number of steps and truncates the episode.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has been truncated (e.g. because the maximum number of steps has been reached)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 8 05. Previous action - 5 or 3 (joint or ee) 06. Joint velocities - 8

total: (3x2) + 1 + (5 or 3) + (8x2) = 28 or 26

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward(achieved_goal=None, desired_goal=None, info=None)[source]

Compute the reward for achieving a given goal.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

And as always negative rewards for each step, non-execution and actions not within joint limits

Parameters:
  • achieved_goal – EE position (optional)

  • desired_goal – Reach Goal (optional)

  • info (dict) – Additional information about the environment. (Optional)

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

RX200 — Real hardware

Robot envs

class rl_environments.rx200.real.robot_envs.rx200_robot_real.RX200RobotEnv(ros_port=None, seed=None, close_env_prompt=False, action_cycle_time=0.0, use_kinect=False, use_zed2=False)[source]

Bases: RealBaseEnv

Superclass for all real RX200 Robot environments.

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt!: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the robot. /rx200/gripper_controller/command: Send the joint positions to the robot

Parameters:
fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

Also stamps _latest_joint_state_time so the env_loop can gate ticks on driver freshness (real-side only).

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor

class rl_environments.rx200.real.robot_envs.rx200_robot_goal_real.RX200RobotGoalEnv(ros_port=None, seed=None, close_env_prompt=False, action_cycle_time=0.0, use_kinect=False, use_zed2=False)[source]

Bases: RealGoalEnv

Superclass for all real RX200 Robot environments. - For goal-conditioned tasks

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt!: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the robot. /rx200/gripper_controller/command: Send the joint positions to the robot

Parameters:
fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

Also stamps _latest_joint_state_time so the env_loop can gate ticks on driver freshness (real-side only).

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor

Reach task envs

class rl_environments.rx200.real.task_envs.reach.rx200_reach_real.RX200ReacherEnv(new_roscore=False, roscore_port=None, seed=None, close_env_prompt=True, reset_env_prompt=True, reset_controllers_prompt=False, reward_type='Dense', delta_action=True, delta_coeff=0.05, environment_loop_rate=10.0, action_cycle_time=0.0, use_smoothing=False, default_port=True, ee_action_type=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False, use_kinect=False, use_zed2=False)[source]

Bases: RX200RobotEnv

This Task env is for a simple Reach Task with the RX200 robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (5 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (28 obs or rgb/depth image or a combination)

Init Args:
  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • seed: Seed for the random number generator.

  • close_env_prompt: Whether to prompt the user to close the env or not.

  • reset_env_prompt: Whether to prompt the operator before each reset (defaults to True because reset homes the arm via MoveIt on real hardware).

  • reset_controllers_prompt: Whether to prompt before re-spawning controllers on reset (defaults to False because this env does not respawn controllers on reset).

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment loop should run. (in Hz)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds)

  • use_smoothing: Whether to use smoothing for actions or not.

  • default_port: Whether to use the default port for the roscore or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • use_kinect: Whether to use the kinect sensor or not.

  • use_zed2: Whether to use the ZED2 camera or not.

variables to keep track of ros port

Parameters:
  • new_roscore (bool)

  • roscore_port (str)

  • seed (int)

  • close_env_prompt (bool)

  • reset_env_prompt (bool)

  • reset_controllers_prompt (bool)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • ee_action_type (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

  • use_kinect (bool)

  • use_zed2 (bool)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 8 05. Previous action - 5 or 3 (joint or ee) 06. Joint velocities - 8

total: (3x2) + 1 + (5 or 3) + (8x2) = 28 or 26

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward()[source]

Function to get a reward from the environment.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

and as always, negative rewards for each step, non-execution and actions not within joint limits

Returns:

A scalar reward value representing how well the agent is doing in the current episode.

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

class rl_environments.rx200.real.task_envs.reach.rx200_reach_goal_real.RX200ReacherGoalEnv(new_roscore=False, roscore_port=None, seed=None, close_env_prompt=True, reset_env_prompt=True, reset_controllers_prompt=False, reward_type='sparse', delta_action=True, delta_coeff=0.05, environment_loop_rate=10.0, action_cycle_time=0.0, use_smoothing=False, default_port=True, ee_action_type=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False, use_kinect=False, use_zed2=False)[source]

Bases: RX200RobotGoalEnv

This Task env is for a simple Reach Task with the RX200 robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (5 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (28 obs or rgb/depth image or a combination)

  • Desired Goal - Goal we are trying to reach

  • Achieved Goal - Position of the EE

Init Args:
  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • seed: Seed for the random number generator.

  • close_env_prompt: Whether to prompt the user to close the env or not.

  • reset_env_prompt: Whether to prompt the operator before each reset (defaults to True because reset homes the arm via MoveIt on real hardware).

  • reset_controllers_prompt: Whether to prompt before re-spawning controllers on reset (defaults to False because this env does not respawn controllers on reset).

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment loop should run. (in Hz)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds)

  • use_smoothing: Whether to use smoothing for actions or not.

  • default_port: Whether to use the default port for the roscore or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • use_kinect: Whether to use the kinect sensor or not.

  • use_zed2: Whether to use the ZED2 camera or not.

variables to keep track of ros port

Parameters:
  • new_roscore (bool)

  • roscore_port (str)

  • seed (int)

  • close_env_prompt (bool)

  • reset_env_prompt (bool)

  • reset_controllers_prompt (bool)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • ee_action_type (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

  • use_kinect (bool)

  • use_zed2 (bool)

compute_reward(achieved_goal, desired_goal, info)[source]

Compute the reward for achieving a given goal.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict|list) – Additional information about the environment. A list for SB3 HER case.

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

compute_terminated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is terminated.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

compute_truncated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is truncated.

Mainly hard coded here since we are using a wrapper that sets the max number of steps and truncates the episode.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has been truncated (e.g. because the maximum number of steps has been reached)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 8 05. Previous action - 5 or 3 (joint or ee) 06. Joint velocities - 8

total: (3x2) + 1 + (5 or 3) + (8x2) = 28 or 26

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward(achieved_goal=None, desired_goal=None, info=None)[source]

Compute the reward for achieving a given goal.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

And as always negative rewards for each step, non-execution and actions not within joint limits

Parameters:
  • achieved_goal – EE position (optional)

  • desired_goal – Reach Goal (optional)

  • info (dict) – Additional information about the environment. (Optional)

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

Niryo Ned2 — Simulation

Robot envs

class rl_environments.ned2.sim.robot_envs.ned2_robot_sim.NED2RobotEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=False, use_camera=False, use_wrist_camera=False, gripper=False)[source]

Bases: GazeboBaseEnv

Superclass for all NED2 Robot environments - gazebo-based.

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /gazebo_camera/image_raw: RGB image from the robot camera.

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /ned2/niryo_robot_follow_joint_trajectory_controller/command: arm trajectory controller. /ned2/gazebo_tool_commander/follow_joint_trajectory: gripper action server (sim-only; niryo_robot_tools_commander is bypassed in sim, see move_gripper_joints).

Parameters:
get_model_pose(model_name='red_cube', rpy=True)[source]

Get the pose of an object in Gazebo

Parameters:
  • model_name – name of the object whose pose is to be retrieved

  • rpy – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the pose is retrieved successful position: position of the object as a numpy array orientation: orientation of the object as a numpy array (rpy or quaternion)

Return type:

success

spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]

Spawn a cube in Gazebo

Parameters:
  • model_pos_x – x-coordinate of the cube

  • model_pos_y – y-coordinate of the cube

Returns:

True if the cube is spawned successfully

Return type:

done

remove_cube_in_gazebo()[source]

Remove the cube from Gazebo

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(action)[source]

Drive the gripper to “open” or “close” (binary, same API the real env exposes via niryo_robot_tools_commander).

Sim path: publish a JointTrajectory to /gazebo_tool_commander/follow_joint_trajectory (the effort-based controller our description-extras package brings up). Niryo’s tool_commander_node isn’t running in sim, so we bypass it and command the mors joints directly. The action server contract is identical to what the real env’s tool_commander → effort controller would resolve to internally.

Parameters:

action (str) – “open” or “close”.

Returns:

True if the action server returned a result (success or otherwise — the env treats “trajectory dispatched” as the success condition, matching the real-side semantics).

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints using moveit.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm using moveit.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 6 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

wrist_camera_rgb_callback(img_msg)[source]

Callback for Niryo’s built-in wrist camera (sim, on camera_link). Topic: /gazebo_camera/image_raw — sensor_msgs/Image. Converts to an OpenCV RGB numpy array on self.cv_image_wrist.

class rl_environments.ned2.sim.robot_envs.ned2_robot_goal_sim.NED2RobotGoalEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=False, use_camera=False, use_wrist_camera=False, gripper=False)[source]

Bases: GazeboGoalEnv

Superclass for all NED2 Robot Goal environments.

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /gazebo_camera/image_raw: RGB image from the robot camera.

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /ned2/niryo_robot_follow_joint_trajectory_controller/command: arm trajectory controller. /ned2/gazebo_tool_commander/follow_joint_trajectory: gripper action server (sim-only; niryo_robot_tools_commander is bypassed in sim, see move_gripper_joints).

Parameters:
get_model_pose(model_name='red_cube', rpy=True)[source]

Get the pose of an object in Gazebo.

Parameters:
  • model_name – name of the object whose pose is to be retrieved

  • rpy – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the Gazebo lookup succeeded position: position of the object as a numpy float32 array, or None on failure orientation: orientation as a numpy float32 array (rpy or quaternion), or None on failure

Return type:

success

spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]

Spawn a cube in Gazebo

Parameters:
  • model_pos_x – x-coordinate of the cube

  • model_pos_y – y-coordinate of the cube

Returns:

True if the cube is spawned successfully

Return type:

done

remove_cube_in_gazebo()[source]

Remove the cube from Gazebo

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(action)[source]

Drive the gripper to “open” or “close” (binary, same API the real env exposes via niryo_robot_tools_commander).

Sim path: publish a JointTrajectory to /gazebo_tool_commander/follow_joint_trajectory directly. See ned2_robot_sim.move_gripper_joints docstring for the full rationale.

Parameters:

action (str) – “open” or “close”.

Returns:

True if the action server returned a result.

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints using moveit.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm using moveit.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 6 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

wrist_camera_rgb_callback(img_msg)[source]

Callback for Niryo’s built-in wrist camera (sim, on camera_link). Topic: /gazebo_camera/image_raw — sensor_msgs/Image. Converts to an OpenCV RGB numpy array on self.cv_image_wrist.

Reach task envs

class rl_environments.ned2.sim.task_envs.reach.ned2_reach_sim.NED2ReacherEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=False, gazebo_gui=False, seed=None, reward_type='Dense', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=25, action_cycle_time=0.1, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False, extra_smoothing=False, use_wrist_camera=False)[source]

Bases: NED2RobotEnv

This Task env is for a simple Reach Task with the NED2 robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (simp obs or rgb/depth image or a combination)

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) - default 10 Hz (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to load the table model or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • extra_smoothing: Whether to use extra smoothing for actions or not.

variables to keep track of ros, gazebo ports and gazebo pid

Parameters:
  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • seed (int)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

  • extra_smoothing (bool)

  • use_wrist_camera (bool)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 6 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 6

total: (3x2) + 1 + (6 or 3) + (6x2)

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward()[source]

Function to get a reward from the environment.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

and as always, negative rewards for each step, non-execution and actions not within joint limits

Returns:

A scalar reward value representing how well the agent is doing in the current episode.

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

class rl_environments.ned2.sim.task_envs.reach.ned2_reach_goal_sim.NED2ReacherGoalEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=False, gazebo_gui=False, seed=None, reward_type='sparse', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=25.0, action_cycle_time=0.1, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, debug=False, log_internal_state=False, action_speed=0.5, simple_dense_reward=True, use_wrist_camera=False)[source]

Bases: NED2RobotGoalEnv

This Task env is for a simple Reach Task with the NED2 robot. This env is made to work in a real-time environment.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (simp obs or rgb/depth image or a combination)

  • Desired Goal - Goal we are trying to reach

  • Achieved Goal - Position of the EE

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) default is 10 Hz - (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to load the table model or not.

  • debug: Whether to print debug messages or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

variables to keep track of ros, gazebo ports and gazebo pid

Parameters:
  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • seed (int)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • debug (bool)

  • log_internal_state (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • use_wrist_camera (bool)

compute_reward(achieved_goal, desired_goal, info)[source]

Compute the reward for achieving a given goal.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict|list) – Additional information about the environment. A list for SB3 HER case.

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

compute_terminated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is terminated.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

compute_truncated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is truncated.

Mainly hard coded here since we are using a wrapper that sets the max number of steps and truncates the episode.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has been truncated (e.g. because the maximum number of steps has been reached)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 6 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 6

total: (3x2) + 1 + (6 or 3) + (6x2)

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward(achieved_goal=None, desired_goal=None, info=None)[source]

Compute the reward for achieving a given goal.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

And as always negative rewards for each step, non-execution and actions not within joint limits

Parameters:
  • achieved_goal – EE position (optional)

  • desired_goal – Reach Goal (optional)

  • info (dict) – Additional information about the environment. (Optional)

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

Niryo Ned2 — Real hardware

Robot envs

class rl_environments.ned2.real.robot_envs.ned2_robot_real.NED2RobotEnv(ros_port=None, seed=None, close_env_prompt=False, action_cycle_time=0.0, use_kinect=False, use_zed2=False, use_wrist_camera=False, remote_ip=None, local_ip=None, multi_device_mode=True)[source]

Bases: RealBaseEnv

Superclass for all real NED2 Robot environments.

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt!: Send the joint positions to the robot. /niryo_robot_follow_joint_trajectory_controller/command: Send the joint positions to the robot. /niryo_robot_tools_commander/action_server: Send the joint positions to the robot gripper.

Parameters:
  • ros_port (str)

  • seed (int)

  • close_env_prompt (bool)

  • use_kinect (bool)

  • use_zed2 (bool)

  • use_wrist_camera (bool)

  • remote_ip (str)

  • local_ip (str)

  • multi_device_mode (bool)

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

Also stamps _latest_joint_state_time so task envs can detect driver / cable disconnects via env_loop freshness gates.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(action)[source]

Set a joint position target only for the gripper joints using low-level ros controllers. - ros action server

Parameters:

action (str) – “open” or “close” the gripper

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 6 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

wrist_camera_rgb_callback(img_msg)[source]

Callback for Niryo’s built-in wrist camera (real). Source: /niryo_robot_vision/compressed_video_stream (sensor_msgs/CompressedImage). cv_bridge decodes the compressed bytes; result goes to self.cv_image_wrist as RGB.

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor

class rl_environments.ned2.real.robot_envs.ned2_robot_goal_real.NED2RobotGoalEnv(ros_port=None, seed=None, close_env_prompt=False, action_cycle_time=0.0, use_kinect=False, use_zed2=False, use_wrist_camera=False, remote_ip=None, local_ip=None, multi_device_mode=True)[source]

Bases: RealGoalEnv

Superclass for all real NED2 Robot environments. - For goal-conditioned tasks

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt!: Send the joint positions to the robot. /niryo_robot_follow_joint_trajectory_controller/command: Send the joint positions to the robot. /niryo_robot_tools_commander/action_server: Send the joint positions to the robot gripper.

Parameters:
  • ros_port (str)

  • seed (int)

  • close_env_prompt (bool)

  • use_kinect (bool)

  • use_zed2 (bool)

  • use_wrist_camera (bool)

  • remote_ip (str)

  • local_ip (str)

  • multi_device_mode (bool)

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

Also stamps _latest_joint_state_time so task envs can detect driver / cable disconnects via env_loop freshness gates.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(action)[source]

Set a joint position target only for the gripper joints using low-level ros controllers. - ros action server

Parameters:

action (str) – “open” or “close” the gripper

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 6 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

wrist_camera_rgb_callback(img_msg)[source]

Callback for Niryo’s built-in wrist camera (real). Source: /niryo_robot_vision/compressed_video_stream (sensor_msgs/CompressedImage). cv_bridge decodes the compressed bytes; result goes to self.cv_image_wrist as RGB.

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor

Reach task envs

class rl_environments.ned2.real.task_envs.reach.ned2_reach_real.NED2ReacherEnv(new_roscore=False, roscore_port=None, seed=None, close_env_prompt=True, reset_env_prompt=True, reset_controllers_prompt=False, reward_type='Dense', delta_action=True, delta_coeff=0.05, environment_loop_rate=25.0, action_cycle_time=0.0, use_smoothing=False, default_port=True, ee_action_type=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False, use_kinect=False, use_zed2=False, use_wrist_camera=False, remote_ip=None, local_ip=None, multi_device_mode=False)[source]

Bases: NED2RobotEnv

This Task env is for a simple Reach Task with the NED2 robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (simp obs or rgb/depth image or a combination)

Init Args:
  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • seed: Seed for the random number generator.

  • close_env_prompt: Whether to prompt the user to close the env or not.

  • reset_env_prompt: Whether to prompt the operator before each reset (defaults to True because reset homes the arm via MoveIt on real hardware).

  • reset_controllers_prompt: Whether to prompt before re-spawning controllers on reset (defaults to False because this env does not respawn controllers on reset).

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment loop should run. (in Hz)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds)

  • use_smoothing: Whether to use smoothing for actions or not.

  • default_port: Whether to use the default port for the roscore or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • use_kinect: Whether to use the kinect sensor or not.

  • use_zed2: Whether to use the ZED2 camera or not.

  • remote_ip: IP address of the remote machine where the ROS master is running.

  • local_ip: IP address of the local machine where the ROS node is running.

  • multi_device_mode: When True (and remote_ip + local_ip + roscore_port are set) the env attaches to a roscore already running on the remote (e.g. the Niryo Raspberry Pi master). Defaults to False, so by default the env launches its own roscore on the local machine via default_port/new_roscore — set this True when running the published Pi multi-device topology.

variables to keep track of ros port

Parameters:
  • new_roscore (bool)

  • roscore_port (str)

  • seed (int)

  • close_env_prompt (bool)

  • reset_env_prompt (bool)

  • reset_controllers_prompt (bool)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • ee_action_type (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

  • use_kinect (bool)

  • use_zed2 (bool)

  • use_wrist_camera (bool)

  • remote_ip (str)

  • local_ip (str)

  • multi_device_mode (bool)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 6 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 8

total: (3x2) + 1 + (6 or 3) + (6x2)

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward()[source]

Function to get a reward from the environment.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

and as always, negative rewards for each step, non-execution and actions not within joint limits

Returns:

A scalar reward value representing how well the agent is doing in the current episode.

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

class rl_environments.ned2.real.task_envs.reach.ned2_reach_goal_real.NED2ReacherGoalEnv(new_roscore=False, roscore_port=None, seed=None, close_env_prompt=True, reset_env_prompt=True, reset_controllers_prompt=False, reward_type='sparse', delta_action=True, delta_coeff=0.05, environment_loop_rate=25.0, action_cycle_time=0.0, use_smoothing=False, default_port=True, ee_action_type=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False, use_kinect=False, use_zed2=False, use_wrist_camera=False, remote_ip=None, local_ip=None, multi_device_mode=False)[source]

Bases: NED2RobotGoalEnv

This Task env is for a simple Reach Task with the NED2 robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (5 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (simp obs or rgb/depth image or a combination)

  • Desired Goal - Goal we are trying to reach

  • Achieved Goal - Position of the EE

Init Args:
  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • seed: Seed for the random number generator.

  • close_env_prompt: Whether to prompt the user to close the env or not.

  • reset_env_prompt: Whether to prompt the operator before each reset (defaults to True because reset homes the arm via MoveIt on real hardware).

  • reset_controllers_prompt: Whether to prompt before re-spawning controllers on reset (defaults to False because this env does not respawn controllers on reset).

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment loop should run. (in Hz)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds)

  • use_smoothing: Whether to use smoothing for actions or not.

  • default_port: Whether to use the default port for the roscore or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • use_kinect: Whether to use the kinect sensor or not.

  • use_zed2: Whether to use the ZED2 camera or not.

  • remote_ip: IP address of the remote machine where the ROS master is running.

  • local_ip: IP address of the local machine where the ROS node is running.

  • multi_device_mode: When True (and remote_ip + local_ip + roscore_port are set) the env attaches to a roscore already running on the remote (e.g. the Niryo Raspberry Pi master). Defaults to False, so by default the env launches its own roscore on the local machine via default_port/new_roscore — set this True when running the published Pi multi-device topology.

variables to keep track of ros port

Parameters:
  • new_roscore (bool)

  • roscore_port (str)

  • seed (int)

  • close_env_prompt (bool)

  • reset_env_prompt (bool)

  • reset_controllers_prompt (bool)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • ee_action_type (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

  • use_kinect (bool)

  • use_zed2 (bool)

  • use_wrist_camera (bool)

  • remote_ip (str)

  • local_ip (str)

  • multi_device_mode (bool)

compute_reward(achieved_goal, desired_goal, info)[source]

Compute the reward for achieving a given goal.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict|list) – Additional information about the environment. A list for SB3 HER case.

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

compute_terminated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is terminated.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

compute_truncated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is truncated.

Mainly hard coded here since we are using a wrapper that sets the max number of steps and truncates the episode.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has been truncated (e.g. because the maximum number of steps has been reached)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 6 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 6

total: (3x2) + 1 + (6 or 3) + (6x2)

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward(achieved_goal=None, desired_goal=None, info=None)[source]

Compute the reward for achieving a given goal.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

And as always negative rewards for each step, non-execution and actions not within joint limits

Parameters:
  • achieved_goal – EE position (optional)

  • desired_goal – Reach Goal (optional)

  • info (dict) – Additional information about the environment. (Optional)

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

Trossen VX300S — Simulation

Robot envs

class rl_environments.vx300s.sim.robot_envs.vx300s_robot_sim.VX300SRobotEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=False, use_kinect=False)[source]

Bases: GazeboBaseEnv

Superclass for all VX300S Robot environments.

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /vx300s/arm_controller/command: Send the joint positions to the robot. /vx300s/gripper_controller/command: Send the joint positions to the robot.

Parameters:
get_model_pose(model_name='red_cube', rpy=True)[source]

Get the pose of an object in Gazebo

Parameters:
  • model_name – name of the object whose pose is to be retrieved

  • rpy – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the pose is retrieved successful position: position of the object as a numpy array orientation: orientation of the object as a numpy array (rpy or quaternion)

Return type:

success

spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]

Spawn a cube in Gazebo

Parameters:
  • model_pos_x – x-coordinate of the cube

  • model_pos_y – y-coordinate of the cube

Returns:

True if the cube is spawned successfully

Return type:

done

remove_cube_in_gazebo()[source]

Remove the cube from Gazebo

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints using moveit.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm using moveit.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 6 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

class rl_environments.vx300s.sim.robot_envs.vx300s_robot_goal_sim.VX300SRobotGoalEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=False, use_kinect=False)[source]

Bases: GazeboGoalEnv

Superclass for all VX300S Robot Goal environments.

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /vx300s/arm_controller/command: Send the joint positions to the robot. /vx300s/gripper_controller/command: Send the joint positions to the robot.

Parameters:
get_model_pose(model_name='red_cube', rpy=True)[source]

Get the pose of an object in Gazebo.

Parameters:
  • model_name – name of the object whose pose is to be retrieved

  • rpy – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the Gazebo lookup succeeded position: position of the object as a numpy float32 array, or None on failure orientation: orientation as a numpy float32 array (rpy or quaternion), or None on failure

Return type:

success

spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]

Spawn a cube in Gazebo

Parameters:
  • model_pos_x – x-coordinate of the cube

  • model_pos_y – y-coordinate of the cube

Returns:

True if the cube is spawned successfully

Return type:

done

remove_cube_in_gazebo()[source]

Remove the cube from Gazebo

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints using moveit.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm using moveit.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 6 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

Reach task envs

class rl_environments.vx300s.sim.task_envs.reach.vx300s_reach_sim.VX300SReacherEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=False, gazebo_gui=False, seed=None, reward_type='Dense', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=10, action_cycle_time=0.1, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False, extra_smoothing=False, realtime_mode=True)[source]

Bases: VX300SRobotEnv

This Task env is for a simple Reach Task with the VX300S robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (31/28 obs or rgb/depth image or a combination)

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) - default 10 Hz (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • realtime_mode: If True (default), runs the UniROS real-time loop — physics is never paused, a rospy.Timer at environment_loop_rate updates obs/reward/done, and step() reads the latest cached values. This matches the real env, so policies transfer / concurrent sim+real learning Just Works. If False, runs the standard MDP loop — Gazebo physics is paused around each _set_action, the action is executed synchronously, the agent waits action_cycle_time for the trajectory, then a fresh obs/reward/done is sampled. The non-realtime mode is for clean RL-algorithm benchmarking where you want every sample to correspond exactly to the post-action world state.

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to load the table model or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • extra_smoothing: Whether to use extra smoothing for actions or not.

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /vx300s/arm_controller/command: Send the joint positions to the robot. /vx300s/gripper_controller/command: Send the joint positions to the robot.

Parameters:
  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • seed (int)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

  • extra_smoothing (bool)

  • realtime_mode (bool)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 9 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 9

total: (3x2) + 1 + (6 or 3) + (9x2) = 31 or 28

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward()[source]

Function to get a reward from the environment.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

and as always, negative rewards for each step, non-execution and actions not within joint limits

Returns:

A scalar reward value representing how well the agent is doing in the current episode.

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

class rl_environments.vx300s.sim.task_envs.reach.vx300s_reach_goal_sim.VX300SReacherGoalEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=False, gazebo_gui=False, seed=None, reward_type='sparse', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=10.0, action_cycle_time=0.8, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, debug=False, log_internal_state=False, action_speed=0.5, simple_dense_reward=True, realtime_mode=True)[source]

Bases: VX300SRobotGoalEnv

This Task env is for a simple Reach Task with the VX300S robot. This env is made to work in a real-time environment.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (31/28 obs or rgb/depth image or a combination)

  • Desired Goal - Goal we are trying to reach

  • Achieved Goal - Position of the EE

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) default is 10 Hz - (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • realtime_mode: If True (default), runs the UniROS real-time loop — physics is never paused, a rospy.Timer at environment_loop_rate updates obs/reward/done, and step() reads the latest cached values. This matches the real env, so policies transfer / concurrent sim+real learning Just Works. If False, runs the standard MDP loop — Gazebo physics is paused around each _set_action, the action is executed synchronously, the agent waits action_cycle_time for the trajectory, then a fresh obs/reward/done is sampled. The non-realtime mode is for clean RL-algorithm benchmarking where you want every sample to correspond exactly to the post-action world state.

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to load the table model or not.

  • debug: Whether to print debug messages or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /vx300s/arm_controller/command: Send the joint positions to the robot. /vx300s/gripper_controller/command: Send the joint positions to the robot.

Parameters:
  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • seed (int)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • debug (bool)

  • log_internal_state (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • realtime_mode (bool)

compute_reward(achieved_goal, desired_goal, info)[source]

Compute the reward for achieving a given goal.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict|list) – Additional information about the environment. A list for SB3 HER case.

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

compute_terminated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is terminated.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

compute_truncated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is truncated.

Mainly hard coded here since we are using a wrapper that sets the max number of steps and truncates the episode.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has been truncated (e.g. because the maximum number of steps has been reached)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 9 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 9

total: (3x2) + 1 + (6 or 3) + (9x2) = 31 or 28

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward(achieved_goal=None, desired_goal=None, info=None)[source]

Compute the reward for achieving a given goal.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

And as always negative rewards for each step, non-execution and actions not within joint limits

Parameters:
  • achieved_goal – EE position (optional)

  • desired_goal – Reach Goal (optional)

  • info (dict) – Additional information about the environment. (Optional)

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

Trossen VX300S — Real hardware

Robot envs

class rl_environments.vx300s.real.robot_envs.vx300s_robot_real.VX300SRobotEnv(ros_port=None, seed=None, close_env_prompt=False, action_cycle_time=0.0, use_kinect=False, use_zed2=False)[source]

Bases: RealBaseEnv

Superclass for all real VX300S Robot environments.

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt!: Send the joint positions to the robot. /vx300s/arm_controller/command: Send the joint positions to the robot. /vx300s/gripper_controller/command: Send the joint positions to the robot

Parameters:
fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

Also stamps _latest_joint_state_time so the env_loop can gate ticks on driver freshness (real-side only).

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 6 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor

class rl_environments.vx300s.real.robot_envs.vx300s_robot_goal_real.VX300SRobotGoalEnv(ros_port=None, seed=None, close_env_prompt=False, action_cycle_time=0.0, use_kinect=False, use_zed2=False)[source]

Bases: RealGoalEnv

Superclass for all real VX300S Robot environments. - For goal-conditioned tasks

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt!: Send the joint positions to the robot. /vx300s/arm_controller/command: Send the joint positions to the robot. /vx300s/gripper_controller/command: Send the joint positions to the robot

Parameters:
fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

Also stamps _latest_joint_state_time so the env_loop can gate ticks on driver freshness (real-side only).

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 6 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor

Reach task envs

class rl_environments.vx300s.real.task_envs.reach.vx300s_reach_real.VX300SReacherEnv(new_roscore=False, roscore_port=None, seed=None, close_env_prompt=True, reset_env_prompt=True, reset_controllers_prompt=False, reward_type='Dense', delta_action=True, delta_coeff=0.05, environment_loop_rate=10.0, action_cycle_time=0.0, use_smoothing=False, default_port=True, ee_action_type=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False, use_kinect=False, use_zed2=False)[source]

Bases: VX300SRobotEnv

This Task env is for a simple Reach Task with the VX300S robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (31/28 obs or rgb/depth image or a combination)

Init Args:
  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • seed: Seed for the random number generator.

  • close_env_prompt: Whether to prompt the user to close the env or not.

  • reset_env_prompt: Whether to prompt the operator before each reset (defaults to True because reset homes the arm via MoveIt on real hardware).

  • reset_controllers_prompt: Whether to prompt before re-spawning controllers on reset (defaults to False because this env does not respawn controllers on reset).

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment loop should run. (in Hz)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds)

  • use_smoothing: Whether to use smoothing for actions or not.

  • default_port: Whether to use the default port for the roscore or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • use_kinect: Whether to use the kinect sensor or not.

  • use_zed2: Whether to use the ZED2 camera or not.

variables to keep track of ros port

Parameters:
  • new_roscore (bool)

  • roscore_port (str)

  • seed (int)

  • close_env_prompt (bool)

  • reset_env_prompt (bool)

  • reset_controllers_prompt (bool)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • ee_action_type (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

  • use_kinect (bool)

  • use_zed2 (bool)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 9 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 9

total: (3x2) + 1 + (6 or 3) + (9x2) = 31 or 28

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward()[source]

Function to get a reward from the environment.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

and as always, negative rewards for each step, non-execution and actions not within joint limits

Returns:

A scalar reward value representing how well the agent is doing in the current episode.

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

class rl_environments.vx300s.real.task_envs.reach.vx300s_reach_goal_real.VX300SReacherGoalEnv(new_roscore=False, roscore_port=None, seed=None, close_env_prompt=True, reset_env_prompt=True, reset_controllers_prompt=False, reward_type='sparse', delta_action=True, delta_coeff=0.05, environment_loop_rate=10.0, action_cycle_time=0.0, use_smoothing=False, default_port=True, ee_action_type=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False, use_kinect=False, use_zed2=False)[source]

Bases: VX300SRobotGoalEnv

This Task env is for a simple Reach Task with the VX300S robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (31/28 obs or rgb/depth image or a combination)

  • Desired Goal - Goal we are trying to reach

  • Achieved Goal - Position of the EE

Init Args:
  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • seed: Seed for the random number generator.

  • close_env_prompt: Whether to prompt the user to close the env or not.

  • reset_env_prompt: Whether to prompt the operator before each reset (defaults to True because reset homes the arm via MoveIt on real hardware).

  • reset_controllers_prompt: Whether to prompt before re-spawning controllers on reset (defaults to False because this env does not respawn controllers on reset).

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment loop should run. (in Hz)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds)

  • use_smoothing: Whether to use smoothing for actions or not.

  • default_port: Whether to use the default port for the roscore or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • use_kinect: Whether to use the kinect sensor or not.

  • use_zed2: Whether to use the ZED2 camera or not.

variables to keep track of ros port

Parameters:
  • new_roscore (bool)

  • roscore_port (str)

  • seed (int)

  • close_env_prompt (bool)

  • reset_env_prompt (bool)

  • reset_controllers_prompt (bool)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • ee_action_type (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

  • use_kinect (bool)

  • use_zed2 (bool)

compute_reward(achieved_goal, desired_goal, info)[source]

Compute the reward for achieving a given goal.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict|list) – Additional information about the environment. A list for SB3 HER case.

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

compute_terminated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is terminated.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

compute_truncated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is truncated.

Mainly hard coded here since we are using a wrapper that sets the max number of steps and truncates the episode.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has been truncated (e.g. because the maximum number of steps has been reached)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 9 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 9

total: (3x2) + 1 + (6 or 3) + (9x2) = 31 or 28

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward(achieved_goal=None, desired_goal=None, info=None)[source]

Compute the reward for achieving a given goal.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

And as always negative rewards for each step, non-execution and actions not within joint limits

Parameters:
  • achieved_goal – EE position (optional)

  • desired_goal – Reach Goal (optional)

  • info (dict) – Additional information about the environment. (Optional)

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

Universal Robots UR5e — Simulation

Robot envs

Superclass for all UR5e Robot environments (Gazebo).

Mirrors vx300s_robot_sim / ned2_robot_sim / rx200_robot_sim in shape: spawns the URDF under /ur5e, brings up ros_control, launches MoveIt under the same namespace, exposes FK / IK / joint + gripper publish helpers, and a per-link FK safety check.

class rl_environments.ur5e.sim.robot_envs.ur5e_robot_sim.UR5eRobotEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=True, use_kinect=False)[source]

Bases: GazeboBaseEnv

Superclass for all UR5e Robot environments.

Sensor topics:

MoveIt: pose + rpy of the robot. /ur5e/joint_states: arm + gripper joint states. /head_mount_kinect2/depth/image_raw: depth from head-mount Kinect v2. /head_mount_kinect2/rgb/image_raw: rgb from head-mount Kinect v2.

Actuator topics:

MoveIt: set joint targets or EE pose via planning. /ur5e/arm_controller/command: 6-DOF arm trajectory commands. /ur5e/gripper_controller/command: Robotiq 2F-85 knuckle command.

Parameters:
get_model_pose(model_name='red_cube', rpy=True)[source]

Get an object’s pose from Gazebo in the ur5e base frame.

spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]

Spawn the red cube on the cafe table at z = 0.795.

remove_cube_in_gazebo()[source]
fk_pykdl(action)[source]

Forward kinematics via pykdl_utils. Returns EE position (np.ndarray) or None if action is empty / None (caller can fall back).

calculate_fk(joint_positions, euler=True)[source]

Forward kinematics via ros_kinematics. Returns (done, ee_pos, ee_rpy_or_quat).

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Inverse kinematics via ros_kinematics. Returns (done, joint_positions).

joint_state_callback(joint_state)[source]
move_arm_joints(q_positions, time_from_start=0.5)[source]

Send a single JointTrajectory point to the arm controller.

Parameters:
Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Send a single JointTrajectory point to the Robotiq knuckle.

Robotiq 2F-85 range: ~0.0 (fully open) to ~0.8 rad (fully closed).

Parameters:
Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Interpolate from current to target joint pos with multiplier steps.

publish_trajectory(trajectory_points)[source]
set_trajectory_joints(q_positions)[source]

MoveIt joint-space plan + execute.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

MoveIt EE-pose plan + execute.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]
get_ee_rpy()[source]
get_joint_angles()[source]

Current arm joint positions (6 elements).

check_goal(goal)[source]
check_goal_reachable_joint_pos(joint_pos)[source]
kinect_depth_callback(data)[source]
kinect_rgb_callback(img_msg)[source]

Goal-conditioned superclass for UR5e Robot environments (Gazebo).

Same shape as UR5eRobotEnv (ur5e_robot_sim) but inherits from GazeboGoalEnv so subclasses can use Dict observation spaces with observation / achieved_goal / desired_goal keys (needed by HER + the Gymnasium GoalEnv contract).

Side-effect imports ur5e_robot_sim to set GAZEBO_MODEL_PATH at module import time (the env-internal Gazebo doesn’t inherit env from roslaunch).

class rl_environments.ur5e.sim.robot_envs.ur5e_robot_goal_sim.UR5eRobotGoalEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=True, use_kinect=False)[source]

Bases: GazeboGoalEnv

Goal-conditioned superclass. See ur5e_robot_sim.UR5eRobotEnv for the non-goal twin (same bring-up, same publish helpers).

Initialize the GazeboGoalEnv.

Parameters:
  • spawn_robot (bool) – Whether to spawn the robot in Gazebo.

  • urdf_pkg_name (str) – The name of the URDF package.

  • urdf_file_name (str) – The name of the URDF file.

  • urdf_folder (str) – The folder containing the URDF file.

  • urdf_xacro_args (List[str]) – The arguments for the xacro processor.

  • namespace (str) – The ROS namespace for the robot.

  • robot_state_publisher_max_freq (float) – The maximum frequency for the robot state publisher.

  • new_robot_state_term (bool) – Whether to use a new terminal for the robot state publisher.

  • robot_model_name (str) – The name of the robot model in Gazebo.

  • robot_ref_frame (str) – The reference frame for the robot in Gazebo.

  • robot_pos_x (float) – The x position of the robot in Gazebo.

  • robot_pos_y (float) – The y position of the robot in Gazebo.

  • robot_pos_z (float) – The z position of the robot in Gazebo.

  • robot_ori_w (float) – The w orientation of the robot in Gazebo.

  • robot_ori_x (float) – The x orientation of the robot in Gazebo.

  • robot_ori_y (float) – The y orientation of the robot in Gazebo.

  • robot_ori_z (float) – The z orientation of the robot in Gazebo.

  • controllers_file (str) – The file containing the controller configurations.

  • controllers_list (List[str]) – The list of ROS controllers to use.

  • reset_controllers (bool) – Whether to reset the controllers on reset.

  • reset_mode (str) – The mode to use when resetting Gazebo (“world” or “simulation”).

  • sim_step_mode (int) – The mode to use when stepping the simulation (1 or 2).

  • num_gazebo_steps (int) – The number of Gazebo steps to take per step call.

  • gazebo_max_update_rate (float) – The maximum update rate for Gazebo.

  • gazebo_timestep (float) – The time step for Gazebo.

  • kill_rosmaster (bool) – Whether to kill the ROS master on close.

  • kill_gazebo (bool) – Whether to kill Gazebo on close.

  • clean_logs (bool) – Whether to clean the ROS logs on close.

  • ros_port (str) – The ROS_MASTER_URI port.

  • gazebo_port (str) – The GAZEBO_MASTER_URI port.

  • gazebo_pid (subprocess.Popen) – A subprocess.Popen object representing the running Gazebo instance

  • seed (int) – Seed for random number generator

  • unpause_pause_physics (bool) – Whether to unpause and pause Gazebo before and after each step.

  • action_cycle_time (float) – The time to wait between applying actions.

  • log_internal_state (bool) – Whether to log the internal state of the environment.

  • controller_package_name (str) – The name of the package containing the controllers.

  • real_time (bool)

  • load_cube (bool)

  • load_table (bool)

  • use_kinect (bool)

get_model_pose(model_name='red_cube', rpy=True)[source]
spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]
remove_cube_in_gazebo()[source]
fk_pykdl(action)[source]
calculate_fk(joint_positions, euler=True)[source]
calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]
joint_state_callback(joint_state)[source]
move_arm_joints(q_positions, time_from_start=0.5)[source]
Parameters:
Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]
Parameters:
Return type:

bool

set_trajectory_joints(q_positions)[source]
Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]
Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]
get_ee_rpy()[source]
get_joint_angles()[source]
check_goal(goal)[source]
check_goal_reachable_joint_pos(joint_pos)[source]
kinect_depth_callback(data)[source]
kinect_rgb_callback(img_msg)[source]

Reach task envs

class rl_environments.ur5e.sim.task_envs.reach.ur5e_reach_sim.UR5eReacherEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=True, gazebo_gui=False, seed=None, reward_type='Dense', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=10, action_cycle_time=0.1, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, load_cube=False, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False, extra_smoothing=False, realtime_mode=True)[source]

Bases: UR5eRobotEnv

This Task env is for a simple Reach Task with the UR5e robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (27/24 obs or rgb/depth image or a combination)

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) - default 10 Hz (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • realtime_mode: If True (default), runs the UniROS real-time loop — physics is never paused, a rospy.Timer at environment_loop_rate updates obs/reward/done, and step() reads the latest cached values. This matches the real env, so policies transfer / concurrent sim+real learning Just Works. If False, runs the standard MDP loop — Gazebo physics is paused around each _set_action, the action is executed synchronously, the agent waits action_cycle_time for the trajectory, then a fresh obs/reward/done is sampled. The non-realtime mode is for clean RL-algorithm benchmarking where you want every sample to correspond exactly to the post-action world state.

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to use the UR5e tabletop world or not.

  • load_cube: Whether to load the red cube model or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • extra_smoothing: Whether to use extra smoothing for actions or not.

Sensor topics:

MoveIt: pose + rpy of the robot. /ur5e/joint_states: arm + gripper joint states. /head_mount_kinect2/depth/image_raw: depth from head-mount Kinect v2. /head_mount_kinect2/rgb/image_raw: rgb from head-mount Kinect v2.

Actuator topics:

MoveIt: set joint targets or EE pose via planning. /ur5e/arm_controller/command: 6-DOF arm trajectory commands. /ur5e/gripper_controller/command: Robotiq 2F-85 knuckle command.

Parameters:
  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • seed (int)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • load_cube (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

  • extra_smoothing (bool)

  • realtime_mode (bool)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 7 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 7

total: (3x2) + 1 + (6 or 3) + (7x2) = 27 or 24

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward()[source]

Function to get a reward from the environment.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

and as always, negative rewards for each step, non-execution and actions not within joint limits

Returns:

A scalar reward value representing how well the agent is doing in the current episode.

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

class rl_environments.ur5e.sim.task_envs.reach.ur5e_reach_goal_sim.UR5eReacherGoalEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=True, gazebo_gui=False, seed=None, reward_type='sparse', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=10.0, action_cycle_time=0.8, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, load_cube=False, debug=False, log_internal_state=False, action_speed=0.5, simple_dense_reward=True, realtime_mode=True)[source]

Bases: UR5eRobotGoalEnv

This Task env is for a simple Reach Task with the UR5e robot. This env is made to work in a real-time environment.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (27/24 obs or rgb/depth image or a combination)

  • Desired Goal - Goal we are trying to reach

  • Achieved Goal - Position of the EE

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) default is 10 Hz - (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • realtime_mode: If True (default), runs the UniROS real-time loop — physics is never paused, a rospy.Timer at environment_loop_rate updates obs/reward/done, and step() reads the latest cached values. This matches the real env, so policies transfer / concurrent sim+real learning Just Works. If False, runs the standard MDP loop — Gazebo physics is paused around each _set_action, the action is executed synchronously, the agent waits action_cycle_time for the trajectory, then a fresh obs/reward/done is sampled. The non-realtime mode is for clean RL-algorithm benchmarking where you want every sample to correspond exactly to the post-action world state.

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to use the UR5e tabletop world or not.

  • load_cube: Whether to load the red cube model or not.

  • debug: Whether to print debug messages or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

Initialize the GazeboGoalEnv.

Parameters:
  • spawn_robot (bool) – Whether to spawn the robot in Gazebo.

  • urdf_pkg_name (str) – The name of the URDF package.

  • urdf_file_name (str) – The name of the URDF file.

  • urdf_folder (str) – The folder containing the URDF file.

  • urdf_xacro_args (List[str]) – The arguments for the xacro processor.

  • namespace (str) – The ROS namespace for the robot.

  • robot_state_publisher_max_freq (float) – The maximum frequency for the robot state publisher.

  • new_robot_state_term (bool) – Whether to use a new terminal for the robot state publisher.

  • robot_model_name (str) – The name of the robot model in Gazebo.

  • robot_ref_frame (str) – The reference frame for the robot in Gazebo.

  • robot_pos_x (float) – The x position of the robot in Gazebo.

  • robot_pos_y (float) – The y position of the robot in Gazebo.

  • robot_pos_z (float) – The z position of the robot in Gazebo.

  • robot_ori_w (float) – The w orientation of the robot in Gazebo.

  • robot_ori_x (float) – The x orientation of the robot in Gazebo.

  • robot_ori_y (float) – The y orientation of the robot in Gazebo.

  • robot_ori_z (float) – The z orientation of the robot in Gazebo.

  • controllers_file (str) – The file containing the controller configurations.

  • controllers_list (List[str]) – The list of ROS controllers to use.

  • reset_controllers (bool) – Whether to reset the controllers on reset.

  • reset_mode (str) – The mode to use when resetting Gazebo (“world” or “simulation”).

  • sim_step_mode (int) – The mode to use when stepping the simulation (1 or 2).

  • num_gazebo_steps (int) – The number of Gazebo steps to take per step call.

  • gazebo_max_update_rate (float) – The maximum update rate for Gazebo.

  • gazebo_timestep (float) – The time step for Gazebo.

  • kill_rosmaster (bool) – Whether to kill the ROS master on close.

  • kill_gazebo (bool) – Whether to kill Gazebo on close.

  • clean_logs (bool) – Whether to clean the ROS logs on close.

  • ros_port (str) – The ROS_MASTER_URI port.

  • gazebo_port (str) – The GAZEBO_MASTER_URI port.

  • gazebo_pid (subprocess.Popen) – A subprocess.Popen object representing the running Gazebo instance

  • seed (int) – Seed for random number generator

  • unpause_pause_physics (bool) – Whether to unpause and pause Gazebo before and after each step.

  • action_cycle_time (float) – The time to wait between applying actions.

  • log_internal_state (bool) – Whether to log the internal state of the environment.

  • controller_package_name (str) – The name of the package containing the controllers.

  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • load_cube (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • realtime_mode (bool)

compute_reward(achieved_goal, desired_goal, info)[source]

Compute the reward for achieving a given goal.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict|list) – Additional information about the environment. A list for SB3 HER case.

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

compute_terminated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is terminated.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

compute_truncated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is truncated.

Mainly hard coded here since we are using a wrapper that sets the max number of steps and truncates the episode.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has been truncated (e.g. because the maximum number of steps has been reached)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 7 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 7

total: (3x2) + 1 + (6 or 3) + (7x2) = 27 or 24

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward(achieved_goal=None, desired_goal=None, info=None)[source]

Compute the reward for achieving a given goal.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

And as always negative rewards for each step, non-execution and actions not within joint limits

Parameters:
  • achieved_goal – EE position (optional)

  • desired_goal – Reach Goal (optional)

  • info (dict) – Additional information about the environment. (Optional)

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

Universal Robots UR5e — Real hardware

Robot envs

class rl_environments.ur5e.real.robot_envs.ur5e_robot_real.UR5eRobotEnv(ros_port=None, seed=None, close_env_prompt=False, action_cycle_time=0.0, use_kinect=False, use_zed2=False)[source]

Bases: RealBaseEnv

Superclass for all real UR5e Robot environments.

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt!: Send the joint positions to the robot. /ur5e/arm_controller/command: Send the joint positions to the robot. /ur5e/gripper_controller/command: Send the joint positions to the robot

Parameters:
fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

Also stamps _latest_joint_state_time so the env_loop can gate ticks on driver freshness (real-side only).

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 6 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor

class rl_environments.ur5e.real.robot_envs.ur5e_robot_goal_real.UR5eRobotGoalEnv(ros_port=None, seed=None, close_env_prompt=False, action_cycle_time=0.0, use_kinect=False, use_zed2=False)[source]

Bases: RealGoalEnv

Superclass for all real UR5e Robot environments. - For goal-conditioned tasks

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt!: Send the joint positions to the robot. /ur5e/arm_controller/command: Send the joint positions to the robot. /ur5e/gripper_controller/command: Send the joint positions to the robot

Parameters:
fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

Also stamps _latest_joint_state_time so the env_loop can gate ticks on driver freshness (real-side only).

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 6 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor

Reach task envs

class rl_environments.ur5e.real.task_envs.reach.ur5e_reach_real.UR5eReacherEnv(new_roscore=False, roscore_port=None, seed=None, close_env_prompt=True, reset_env_prompt=True, reset_controllers_prompt=False, reward_type='Dense', delta_action=True, delta_coeff=0.05, environment_loop_rate=None, action_cycle_time=0.0, use_smoothing=False, default_port=True, ee_action_type=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False, use_kinect=False, use_zed2=False)[source]

Bases: UR5eRobotEnv

This Task env is for a simple Reach Task with the UR5e robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (31/28 obs or rgb/depth image or a combination)

Init Args:
  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • seed: Seed for the random number generator.

  • close_env_prompt: Whether to prompt the user to close the env or not.

  • reset_env_prompt: Whether to prompt the operator before each reset (defaults to True because reset homes the arm via MoveIt on real hardware).

  • reset_controllers_prompt: Whether to prompt before re-spawning controllers on reset (defaults to False because this env does not respawn controllers on reset).

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment loop should run. (in Hz)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds)

  • use_smoothing: Whether to use smoothing for actions or not.

  • default_port: Whether to use the default port for the roscore or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • use_kinect: Whether to use the kinect sensor or not.

  • use_zed2: Whether to use the ZED2 camera or not.

variables to keep track of ros port

Parameters:
  • new_roscore (bool)

  • roscore_port (str)

  • seed (int)

  • close_env_prompt (bool)

  • reset_env_prompt (bool)

  • reset_controllers_prompt (bool)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • ee_action_type (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

  • use_kinect (bool)

  • use_zed2 (bool)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 9 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 9

total: (3x2) + 1 + (6 or 3) + (9x2) = 31 or 28

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward()[source]

Function to get a reward from the environment.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

and as always, negative rewards for each step, non-execution and actions not within joint limits

Returns:

A scalar reward value representing how well the agent is doing in the current episode.

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

class rl_environments.ur5e.real.task_envs.reach.ur5e_reach_goal_real.UR5eReacherGoalEnv(new_roscore=False, roscore_port=None, seed=None, close_env_prompt=True, reset_env_prompt=True, reset_controllers_prompt=False, reward_type='sparse', delta_action=True, delta_coeff=0.05, environment_loop_rate=None, action_cycle_time=0.0, use_smoothing=False, default_port=True, ee_action_type=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False, use_kinect=False, use_zed2=False)[source]

Bases: UR5eRobotGoalEnv

This Task env is for a simple Reach Task with the UR5e robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (31/28 obs or rgb/depth image or a combination)

  • Desired Goal - Goal we are trying to reach

  • Achieved Goal - Position of the EE

Init Args:
  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • seed: Seed for the random number generator.

  • close_env_prompt: Whether to prompt the user to close the env or not.

  • reset_env_prompt: Whether to prompt the operator before each reset (defaults to True because reset homes the arm via MoveIt on real hardware).

  • reset_controllers_prompt: Whether to prompt before re-spawning controllers on reset (defaults to False because this env does not respawn controllers on reset).

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment loop should run. (in Hz)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds)

  • use_smoothing: Whether to use smoothing for actions or not.

  • default_port: Whether to use the default port for the roscore or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • use_kinect: Whether to use the kinect sensor or not.

  • use_zed2: Whether to use the ZED2 camera or not.

variables to keep track of ros port

Parameters:
  • new_roscore (bool)

  • roscore_port (str)

  • seed (int)

  • close_env_prompt (bool)

  • reset_env_prompt (bool)

  • reset_controllers_prompt (bool)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • ee_action_type (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

  • use_kinect (bool)

  • use_zed2 (bool)

compute_reward(achieved_goal, desired_goal, info)[source]

Compute the reward for achieving a given goal.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict|list) – Additional information about the environment. A list for SB3 HER case.

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

compute_terminated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is terminated.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

compute_truncated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is truncated.

Mainly hard coded here since we are using a wrapper that sets the max number of steps and truncates the episode.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has been truncated (e.g. because the maximum number of steps has been reached)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 9 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 9

total: (3x2) + 1 + (6 or 3) + (9x2) = 31 or 28

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward(achieved_goal=None, desired_goal=None, info=None)[source]

Compute the reward for achieving a given goal.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

And as always negative rewards for each step, non-execution and actions not within joint limits

Parameters:
  • achieved_goal – EE position (optional)

  • desired_goal – Reach Goal (optional)

  • info (dict) – Additional information about the environment. (Optional)

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits