Source code for rl_environments.ned2.sim.task_envs.reach.ned2_reach_sim

#!/bin/python3

from typing import Any, Optional, Dict

import rospy
import numpy as np
from gymnasium import spaces
from gymnasium.envs.registration import register
import scipy.spatial

# Custom robot env
from rl_environments.ned2.sim.robot_envs import ned2_robot_sim

# core modules of the framework
from multiros.utils import gazebo_core
from multiros.utils import gazebo_models
from multiros.utils import gazebo_physics
from multiros.utils.moveit_multiros import MoveitMultiros
from multiros.utils import ros_common
from multiros.utils import ros_controllers
from multiros.utils import ros_markers

# Register your environment using the gymnasium register method to utilize gym.make("TaskEnv-v0").
# register(
#     id='NED2ReacherSim-v2',
#     entry_point='rl_environments.ned2.sim.task_envs.reach.ned2_reach_sim:NED2ReacherEnv',
#     max_episode_steps=1000,
# )


[docs] class NED2ReacherEnv(ned2_robot_sim.NED2RobotEnv): """ This Task env is for a simple Reach Task with the NED2 robot. The task is done if * The robot reached the goal Here * Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector) * Observation - Continuous (simp obs or rgb/depth image or a combination) Init Args: * launch_gazebo: Whether to launch Gazebo or not. If False, it is assumed that Gazebo is already running. * new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running. * roscore_port: Port of the roscore to be launched. If None, a random port is chosen. * gazebo_paused: Whether to launch Gazebo in a paused state or not. * gazebo_gui: Whether to launch Gazebo with the GUI or not. * seed: Seed for the random number generator. * reward_type: Type of reward to be used. It Can be "Sparse" or "Dense". * delta_action: Whether to use the delta actions or the absolute actions. * delta_coeff: Coefficient to be used for the delta actions. * ee_action_type: Whether to use the end effector action space or the joint action space. * environment_loop_rate: Rate at which the environment should run. (in Hz) - default 10 Hz (default operating frequency of the robot) * action_cycle_time: Time to wait between two consecutive actions. (in seconds) - default 100 ms (should be equal to larger than the environment loop time "1/environment_loop_rate") * use_smoothing: Whether to use smoothing for actions or not. * rgb_obs_only: Whether to use only the RGB image as the observations or not. * normal_obs_only: Whether to use only the traditional observations or not. * rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not. * rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not. * load_table: Whether to load the table model or not. * debug: Whether to print debug messages or not. * action_speed: set the speed to complete the trajectory. default in 0.5 seconds * simple_dense_reward: Whether to use a simple dense reward or not. * log_internal_state: Whether to log the internal state of the environment or not. * extra_smoothing: Whether to use extra smoothing for actions or not. """ def __init__(self, launch_gazebo: bool = True, new_roscore: bool = True, roscore_port: str = None, gazebo_paused: bool = False, gazebo_gui: bool = False, seed: int = None, reward_type: str = "Dense", delta_action: bool = True, delta_coeff: float = 0.05, ee_action_type: bool = False, environment_loop_rate: float = 10, action_cycle_time: float = 0.100, use_smoothing: bool = False, rgb_obs_only: bool = False, normal_obs_only: bool = True, rgb_plus_normal_obs: bool = False, rgb_plus_depth_plus_normal_obs: bool = False, load_table: bool = True, debug: bool = False, action_speed: float = 0.5, simple_dense_reward: bool = True, log_internal_state: bool = False, extra_smoothing: bool = False): """ variables to keep track of ros, gazebo ports and gazebo pid """ ros_port = None gazebo_port = None gazebo_pid = None """ Initialise the env It is recommended to launch Gazebo with a new roscore at this point for the following reasons:, 1. This allows running a new rosmaster to enable vectorisation of the environment and the execution of multiple environments concurrently. 2. The environment can keep track of the process ID of Gazebo to automatically close it when env.close() is called. """ # launch gazebo if launch_gazebo: # Update the function to include additional options. ros_port, gazebo_port, gazebo_pid = self._launch_gazebo(launch_roscore=new_roscore, port=roscore_port, paused=gazebo_paused, gui=gazebo_gui) # Launch new roscore elif new_roscore: ros_port = self._launch_roscore(port=roscore_port) # ros_port of the already running roscore elif roscore_port is not None: ros_port = roscore_port # change to new rosmaster ros_common.change_ros_master(ros_port) else: """ Check for roscore """ if ros_common.is_roscore_running() is False: print("roscore is not running! Launching a new roscore and Gazebo!") ros_port, gazebo_port, gazebo_pid = gazebo_core.launch_gazebo(launch_roscore=new_roscore, port=roscore_port, paused=gazebo_paused, gui=gazebo_gui) # init the ros node if ros_port is not None: self.node_name = "NED2ReacherEnvSim" + "_" + ros_port else: self.node_name = "NED2ReacherEnvSim" rospy.init_node(self.node_name, anonymous=True) """ Provide a description of the task. """ rospy.loginfo(f"Starting {self.node_name}") """ Exit the program if - (1/environment_loop_rate) is greater than action_cycle_time """ if (1.0 / environment_loop_rate) > action_cycle_time: rospy.logerr("The environment loop rate is greater than the action cycle time. Exiting the program!") rospy.signal_shutdown("Exiting the program!") exit() """ log internal state - using rospy loginfo, logwarn, logerr """ self.log_internal_state = log_internal_state """ Reward Architecture * Dense - Default * Sparse - -1 if not done and 1 if done * All the others and misspellings - default to "Dense" reward """ if reward_type.lower() == "sparse": self.reward_arc = "Sparse" elif reward_type.lower() == "dense": self.reward_arc = "Dense" else: rospy.logwarn(f"The given reward architecture '{reward_type}' not found. Defaulting to Dense!") self.reward_arc = "Dense" # check if we are using simple dense reward # for simple, we only return the distance to the goal (negative Euclidean distance = -d) self.simple_dense_reward = simple_dense_reward """ Use action as deltas """ self.delta_action = delta_action self.delta_coeff = delta_coeff """ Use smoothing for actions """ self.use_smoothing = use_smoothing self.extra_smoothing = extra_smoothing self.action_cycle_time = action_cycle_time """ Action speed - Time to complete the trajectory """ self.action_speed = action_speed """ Observation space * RGB image only * traditional observations only (default) * RGB image and traditional observations (combined) * RGB image, Depth image and traditional observations (combined) """ self.rgb_obs = rgb_obs_only self.normal_obs = normal_obs_only self.rgb_plus_normal_obs = rgb_plus_normal_obs self.rgb_plus_depth_plus_normal_obs = rgb_plus_depth_plus_normal_obs """ Action space * Joint action space (default) * End effector action space """ self.ee_action_type = ee_action_type """ Debug """ self.debug = debug """ Load YAML param file """ # add to ros parameter server ros_common.ros_load_yaml(pkg_name="rl_environments", file_name="ned2_reach_task_config.yaml", ns="/") self._get_params() """ Define the action space. """ # Joint action space or End effector action space # ROS and Gazebo often use double-precision (64-bit), # but we are using single-precision (32-bit) as it is typical for RL implementations. if self.ee_action_type: # EE action space self.max_ee_values = np.array([self.position_ee_max["x"], self.position_ee_max["y"], self.position_ee_max["z"]]) self.min_ee_values = np.array([self.position_ee_min["x"], self.position_ee_min["y"], self.position_ee_min["z"]]) self.action_space = spaces.Box(low=np.array(self.min_ee_values), high=np.array(self.max_ee_values), dtype=np.float32) else: # Joint action space self.action_space = spaces.Box(low=np.array(self.min_joint_values), high=np.array(self.max_joint_values), dtype=np.float32) """ Define the observation space. # typical observation 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 6 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 6 total: (3x2) + 1 + (6 or 3) + (6x2) # depth image 480x640 32FC1 # rgb image 480x640X3 RGB images """ # ---- ee pos observations_high_ee_pos_range = np.array( np.array([self.position_ee_max["x"], self.position_ee_max["y"], self.position_ee_max["z"]])) observations_low_ee_pos_range = np.array( np.array([self.position_ee_min["x"], self.position_ee_min["y"], self.position_ee_min["z"]])) # ---- vector to the goal - normalized linear distance observations_high_vec_ee_goal = np.array([1.0, 1.0, 1.0]) observations_low_vec_ee_goal = np.array([-1.0, -1.0, -1.0]) # ---- Euclidian distance observations_high_dist = np.array([self.max_distance]) observations_low_dist = np.array([0.0]) # ---- joint values observations_high_joint_values = self.max_joint_angles.copy() observations_low_joint_values = self.min_joint_angles.copy() # ---- previous action if self.ee_action_type: observations_high_prev_action = self.max_ee_values.copy() observations_low_prev_action = self.min_ee_values.copy() else: observations_high_prev_action = self.max_joint_values.copy() observations_low_prev_action = self.min_joint_values.copy() # ---- joint velocities observations_high_joint_vel = self.max_joint_vel.copy() observations_low_joint_vel = self.min_joint_vel.copy() high = np.concatenate( [observations_high_ee_pos_range, observations_high_vec_ee_goal, observations_high_dist, observations_high_joint_values, observations_high_prev_action, observations_high_joint_vel, ]) low = np.concatenate( [observations_low_ee_pos_range, observations_low_vec_ee_goal, observations_low_dist, observations_low_joint_values, observations_low_prev_action, observations_low_joint_vel, ]) # Define the traditional observation space self.observations = spaces.Box(low=low, high=high, dtype=np.float32) # Define the depth image space (480x640 32FC1) - this uses 32-bit float self.depth_image_space = spaces.Box(low=0, high=1, shape=(480, 640), dtype=np.float32) # Define the image space (480x640X3 RGB images) - this uses 8-bit unsigned int self.rgb_image_space = spaces.Box(low=0, high=255, shape=(480, 640, 3), dtype=np.uint8) # Define the final observation space if self.normal_obs: use_kinect = False # to pass to the superclass self.observation_space = self.observations elif self.rgb_obs: use_kinect = True self.observation_space = self.rgb_image_space elif self.rgb_plus_normal_obs: use_kinect = True # Define a combined observation space self.observation_space = spaces.Dict({ "rgb_image": self.rgb_image_space, "observations": self.observations }) elif self.rgb_plus_depth_plus_normal_obs: use_kinect = True # Define a combined observation space self.observation_space = spaces.Dict({ "depth_image": self.depth_image_space, "rgb_image": self.rgb_image_space, "observations": self.observations }) # if none of the above, use the traditional observation space else: use_kinect = False self.observation_space = self.observations """ Goal space for sampling """ # ---- Goal pos high_goal_pos_range = np.array( np.array([self.position_goal_max["x"], self.position_goal_max["y"], self.position_goal_max["z"]])) low_goal_pos_range = np.array( np.array([self.position_goal_min["x"], self.position_goal_min["y"], self.position_goal_min["z"]])) # -- goal space for sampling self.goal_space = spaces.Box(low=low_goal_pos_range, high=high_goal_pos_range, dtype=np.float32, seed=seed) """ Workspace so we can check if the action is within the workspace """ # ---- Workspace high_workspace_range = np.array( np.array([self.workspace_max["x"], self.workspace_max["y"], self.workspace_max["z"]])) low_workspace_range = np.array( np.array([self.workspace_min["x"], self.workspace_min["y"], self.workspace_min["z"]])) # -- workspace space for checking # we don't need to set the seed here since we're not sampling from this space self.workspace_space = spaces.Box(low=low_workspace_range, high=high_workspace_range, dtype=np.float32) """ Define subscribers/publishers and Markers as needed. """ self.goal_marker = ros_markers.RosMarker(frame_id="world", ns="goal", marker_type=2, marker_topic="goal_pos", lifetime=20.0) """ Init super class. """ super().__init__(ros_port=ros_port, gazebo_port=gazebo_port, gazebo_pid=gazebo_pid, seed=seed, real_time=True, action_cycle_time=action_cycle_time, use_camera=use_kinect, load_table=load_table) # for smoothing if self.extra_smoothing: if self.ee_action_type: self.action_vector = np.zeros(3, dtype=np.float32) else: self.action_vector = np.zeros(6, dtype=np.float32) # we can use this to set a time for ros_controllers to complete the action self.environment_loop_time = 1.0 / environment_loop_rate # in seconds self.prev_action = None # for observation if environment_loop_rate is not None: self.obs_r = None self.reward_r = None self.terminated_r = False self.truncated_r = False self.info_r = {} self.current_action = None self.init_done = False # we don't need to execute the loop until we reset the env # Debug if self.debug: self.loop_counter = 0 self.action_counter = 0 # create a timer to run the environment loop rospy.Timer(rospy.Duration(1.0 / environment_loop_rate), self.environment_loop) # for dense reward calculation self.action_not_in_limits = False self.lowest_z = self.position_goal_min["z"] self.movement_result = False self.within_goal_space = False """ Finished __init__ method """ rospy.loginfo(f"Finished Init of {self.node_name}") # ------------------------------------------------------- # Methods for interacting with the environment def _set_init_params(self, options: Optional[Dict[str, Any]] = None): """ Set initial parameters for the environment. Here we 1. Move the Robot to Home position 2. Find a valid random reach goal """ if self.log_internal_state: rospy.loginfo("Initialising the init params!") # Initial robot pose - Home self.init_pos = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0], dtype=np.float32) # make the current action None to stop execution for real time envs and also stop the env loop self.init_done = False # we don't need to execute the loop until we reset the env self.current_action = None # for smoothing if self.extra_smoothing: if self.ee_action_type: self.action_vector = np.zeros(3, dtype=np.float32) else: self.action_vector = np.zeros(6, dtype=np.float32) # move the robot to the home pose # we need to wait for the movement to finish # we define the movement result here so that we can use it in the environment loop (we need it for dense reward) self.move_NED2_object.stop_arm() self.movement_result = self.move_NED2_object.set_trajectory_joints(self.init_pos) if not self.movement_result: if self.log_internal_state: rospy.logwarn("Homing failed!") # Get a random Reach goal - np.array # goal_found, goal_vector = self.get_random_goal() # this checks if the goal is reachable using moveit goal_found, goal_vector = self.get_random_goal_no_check() if goal_found: self.reach_goal = goal_vector if self.log_internal_state: rospy.loginfo("Reach Goal--->" + str(self.reach_goal)) else: # fake Reach goal - hard code one self.reach_goal = np.array([0.250, 0.000, 0.015], dtype=np.float32) if self.log_internal_state: rospy.logwarn("Hard Coded Reach Goal--->" + str(self.reach_goal)) # Publish the goal pos self.goal_marker.set_position(position=self.reach_goal) self.goal_marker.publish() # get initial ee pos and joint values (we need this for delta actions or when we have EE action space) ee_pos_tmp = self.get_ee_pose() # Get a geometry_msgs/PoseStamped msg self.ee_pos = np.array([ee_pos_tmp.pose.position.x, ee_pos_tmp.pose.position.y, ee_pos_tmp.pose.position.z]) self.ee_ori = np.array([ee_pos_tmp.pose.orientation.x, ee_pos_tmp.pose.orientation.y, ee_pos_tmp.pose.orientation.z, ee_pos_tmp.pose.orientation.w]) # for IK calculation - EE actions self.joint_values = self.get_joint_angles() # for dense reward calculation self.action_not_in_limits = False self.within_goal_space = True if self.ee_action_type: self.prev_action = self.ee_pos.copy() # for observation else: self.prev_action = self.init_pos.copy() # for observation # We can start the environment loop now if self.log_internal_state: rospy.loginfo("Start resetting the env loop!") # init the real time variables self.obs_r = None self.reward_r = None self.terminated_r = False self.truncated_r = False self.info_r = {} # debug if self.debug: self.loop_counter = 0 self.action_counter = 0 if self.log_internal_state: rospy.loginfo("Done resetting the env loop!") self.init_done = True # self.current_action = self.init_pos.copy() if self.log_internal_state: rospy.loginfo("Initialising init params done--->") def _set_action(self, action): """ Function to apply an action to the robot. Args: action: The action to be applied to the robot. """ # save the action for observation self.prev_action = action.copy() if self.log_internal_state: rospy.loginfo(f"Applying real-time action---> {action}") self.current_action = action.copy() # for debugging if self.debug: self.action_counter = 0 # reset the action counter def _get_observation(self): """ Function to get an observation from the environment. Returns: An observation representing the current state of the environment. """ obs = None # we cannot copy a None value if self.obs_r is not None: obs = self.obs_r.copy() # incase we don't have an observation yet if obs is None: obs = self.sample_observation() return obs.copy() def _get_reward(self, info: Optional[Dict[str, Any]] = None): """ Function to get a reward from the environment. Returns: A scalar reward value representing how well the agent is doing in the current episode. """ reward = None if self.reward_r is not None: reward = self.reward_r # incase we don't have a reward yet if reward is None: reward = self.calculate_reward() return reward def _compute_terminated(self, info: Optional[Dict[str, Any]] = None): """ Function to check if the episode is terminated. Returns: A boolean value indicating whether the episode has ended (e.g. because a goal has been reached or a failure condition has been triggered) """ terminated = self.terminated_r self.info = self.info_r # we can use this to log the success rate in stable baselines3 and other packages # check if self.info have the is_success key # double-checking here if "is_success" not in self.info: if terminated: self.info["is_success"] = True else: self.info["is_success"] = False # incase we don't have a done yet for real time envs # unnecessary to check since we never set the terminated to None if terminated is None: terminated = self.check_if_done() if terminated: self.info["is_success"] = True # explicitly set the success rate else: self.info["is_success"] = False return terminated def _compute_truncated(self, info: Optional[Dict[str, Any]] = None): """ Function to check if the episode is truncated. Mainly hard coded here since we are using a wrapper that sets the max number of steps and truncates the episode. Returns: A boolean value indicating whether the episode has been truncated (e.g. because the maximum number of steps has been reached) """ truncated = self.truncated_r return truncated # ------------------------------------------------------- # Include any custom methods available for the MyTaskEnv class
[docs] def environment_loop(self, event): """ Function for Environment loop for real time RL envs """ # we don't need to execute the loop until we reset the env if self.init_done: if self.debug: if self.log_internal_state: rospy.loginfo(f"Starting RL loop --->: {self.loop_counter}") self.loop_counter += 1 # start with the observation, reward, done and info self.info_r = {} self.obs_r = self.sample_observation() self.reward_r = self.calculate_reward() self.terminated_r = self.check_if_done() # Apply the action # we need this if we're done with the task we can break the loop in above done check if self.current_action is not None: self.execute_action(self.current_action) if self.debug: if self.log_internal_state: rospy.loginfo(f"Executing action --->: {self.action_counter}") self.action_counter += 1 else: self.move_NED2_object.stop_arm() # stop the arm if there is no action
[docs] def execute_action(self, action): """ Function to apply an action to the robot. This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot. Args: action: The action to be applied to the robot. """ if self.log_internal_state: rospy.loginfo(f"Action --->: {action}") # --- Set the action based on the action type # --- EE action if self.ee_action_type: # --- Get the current EE position ee_pos_tmp = self.get_ee_pose() # Get a geometry_msgs/PoseStamped msg self.ee_pos = np.array([ee_pos_tmp.pose.position.x, ee_pos_tmp.pose.position.y, ee_pos_tmp.pose.position.z]) # --- Make actions as deltas if self.delta_action: # we can use smoothing using the action_cycle_time or delta_coeff if self.extra_smoothing: if self.action_cycle_time == 0.0: # first derivative of the action self.action_vector = self.action_vector + (self.delta_coeff * action) # clip the action vector to be within -1 and 1 self.action_vector = np.clip(self.action_vector, -1, 1) # add the action vector to the current ee pos action = self.ee_pos + (self.action_vector * self.delta_coeff) else: # first derivative of the action self.action_vector = self.action_vector + (self.action_cycle_time * action) # clip the action vector to be within -1 and 1 self.action_vector = np.clip(self.action_vector, -1, 1) # add the action vector to the current ee pos action = self.ee_pos + (self.action_vector * self.action_cycle_time) else: action = self.ee_pos + (action * self.delta_coeff) # check if the action is within the joint limits (for the reward calculation) min_ee_values = np.array(self.min_ee_values) max_ee_values = np.array(self.max_ee_values) self.action_not_in_limits = np.any(action <= (min_ee_values + 0.0001)) or np.any( action >= (max_ee_values - 0.0001)) # clip the action action = np.clip(action, self.min_ee_values, self.max_ee_values) # check if we can reach the goal and within the goal space # check if action is within the workspace if self.workspace_space.contains(action): # calculate IK IK_found, joint_positions = self.calculate_ik(target_pos=action, ee_ori=self.ee_ori) if IK_found: # we can use simple smoothing or direct trajectory execution if self.use_smoothing: self.movement_result = self.smooth_trajectory(joint_positions, self.action_speed) else: # execute the trajectory - EE self.movement_result = self.move_arm_joints(q_positions=joint_positions, time_from_start=self.action_speed) # for dense reward calculation self.within_goal_space = True else: if self.log_internal_state: rospy.logwarn(f"The action: {action} is not reachable!") rospy.logdebug(f"Set action failed for --->: {action}") self.movement_result = False self.within_goal_space = False else: if self.log_internal_state: rospy.logdebug(f"Set action failed for --->: {action}") self.movement_result = False self.within_goal_space = False # --- Joint action else: # --- Make actions as deltas if self.delta_action: # get the current joint values self.joint_values = self.get_joint_angles() # we can use smoothing using the action_cycle_time or delta_coeff if self.extra_smoothing: if self.action_cycle_time == 0.0: # first derivative of the action self.action_vector = self.action_vector + (self.delta_coeff * action) # clip the action vector to be within -1 and 1 self.action_vector = np.clip(self.action_vector, -1, 1) # add the action vector to the current ee pos action = self.joint_values + (self.action_vector * self.delta_coeff) else: # first derivative of the action self.action_vector = self.action_vector + (self.action_cycle_time * action) # clip the action vector to be within -1 and 1 self.action_vector = np.clip(self.action_vector, -1, 1) # add the action vector to the current ee pos action = self.joint_values + (self.action_vector * self.action_cycle_time) else: action = self.joint_values + (action * self.delta_coeff) # check if the action is within the joint limits (for the reward calculation) min_joint_values = np.array(self.min_joint_values) max_joint_values = np.array(self.max_joint_values) self.action_not_in_limits = np.any(action <= (min_joint_values + 0.0001)) or np.any( action >= (max_joint_values - 0.0001)) # clip the action if self.debug: if self.log_internal_state: rospy.logwarn(f"Action + current joint_values before clip --->: {action}") action = np.clip(action, self.min_joint_values, self.max_joint_values) if self.debug: if self.log_internal_state: rospy.logwarn(f"Action + current joint_values after clip --->: {action}") # check if the action is within the workspace if self.check_action_within_workspace(action): # we can use simple smoothing or direct trajectory execution if self.use_smoothing: self.movement_result = self.smooth_trajectory(action, self.action_speed) else: # execute the trajectory - ros_controllers self.movement_result = self.move_arm_joints(q_positions=action, time_from_start=self.action_speed) # for dense reward calculation self.within_goal_space = True else: if self.log_internal_state: rospy.logdebug(f"Set action failed for --->: {action}") self.movement_result = False self.within_goal_space = False
[docs] def sample_observation(self): """ Function to get an observation from the environment. # traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 6 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 6 total: (3x2) + 1 + (6 or 3) + (6x2) # depth image 480x640 32FC1 # rgb image 480x640X3 RGB images Returns: An observation representing the current state of the environment. """ current_goal = self.reach_goal # --- 1. Get EE position ee_pos_tmp = self.get_ee_pose() # Get a geometry_msgs/PoseStamped msg self.ee_pos = np.array([ee_pos_tmp.pose.position.x, ee_pos_tmp.pose.position.y, ee_pos_tmp.pose.position.z]) self.ee_ori = np.array([ee_pos_tmp.pose.orientation.x, ee_pos_tmp.pose.orientation.y, ee_pos_tmp.pose.orientation.z, ee_pos_tmp.pose.orientation.w]) # --- Linear distance to the goal linear_dist_ee_goal = current_goal - self.ee_pos # goal is box dtype and ee_pos is numpy.array. It is okay # --- 2. Vector to goal (we are giving only the direction vector) vec_ee_goal = linear_dist_ee_goal / np.linalg.norm(linear_dist_ee_goal) # --- 3. Euclidian distance euclidean_distance_ee_goal = scipy.spatial.distance.euclidean(self.ee_pos, current_goal) # float # --- Get Current Joint values - only for the joints we are using # we need this for delta actions # self.joint_values = self.current_joint_positions.copy() # Get a float list self.joint_values = self.get_joint_angles() # Get a float list # we don't need to convert this to numpy array since we concat using numpy below if self.prev_action is None: # we can use the ee_pos as the previous action - for EE action type if self.ee_action_type: prev_action = self.ee_pos # we can use the joint values as the previous action - for Joint action type else: prev_action = self.joint_values.copy() else: prev_action = self.prev_action.copy() # our observations obs = np.concatenate((self.ee_pos, vec_ee_goal, euclidean_distance_ee_goal, self.joint_pos_all, prev_action, self.current_joint_velocities), axis=None, dtype=np.float32) if self.log_internal_state: rospy.loginfo(f"Observations --->: {obs}") if self.normal_obs: return obs.copy() elif self.rgb_obs: return self.cv_image_rgb.copy() elif self.rgb_plus_normal_obs: return {"rgb_image": self.cv_image_rgb.copy(), "observations": obs.copy()} elif self.rgb_plus_depth_plus_normal_obs: return {"depth_image": self.cv_image_depth.copy(), "rgb_image": self.cv_image_rgb.copy(), "observations": obs.copy()} else: return obs.copy()
[docs] def calculate_reward(self): """ Function to get a reward from the environment. Sparse Reward: float => 1.0 for success, -1.0 for failure Dense Reward: if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal and as always, negative rewards for each step, non-execution and actions not within joint limits Returns: A scalar reward value representing how well the agent is doing in the current episode. """ # - Init reward reward = 0.0 achieved_goal = self.ee_pos desired_goal = self.reach_goal # if it's "Sparse" reward structure if self.reward_arc == "Sparse": # initialise the sparse reward as negative reward = -1.0 # The marker only turns green if reach is done. Otherwise, it is red. self.goal_marker.set_color(r=1.0, g=0.0) self.goal_marker.set_duration(duration=5) # check if robot reached the goal reach_done = self.check_if_reach_done(achieved_goal, desired_goal) if reach_done: reward = 1.0 # done (green) goal_marker self.goal_marker.set_color(r=0.0, g=1.0) # publish the marker to the topic self.goal_marker.publish() # log the reward if self.log_internal_state: rospy.logwarn(">>>REWARD>>>" + str(reward)) # Since we only look for Sparse or Dense, we don't need to check if it's Dense else: # if it's "Dense" reward structure # in case of simple dense reward if self.simple_dense_reward: # - Distance from EE to goal reward dist2goal = scipy.spatial.distance.euclidean(achieved_goal, desired_goal) reward += - dist2goal # for normal dense reward else: # - Check if the EE reached the goal done = self.check_if_reach_done(achieved_goal, desired_goal) if done: # EE reached the goal reward += self.reached_goal_reward # done (green) goal_marker self.goal_marker.set_color(r=0.0, g=1.0) self.goal_marker.set_duration(duration=30) else: # not done (red) goal_marker self.goal_marker.set_color(r=1.0, g=0.0) self.goal_marker.set_duration(duration=5) # - Distance from EE to goal reward dist2goal = scipy.spatial.distance.euclidean(achieved_goal, desired_goal) reward += - self.mult_dist_reward * dist2goal # - Constant step reward reward += self.step_reward # publish the goal marker self.goal_marker.publish() # - Check if actions are in limits reward += self.action_not_in_limits * self.joint_limits_reward # - Check if the action is within the goal space reward += (not self.within_goal_space) * self.not_within_goal_space_reward # to punish for actions where we cannot execute if not self.movement_result: reward += self.none_exe_reward # log the reward if self.log_internal_state: rospy.logwarn(">>>REWARD>>>" + str(reward)) return reward
[docs] def check_if_done(self): """ Function to check if the episode is done. The Task is done if the EE is close enough to the goal Returns: A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered) """ # this is for logging in different colours # Define ANSI escape codes for different colors RED = '\033[91m' GREEN = '\033[92m' YELLOW = '\033[93m' BLUE = '\033[94m' ENDC = '\033[0m' # --- Init done done = False # - Check if the ee reached the goal done_reach = self.check_if_reach_done(self.ee_pos, self.reach_goal) if done_reach: if self.log_internal_state: rospy.loginfo(GREEN + ">>>>>>>>>>>> Reached the Goal! >>>>>>>>>>>" + ENDC) done = True self.current_action = None # we don't need to execute any more actions self.init_done = False # we don't need to execute the loop until we reset the env self.info_r['is_success'] = True else: self.info_r['is_success'] = False return done
[docs] def check_if_reach_done(self, achieved_goal, desired_goal): """ Check if the reach is done """ done = False # distance between achieved goal and desired goal distance = scipy.spatial.distance.euclidean(achieved_goal, desired_goal) if self.debug: print("distance to the goal:", distance) if distance <= self.reach_tolerance: done = True return done
# not used
[docs] def test_goal_pos(self, goal): """ Function to check if the given goal is reachable """ if self.log_internal_state: rospy.logdebug(f"Goal to check: {str(goal)}") result = self.check_goal(goal) if not result: if self.log_internal_state: rospy.logdebug("The goal is not reachable") return result
# not used
[docs] def get_random_goal(self, max_tries: int = 100): """ Function to get a reachable goal """ for i in range(max_tries): goal = self.goal_space.sample() if self.test_goal_pos(goal): return True, goal if self.log_internal_state: rospy.logdebug("Getting a random goal failed!") return False, None
[docs] def get_random_goal_no_check(self): """ Function to get a random goal without checking """ return True, self.goal_space.sample()
# not used
[docs] def check_action_within_goal_space_fk(self, action): """ Function to check if the given action is within the goal space """ # check if the resulting ee pose is within the goal space - using FK ee_pos = self.calculate_fk(joint_positions=action) if ee_pos is not None: # check if the ee pose is within the goal space - using self.goal_space if self.goal_space.contains(ee_pos): if self.log_internal_state: rospy.logdebug(f"The ee pose of the {action} is within the goal space!") return True else: if self.log_internal_state: rospy.logdebug(f"The ee pose of the {ee_pos} is not within the goal space!") return False if self.log_internal_state: rospy.logwarn("Checking if the action is within the goal space failed!") return False
[docs] def check_action_within_workspace(self, action): """ Function to check if the given action is within the workspace Args: action: The action to be applied to the robot. Returns: A boolean value indicating whether the action is within the workspace """ BLUE = '\033[94m' ENDC = '\033[0m' # check if the resulting ee pose is within the workspace - using FK ee_pos = self.fk_pykdl(action=action) if self.debug: print("goal space", self.workspace_space) # for debugging if ee_pos is not None: # check if the ee pose is within the workspace - using self.workspace_space if self.workspace_space.contains(ee_pos): if self.log_internal_state: rospy.logdebug(f"The ee pose {ee_pos} of the {action} is within the workspace!") if self.debug: print(BLUE + f"The ee pose {ee_pos} of the {action} is within the workspace!" + ENDC) return True else: if self.log_internal_state: rospy.logdebug(f"The ee pose {ee_pos} is not within the workspace!") if self.debug: print(BLUE + f"The ee pose {ee_pos} is not within the workspace!" + ENDC) return False if self.log_internal_state: rospy.logwarn("Checking if the action is within the workspace failed!") if self.debug: print(BLUE + "Checking if the action is within the workspace failed!" + ENDC) return False
[docs] def check_if_z_within_limits(self, action): """ Function to check if the given ee_pos is within the limits """ # get the ee pose from the action using FK ee_pos = self.fk_pykdl(action=action) # The robot is mounted on a table. So, we need to check if the z is within the limits if ee_pos is not None: # check if the z is within the limits if ee_pos[2] > self.lowest_z: if self.log_internal_state: rospy.logdebug(f"The ee pose {ee_pos} of the {action} is within the z limit!") return True else: if self.log_internal_state: rospy.logdebug(f"The ee pose {ee_pos} is not within the z limit!") return False else: if self.log_internal_state: rospy.logwarn("Checking if the action is within the z limit failed!") return False
def _get_params(self): """ Function to get configuration parameters (optional) """ # Action Space self.min_joint_values = rospy.get_param('/ned2/min_joint_pos') self.max_joint_values = rospy.get_param('/ned2/max_joint_pos') # Observation Space self.position_ee_max = rospy.get_param('/ned2/position_ee_max') self.position_ee_min = rospy.get_param('/ned2/position_ee_min') self.rpy_ee_max = rospy.get_param('/ned2/rpy_ee_max') self.rpy_ee_min = rospy.get_param('/ned2/rpy_ee_min') self.linear_distance_max = rospy.get_param('/ned2/linear_distance_max') self.linear_distance_min = rospy.get_param('/ned2/linear_distance_min') self.max_distance = rospy.get_param('/ned2/max_distance') self.min_joint_vel = rospy.get_param('/ned2/min_joint_vel') self.max_joint_vel = rospy.get_param('/ned2/max_joint_vel') self.min_joint_angles = rospy.get_param('/ned2/min_joint_angles') self.max_joint_angles = rospy.get_param('/ned2/max_joint_angles') # Goal space self.position_goal_max = rospy.get_param('/ned2/position_goal_max') self.position_goal_min = rospy.get_param('/ned2/position_goal_min') # Achieved goal self.position_achieved_goal_max = rospy.get_param('/ned2/position_achieved_goal_max') self.position_achieved_goal_min = rospy.get_param('/ned2/position_achieved_goal_min') # Desired goal self.position_desired_goal_max = rospy.get_param('/ned2/position_desired_goal_max') self.position_desired_goal_min = rospy.get_param('/ned2/position_desired_goal_min') # Tolerances self.reach_tolerance = rospy.get_param('/ned2/reach_tolerance') # Variables related to rewards self.step_reward = rospy.get_param('/ned2/step_reward') self.mult_dist_reward = rospy.get_param('/ned2/multiplier_dist_reward') self.reached_goal_reward = rospy.get_param('/ned2/reached_goal_reward') self.joint_limits_reward = rospy.get_param('/ned2/joint_limits_reward') self.none_exe_reward = rospy.get_param('/ned2/none_exe_reward') self.not_within_goal_space_reward = rospy.get_param('/ned2/not_within_goal_space_reward') # workspace self.workspace_max = rospy.get_param('/ned2/workspace_max') self.workspace_min = rospy.get_param('/ned2/workspace_min') # ------------------------------------------------------ # Task Methods for launching gazebo or roscore def _launch_gazebo(self, launch_roscore=True, port=None, paused=False, use_sim_time=True, extra_gazebo_args=None, gui=False, recording=False, debug=False, physics="ode", verbose=False, output='screen', respawn_gazebo=False, pub_clock_frequency=100, server_required=False, gui_required=False, custom_world_path=None, custom_world_pkg=None, custom_world_name=None, launch_new_term=True): """ Launches a new Gazebo simulation with the specified options. Returns: ros_port: None if only launching gazebo and no roscore gazebo_port: None if only launching gazebo and no roscore gazebo_pid: process id for launched gazebo """ ros_port, gazebo_port, gazebo_pid = gazebo_core.launch_gazebo( launch_roscore=launch_roscore, port=port, paused=paused, use_sim_time=use_sim_time, extra_gazebo_args=extra_gazebo_args, gui=gui, recording=recording, debug=debug, physics=physics, verbose=verbose, output=output, respawn_gazebo=respawn_gazebo, pub_clock_frequency=pub_clock_frequency, server_required=server_required, gui_required=gui_required, custom_world_path=custom_world_path, custom_world_pkg=custom_world_pkg, custom_world_name=custom_world_name, launch_new_term=launch_new_term ) return ros_port, gazebo_port, gazebo_pid def _launch_roscore(self, port=None, set_new_master_vars=False): """ Launches a new roscore with the specified port. Only updates the ros_port. Return: ros_port: port of launched roscore """ ros_port, _ = ros_common.launch_roscore(port=int(port), set_new_master_vars=set_new_master_vars) # change to new rosmaster ros_common.change_ros_master(ros_port) return ros_port