rl_environments API reference

Pre-built gymnasium environments for the robots and tasks the ecosystem currently supports. The package is organised as rl_environments/{robot}/{sim,real}/{robot_envs,task_envs}/....

For the user-facing list of working environments see Ready-made environments.

Top-level package

RX200 — Simulation (Gazebo)

Robot envs

class rl_environments.rx200.sim.robot_envs.rx200_robot_sim.RX200RobotEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=False, use_kinect=False)[source]

Bases: GazeboBaseEnv

Superclass for all RX200 Robot environments.

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the robot. /rx200/gripper_controller/command: Send the joint positions to the robot.

Parameters:
ros_kin

Finished __init__ method

get_model_pose(model_name='red_cube', rpy=True)[source]

Get the pose of an object in Gazebo

Parameters:
  • model_name – name of the object whose pose is to be retrieved

  • rpy – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the pose is retrieved successful position: position of the object as a numpy array orientation: orientation of the object as a numpy array (rpy or quaternion)

Return type:

success

spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]

Spawn a cube in Gazebo

Parameters:
  • model_pos_x – x-coordinate of the cube

  • model_pos_y – y-coordinate of the cube

Returns:

True if the cube is spawned successfully

Return type:

done

remove_cube_in_gazebo()[source]

Remove the cube from Gazebo

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints using moveit.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm using moveit.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

class rl_environments.rx200.sim.robot_envs.rx200_robot_goal_sim.RX200RobotGoalEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=False, use_kinect=False)[source]

Bases: GazeboGoalEnv

Superclass for all RX200 Robot Goal environments.

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the robot. /rx200/gripper_controller/command: Send the joint positions to the robot.

Parameters:
get_model_pose(model_name='red_cube')[source]

Get the pose of an object in Gazebo

Parameters:

model_name – name of the object whose pose is to be retrieved

Returns:

pose of the object as a geometry_msgs/PoseStamped message

Return type:

pose

spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]

Spawn a cube in Gazebo

Parameters:
  • model_pos_x – x-coordinate of the cube

  • model_pos_y – y-coordinate of the cube

Returns:

True if the cube is spawned successfully

Return type:

done

remove_cube_in_gazebo()[source]

Remove the cube from Gazebo

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints using moveit.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm using moveit.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

Vision-augmented robot envs

class rl_environments.rx200.sim.robot_envs.rx200_robot_sim_zed2.RX200RobotEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=False, use_zed2=False)[source]

Bases: GazeboBaseEnv

Superclass for all RX200 Robot environments.
  • Uses a ZED2 camera for RGB and Depth images

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /rx200/zed2/depth/depth_registered: Depth image from ZED2 camera /rx200/zed2/left/image_rect_color: RGB image from ZED2 camera

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the arm controller /rx200/gripper_controller/command: Send the joint positions to the gripper controller

Parameters:
ros_kin

Finished __init__ method

get_model_pose(model_name='red_cube')[source]

Get the pose of an object in Gazebo

Parameters:

model_name – name of the object whose pose is to be retrieved

Returns:

pose of the object as a geometry_msgs/PoseStamped message

Return type:

pose

spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]

Spawn a cube in Gazebo

Parameters:
  • model_pos_x – x-coordinate of the cube

  • model_pos_y – y-coordinate of the cube

Returns:

True if the cube is spawned successfully

Return type:

done

remove_cube_in_gazebo()[source]

Remove the cube from Gazebo

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints using moveit.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm using moveit.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor

class rl_environments.rx200.sim.robot_envs.rx200_robot_goal_sim_zed2.RX200RobotGoalEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=False, use_zed2=False)[source]

Bases: GazeboGoalEnv

Superclass for all RX200 Robot Goal environments.
  • Uses a ZED2 camera for RGB and Depth images

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /rx200/zed2/depth/depth_registered: Depth image from the ZED2 camera /rx200/zed2/left/image_rect_color: RGB image from the ZED2 camera

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the robot. /rx200/gripper_controller/command: Send the joint positions to the robot.

Parameters:
get_model_pose(model_name='red_cube')[source]

Get the pose of an object in Gazebo

Parameters:

model_name – name of the object whose pose is to be retrieved

Returns:

pose of the object as a geometry_msgs/PoseStamped message

Return type:

pose

spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]

Spawn a cube in Gazebo

Parameters:
  • model_pos_x – x-coordinate of the cube

  • model_pos_y – y-coordinate of the cube

Returns:

True if the cube is spawned successfully

Return type:

done

remove_cube_in_gazebo()[source]

Remove the cube from Gazebo

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints using moveit.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm using moveit.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor

Reach task envs (Kinect v2 vision)

class rl_environments.rx200.sim.task_envs.reach.rx200_kinect_reach_sim.RX200ReacherEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=False, gazebo_gui=False, seed=None, reward_type='Dense', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=10, action_cycle_time=0.1, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False)[source]

Bases: RX200RobotEnv

This Task env is for a simple Reach Task with the RX200 robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (5 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (28 obs or rgb/depth image or a combination)

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) - default 10 Hz (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to load the table model or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

variables to keep track of ros, gazebo ports and gazebo pid

Parameters:
  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • seed (int)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

log_internal_state

Reward Architecture * Dense - Default * Sparse - -1 if not done and 1 if done * All the others and misspellings - default to “Dense” reward

simple_dense_reward

Use action as deltas

delta_coeff

Use smoothing for actions

action_cycle_time

Action speed - Time to complete the trajectory

action_speed

Observation space * RGB image only * traditional observations only (default) * RGB image and traditional observations (combined) * RGB image, Depth image and traditional observations (combined)

rgb_plus_depth_plus_normal_obs

Action space * Joint action space (default) * End effector action space

ee_action_type

Debug

debug

Load YAML param file

goal_space

Workspace so we can check if the action is within the workspace

workspace_space

Define subscribers/publishers and Markers as needed.

goal_marker

Init super class.

within_goal_space

Finished __init__ method

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 8 05. Previous action - 5 or 3 (joint or ee) 06. Joint velocities - 8

total: (3x2) + 1 + (5 or 3) + (8x2) = 28 or 26

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward()[source]

Function to get a reward from the environment.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

and as always, negative rewards for each step, non-execution and actions not within joint limits

Returns:

A scalar reward value representing how well the agent is doing in the current episode.

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

class rl_environments.rx200.sim.task_envs.reach.rx200_kinect_reach_goal_sim.RX200ReacherGoalEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=False, gazebo_gui=False, seed=None, reward_type='sparse', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=10.0, action_cycle_time=0.8, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, debug=False, log_internal_state=False, action_speed=0.5, simple_dense_reward=True)[source]

Bases: RX200RobotGoalEnv

This Task env is for a simple Reach Task with the RX200 robot. This env is made to work in a real-time environment.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (5 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (28 obs or rgb/depth image or a combination)

  • Desired Goal - Goal we are trying to reach

  • Achieved Goal - Position of the EE

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) default is 10 Hz - (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to load the table model or not.

  • debug: Whether to print debug messages or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

variables to keep track of ros, gazebo ports and gazebo pid

Parameters:
  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • seed (int)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • debug (bool)

  • log_internal_state (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

log_internal_state

Reward Architecture * Dense - Default * Sparse - -1 if not done and 1 if done * All the others and misspellings - default to “Dense” reward

simple_dense_reward

Use action as deltas

delta_coeff

Use smoothing for actions

action_cycle_time

Action speed - Time to complete the trajectory

action_speed

Observation space * RGB image only * traditional observations only (default) * RGB image and traditional observations (combined) * RGB image, Depth image and traditional observations (combined)

rgb_plus_depth_plus_normal_obs

Action space * Joint action space (default) * End effector action space

ee_action_type

Debug

debug

Load YAML param file

rgb_image_space

Achieved goal (EE pose) - 3

achieved_goal_space

Desired goal (Goal pose) - 3

desired_goal_space

Define the overall observation space

goal_space

Workspace so we can check if the action is within the workspace

workspace_space

Define subscribers/publishers and Markers as needed.

goal_marker

Init super class.

within_goal_space

Finished __init__ method

compute_reward(achieved_goal, desired_goal, info)[source]

Compute the reward for achieving a given goal.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict|list) – Additional information about the environment. A list for SB3 HER case.

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

compute_terminated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is terminated.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

compute_truncated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is truncated.

Mainly hard coded here since we are using a wrapper that sets the max number of steps and truncates the episode.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has been truncated (e.g. because the maximum number of steps has been reached)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 8 05. Previous action - 5 or 3 (joint or ee) 06. Joint velocities - 8

total: (3x2) + 1 + (5 or 3) + (8x2) = 28 or 26

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward(achieved_goal=None, desired_goal=None, info=None)[source]

Compute the reward for achieving a given goal.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

And as always negative rewards for each step, non-execution and actions not within joint limits

Parameters:
  • achieved_goal – EE position (optional)

  • desired_goal – Reach Goal (optional)

  • info (dict) – Additional information about the environment. (Optional)

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

Reach task envs (ZED 2 vision)

class rl_environments.rx200.sim.task_envs.reach.rx200_zed2_reach_sim.RX200ReacherEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=False, gazebo_gui=False, seed=None, reward_type='Dense', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=10, action_cycle_time=0.1, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False)[source]

Bases: RX200RobotEnv

This Task env is for a simple Reach Task with the RX200 robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (5 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (28 obs or rgb/depth image or a combination)

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) - default 10 Hz (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to load the table model or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

variables to keep track of ros, gazebo ports and gazebo pid

Parameters:
  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • seed (int)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

log_internal_state

Reward Architecture * Dense - Default * Sparse - -1 if not done and 1 if done * All the others and misspellings - default to “Dense” reward

simple_dense_reward

Use action as deltas

delta_coeff

Use smoothing for actions

action_cycle_time

Action speed - Time to complete the trajectory

action_speed

Observation space * RGB image only * traditional observations only (default) * RGB image and traditional observations (combined) * RGB image, Depth image and traditional observations (combined)

rgb_plus_depth_plus_normal_obs

Action space * Joint action space (default) * End effector action space

ee_action_type

Debug

debug

Load YAML param file

goal_space

Workspace so we can check if the action is within the workspace

workspace_space

Define subscribers/publishers and Markers as needed.

goal_marker

Init super class.

within_goal_space

Finished __init__ method

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 8 05. Previous action - 5 or 3 (joint or ee) 06. Joint velocities - 8

total: (3x2) + 1 + (5 or 3) + (8x2) = 28 or 26

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward()[source]

Function to get a reward from the environment.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

and as always, negative rewards for each step, non-execution and actions not within joint limits

Returns:

A scalar reward value representing how well the agent is doing in the current episode.

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

class rl_environments.rx200.sim.task_envs.reach.rx200_zed2_reach_goal_sim.RX200ReacherGoalEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=False, gazebo_gui=False, seed=None, reward_type='sparse', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=10, action_cycle_time=0.8, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, debug=False, log_internal_state=False, action_speed=0.5, simple_dense_reward=True)[source]

Bases: RX200RobotGoalEnv

This Task env is for a simple Reach Task with the RX200 robot. This env is made to work in a real-time environment.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (5 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (28 obs or rgb/depth image or a combination)

  • Desired Goal - Goal we are trying to reach

  • Achieved Goal - Position of the EE

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) default is 10 Hz - (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to load the table model or not.

  • debug: Whether to print debug messages or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

variables to keep track of ros, gazebo ports and gazebo pid

Parameters:
  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • seed (int)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • debug (bool)

  • log_internal_state (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

log_internal_state

Reward Architecture * Dense - Default * Sparse - -1 if not done and 1 if done * All the others and misspellings - default to “Dense” reward

simple_dense_reward

Use action as deltas

delta_coeff

Use smoothing for actions

action_cycle_time

Action speed - Time to complete the trajectory

action_speed

Observation space * RGB image only * traditional observations only (default) * RGB image and traditional observations (combined) * RGB image, Depth image and traditional observations (combined)

rgb_plus_depth_plus_normal_obs

Action space * Joint action space (default) * End effector action space

ee_action_type

Debug

debug

Load YAML param file

rgb_image_space

Achieved goal (EE pose) - 3

achieved_goal_space

Desired goal (Goal pose) - 3

desired_goal_space

Define the overall observation space

goal_space

Workspace so we can check if the action is within the workspace

workspace_space

Define subscribers/publishers and Markers as needed.

goal_marker

Init super class.

within_goal_space

Finished __init__ method

compute_reward(achieved_goal, desired_goal, info)[source]

Compute the reward for achieving a given goal.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict|list) – Additional information about the environment. A list for SB3 HER case.

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

compute_terminated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is terminated.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

compute_truncated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is truncated.

Mainly hard coded here since we are using a wrapper that sets the max number of steps and truncates the episode.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has been truncated (e.g. because the maximum number of steps has been reached)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 8 05. Previous action - 5 or 3 (joint or ee) 06. Joint velocities - 8

total: (3x2) + 1 + (5 or 3) + (8x2) = 28 or 26

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward(achieved_goal=None, desired_goal=None, info=None)[source]

Compute the reward for achieving a given goal.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

And as always negative rewards for each step, non-execution and actions not within joint limits

Parameters:
  • achieved_goal – EE position (optional)

  • desired_goal – Reach Goal (optional)

  • info (dict) – Additional information about the environment. (Optional)

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

RX200 — Real hardware

Robot envs

class rl_environments.rx200.real.robot_envs.rx200_robot_real.RX200RobotEnv(ros_port=None, seed=None, close_env_prompt=False, action_cycle_time=0.0, use_kinect=False, use_zed2=False)[source]

Bases: RealBaseEnv

Superclass for all real RX200 Robot environments.

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt!: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the robot. /rx200/gripper_controller/command: Send the joint positions to the robot

Parameters:
ros_kin

Finished __init__ method

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor

class rl_environments.rx200.real.robot_envs.rx200_robot_goal_real.RX200RobotGoalEnv(ros_port=None, seed=None, close_env_prompt=False, action_cycle_time=0.0, use_kinect=False, use_zed2=False)[source]

Bases: RealGoalEnv

Superclass for all real RX200 Robot environments. - For goal-conditioned tasks

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt!: Send the joint positions to the robot. /rx200/arm_controller/command: Send the joint positions to the robot. /rx200/gripper_controller/command: Send the joint positions to the robot

Parameters:
ros_kin

Finished __init__ method

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the gripper joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the gripper

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor

Niryo Ned2 — Simulation

Robot envs

class rl_environments.ned2.sim.robot_envs.ned2_robot_sim.NED2RobotEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=False, use_camera=False, gripper=False)[source]

Bases: GazeboBaseEnv

Superclass for all NED2 Robot environments - gazebo-based.

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /gazebo_camera/image_raw: RGB image from the robot camera.

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /ned2/niryo_robot_follow_joint_trajectory_controller/command: Send the joint positions to the robot. /ned2/niryo_robot_tools_commander/action_server: Action server to control the robot tools.

Parameters:
ros_kin

Finished __init__ method

get_model_pose(model_name='red_cube', rpy=True)[source]

Get the pose of an object in Gazebo

Parameters:
  • model_name – name of the object whose pose is to be retrieved

  • rpy – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the pose is retrieved successful position: position of the object as a numpy array orientation: orientation of the object as a numpy array (rpy or quaternion)

Return type:

success

spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]

Spawn a cube in Gazebo

Parameters:
  • model_pos_x – x-coordinate of the cube

  • model_pos_y – y-coordinate of the cube

Returns:

True if the cube is spawned successfully

Return type:

done

remove_cube_in_gazebo()[source]

Remove the cube from Gazebo

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(action)[source]

Set a joint position target only for the gripper joints using low-level ros controllers. - ros action server

Parameters:

action (str) – “open” or “close” the gripper

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints using moveit.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm using moveit.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

class rl_environments.ned2.sim.robot_envs.ned2_robot_goal_sim.NED2RobotGoalEnv(ros_port=None, gazebo_port=None, gazebo_pid=None, seed=None, real_time=False, action_cycle_time=0.0, load_cube=False, load_table=False, use_camera=False, gripper=False)[source]

Bases: GazeboGoalEnv

Superclass for all NED2 Robot Goal environments.

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /gazebo_camera/image_raw: RGB image from the robot camera.

Actuators Topic List:

MoveIt: Send the joint positions to the robot. /ned2/niryo_robot_follow_joint_trajectory_controller/command: Send the joint positions to the robot. /ned2/niryo_robot_tools_commander/action_server: Action server to control the robot tools.

Parameters:
ros_kin

Finished __init__ method

get_model_pose(model_name='red_cube')[source]

Get the pose of an object in Gazebo

Parameters:

model_name – name of the object whose pose is to be retrieved

Returns:

pose of the object as a geometry_msgs/PoseStamped message

Return type:

pose

spawn_cube_in_gazebo(model_pos_x, model_pos_y)[source]

Spawn a cube in Gazebo

Parameters:
  • model_pos_x – x-coordinate of the cube

  • model_pos_y – y-coordinate of the cube

Returns:

True if the cube is spawned successfully

Return type:

done

remove_cube_in_gazebo()[source]

Remove the cube from Gazebo

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(action)[source]

Set a joint position target only for the gripper joints using low-level ros controllers. - ros action server

Parameters:

action (str) – “open” or “close” the gripper

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints using moveit.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm using moveit.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

This gives us the best pose if we are using the moveit config of the ReactorX repo They are getting the pose with ee_gripper_link

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

Reach task envs

class rl_environments.ned2.sim.task_envs.reach.ned2_reach_sim.NED2ReacherEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=False, gazebo_gui=False, seed=None, reward_type='Dense', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=10, action_cycle_time=0.1, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, debug=False, action_speed=0.5, simple_dense_reward=True, log_internal_state=False, extra_smoothing=False)[source]

Bases: NED2RobotEnv

This Task env is for a simple Reach Task with the NED2 robot.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (simp obs or rgb/depth image or a combination)

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) - default 10 Hz (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to load the table model or not.

  • debug: Whether to print debug messages or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • extra_smoothing: Whether to use extra smoothing for actions or not.

variables to keep track of ros, gazebo ports and gazebo pid

Parameters:
  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • seed (int)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • debug (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

  • log_internal_state (bool)

  • extra_smoothing (bool)

log_internal_state

Reward Architecture * Dense - Default * Sparse - -1 if not done and 1 if done * All the others and misspellings - default to “Dense” reward

simple_dense_reward

Use action as deltas

delta_coeff

Use smoothing for actions

action_cycle_time

Action speed - Time to complete the trajectory

action_speed

Observation space * RGB image only * traditional observations only (default) * RGB image and traditional observations (combined) * RGB image, Depth image and traditional observations (combined)

rgb_plus_depth_plus_normal_obs

Action space * Joint action space (default) * End effector action space

ee_action_type

Debug

debug

Load YAML param file

goal_space

Workspace so we can check if the action is within the workspace

workspace_space

Define subscribers/publishers and Markers as needed.

goal_marker

Init super class.

within_goal_space

Finished __init__ method

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 6 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 6

total: (3x2) + 1 + (6 or 3) + (6x2)

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward()[source]

Function to get a reward from the environment.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

and as always, negative rewards for each step, non-execution and actions not within joint limits

Returns:

A scalar reward value representing how well the agent is doing in the current episode.

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

class rl_environments.ned2.sim.task_envs.reach.ned2_reach_goal_sim.NED2ReacherGoalEnv(launch_gazebo=True, new_roscore=True, roscore_port=None, gazebo_paused=False, gazebo_gui=False, seed=None, reward_type='sparse', delta_action=True, delta_coeff=0.05, ee_action_type=False, environment_loop_rate=10.0, action_cycle_time=0.8, use_smoothing=False, rgb_obs_only=False, normal_obs_only=True, rgb_plus_normal_obs=False, rgb_plus_depth_plus_normal_obs=False, load_table=True, debug=False, log_internal_state=False, action_speed=0.5, simple_dense_reward=True)[source]

Bases: NED2RobotGoalEnv

This Task env is for a simple Reach Task with the NED2 robot. This env is made to work in a real-time environment.

The task is done if
  • The robot reached the goal

Here
  • Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector)

  • Observation - Continuous (simp obs or rgb/depth image or a combination)

  • Desired Goal - Goal we are trying to reach

  • Achieved Goal - Position of the EE

Init Args:
  • launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running.

  • new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.

  • roscore_port: Port of the roscore to be launched. If None, a random port is chosen.

  • gazebo_paused: Whether to launch Gazebo in a paused state or not.

  • gazebo_gui: Whether to launch Gazebo with the GUI or not.

  • seed: Seed for the random number generator.

  • reward_type: Type of reward to be used. It Can be “Sparse” or “Dense”.

  • delta_action: Whether to use the delta actions or the absolute actions.

  • delta_coeff: Coefficient to be used for the delta actions.

  • ee_action_type: Whether to use the end effector action space or the joint action space.

  • environment_loop_rate: Rate at which the environment should run. (in Hz) default is 10 Hz - (default operating frequency of the robot)

  • action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time “1/environment_loop_rate”)

  • use_smoothing: Whether to use smoothing for actions or not.

  • rgb_obs_only: Whether to use only the RGB image as the observations or not.

  • normal_obs_only: Whether to use only the traditional observations or not.

  • rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.

  • rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.

  • load_table: Whether to load the table model or not.

  • debug: Whether to print debug messages or not.

  • log_internal_state: Whether to log the internal state of the environment or not.

  • action_speed: set the speed to complete the trajectory. default in 0.5 seconds

  • simple_dense_reward: Whether to use a simple dense reward or not.

variables to keep track of ros, gazebo ports and gazebo pid

Parameters:
  • launch_gazebo (bool)

  • new_roscore (bool)

  • roscore_port (str)

  • gazebo_paused (bool)

  • gazebo_gui (bool)

  • seed (int)

  • reward_type (str)

  • delta_action (bool)

  • delta_coeff (float)

  • ee_action_type (bool)

  • environment_loop_rate (float)

  • action_cycle_time (float)

  • use_smoothing (bool)

  • rgb_obs_only (bool)

  • normal_obs_only (bool)

  • rgb_plus_normal_obs (bool)

  • rgb_plus_depth_plus_normal_obs (bool)

  • load_table (bool)

  • debug (bool)

  • log_internal_state (bool)

  • action_speed (float)

  • simple_dense_reward (bool)

log_internal_state

Reward Architecture * Dense - Default * Sparse - -1 if not done and 1 if done * All the others and misspellings - default to “Dense” reward

simple_dense_reward

Use action as deltas

delta_coeff

Use smoothing for actions

action_cycle_time

Action speed - Time to complete the trajectory

action_speed

Observation space * RGB image only * traditional observations only (default) * RGB image and traditional observations (combined) * RGB image, Depth image and traditional observations (combined)

rgb_plus_depth_plus_normal_obs

Action space * Joint action space (default) * End effector action space

ee_action_type

Debug

debug

Load YAML param file

rgb_image_space

Achieved goal (EE pose) - 3

achieved_goal_space

Desired goal (Goal pose) - 3

desired_goal_space

Define the overall observation space

goal_space

Workspace so we can check if the action is within the workspace

workspace_space

Define subscribers/publishers and Markers as needed.

goal_marker

Init super class.

within_goal_space

Finished __init__ method

compute_reward(achieved_goal, desired_goal, info)[source]

Compute the reward for achieving a given goal.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict|list) – Additional information about the environment. A list for SB3 HER case.

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

compute_terminated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is terminated.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

compute_truncated(achieved_goal, desired_goal, info)[source]

Function to check if the episode is truncated.

Mainly hard coded here since we are using a wrapper that sets the max number of steps and truncates the episode.

Parameters:
  • achieved_goal – EE position

  • desired_goal – Reach Goal

  • info (dict) – Additional information about the environment.

Returns:

A boolean value indicating whether the episode has been truncated (e.g. because the maximum number of steps has been reached)

environment_loop(event)[source]

Function for Environment loop for real time RL envs

execute_action(action)[source]

Function to apply an action to the robot.

This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot.

Parameters:

action – The action to be applied to the robot.

sample_observation()[source]

Function to get an observation from the environment.

# traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 6 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 6

total: (3x2) + 1 + (6 or 3) + (6x2)

# depth image 480x640 32FC1

# rgb image 480x640X3 RGB images

Returns:

An observation representing the current state of the environment.

calculate_reward(achieved_goal=None, desired_goal=None, info=None)[source]

Compute the reward for achieving a given goal.

Sparse Reward: float => 1.0 for success, -1.0 for failure

Dense Reward:

if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal

And as always negative rewards for each step, non-execution and actions not within joint limits

Parameters:
  • achieved_goal – EE position (optional)

  • desired_goal – Reach Goal (optional)

  • info (dict) – Additional information about the environment. (Optional)

Returns:

The reward for achieving the given goal.

Return type:

reward (float)

check_if_done()[source]

Function to check if the episode is done.

The Task is done if the EE is close enough to the goal

Returns:

A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered)

check_if_reach_done(achieved_goal, desired_goal)[source]

Check if the reach is done

test_goal_pos(goal)[source]

Function to check if the given goal is reachable

get_random_goal(max_tries=100)[source]

Function to get a reachable goal

Parameters:

max_tries (int)

get_random_goal_no_check()[source]

Function to get a random goal without checking

check_action_within_goal_space_fk(action)[source]

Function to check if the given action is within the goal space

check_action_within_workspace(action)[source]

Function to check if the given action is within the workspace

Parameters:

action – The action to be applied to the robot.

Returns:

A boolean value indicating whether the action is within the workspace

check_if_z_within_limits(action)[source]

Function to check if the given ee_pos is within the limits

Niryo Ned2 — Real hardware

class rl_environments.ned2.real.robot_envs.ned2_robot_real.NED2RobotEnv(ros_port=None, seed=None, close_env_prompt=False, action_cycle_time=0.0, use_kinect=False, use_zed2=False, remote_ip=None, local_ip=None, multi_device_mode=True)[source]

Bases: RealBaseEnv

Superclass for all real NED2 Robot environments.

Initializes a new Robot Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt!: Send the joint positions to the robot. /niryo_robot_follow_joint_trajectory_controller/command: Send the joint positions to the robot. /niryo_robot_tools_commander/action_server: Send the joint positions to the robot gripper.

Parameters:
  • ros_port (str)

  • seed (int)

  • close_env_prompt (bool)

  • use_kinect (bool)

  • use_zed2 (bool)

  • remote_ip (str)

  • local_ip (str)

  • multi_device_mode (bool)

ros_kin

Finished __init__ method

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(action)[source]

Set a joint position target only for the gripper joints using low-level ros controllers. - ros action server

Parameters:

action (str) – “open” or “close” the gripper

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor

class rl_environments.ned2.real.robot_envs.ned2_robot_goal_real.NED2RobotGoalEnv(ros_port=None, seed=None, close_env_prompt=False, action_cycle_time=0.0, use_kinect=False, use_zed2=False, remote_ip=None, local_ip=None, multi_device_mode=True)[source]

Bases: RealGoalEnv

Superclass for all real NED2 Robot environments. - For goal-conditioned tasks

Initializes a new Robot Goal Environment

Describe the robot and the sensors used in the env.

Sensor Topic List:

MoveIt: To get the pose and rpy of the robot. /joint_states: JointState received for the joints of the robot /head_mount_kinect2/depth/image_raw: Depth image from the kinect sensor /head_mount_kinect2/rgb/image_raw: RGB image from the kinect sensor

Actuators Topic List:

MoveIt!: Send the joint positions to the robot. /niryo_robot_follow_joint_trajectory_controller/command: Send the joint positions to the robot. /niryo_robot_tools_commander/action_server: Send the joint positions to the robot gripper.

Parameters:
  • ros_port (str)

  • seed (int)

  • close_env_prompt (bool)

  • use_kinect (bool)

  • use_zed2 (bool)

  • remote_ip (str)

  • local_ip (str)

  • multi_device_mode (bool)

ros_kin

Finished __init__ method

fk_pykdl(action)[source]

Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.

Parameters:

action – joint positions of the robot arm (in radians)

Returns:

end-effector position as a numpy array

Return type:

ee_position

calculate_fk(joint_positions, euler=True)[source]

Calculate the forward kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • joint_positions – joint positions of the robot arm (in radians)

  • euler – True if the orientation is to be returned as euler angles (default: True)

Returns:

True if the FK calculation is successful ee_position: end-effector position as a numpy array ee_rpy: end-effector orientation as a list of rpy or quaternion values

Return type:

done

calculate_ik(target_pos, ee_ori=array([0., 0., 0., 1.]))[source]

Calculate the inverse kinematics of the robot arm using the ros_kinematics package.

Parameters:
  • target_pos – target end-effector position as a numpy array

  • ee_ori – end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])

Returns:

True if the IK calculation is successful joint_positions: joint positions of the robot arm (in radians)

Return type:

done

joint_state_callback(joint_state)[source]

Function to get the joint state of the robot.

move_arm_joints(q_positions, time_from_start=0.5)[source]

Set a joint position target only for the arm joints using low-level ros controllers.

Parameters:
  • q_positions (ndarray) – joint positions of the robot arm

  • time_from_start (float) – time from start of the trajectory (set the speed to complete the trajectory)

Returns:

True if the action is successful

Return type:

bool

move_gripper_joints(action)[source]

Set a joint position target only for the gripper joints using low-level ros controllers. - ros action server

Parameters:

action (str) – “open” or “close” the gripper

Returns:

True if the action is successful

Return type:

bool

smooth_trajectory(q_positions, time_from_start, multiplier=100)[source]

Smooth the trajectory by interpolating between the current and target positions.

Parameters:
  • q_positions – target joint positions

  • time_from_start – time from start of the trajectory (set the speed to complete the trajectory)

  • multiplier – number of steps to interpolate between the current and target positions

publish_trajectory(trajectory_points)[source]

Publish the entire trajectory at once.

Parameters:

trajectory_points – List of tuples containing joint positions and time_from_start

set_trajectory_joints(q_positions)[source]

Set a joint position target only for the arm joints.

Parameters:

q_positions (ndarray)

Return type:

bool

set_trajectory_ee(pos)[source]

Set a pose target for the end effector of the robot arm.

Parameters:

pos (ndarray)

Return type:

bool

get_ee_pose()[source]

Returns the end-effector pose as a geometry_msgs/PoseStamped message

get_ee_rpy()[source]

Returns the end-effector orientation as a list of roll, pitch, and yaw angles.

get_joint_angles()[source]

get current joint angles of the robot arm - 5 elements Returns a list

check_goal(goal)[source]

Check if the goal is reachable

check_goal_reachable_joint_pos(joint_pos)[source]

Check if the goal is reachable with joint positions

kinect_depth_callback(data)[source]

Callback function for kinect depth sensor

kinect_rgb_callback(img_msg)[source]

Callback function for kinect rgb sensor

zed2_depth_callback(data)[source]

Callback function for zed2 depth sensor

zed2_rgb_callback(img_msg)[source]

Callback function for zed2 rgb sensor