Source code for rl_environments.vx300s.real.task_envs.reach.vx300s_reach_real

#!/bin/python3

from typing import Any, Optional, Dict

import rospy
import numpy as np
from gymnasium import spaces
from gymnasium.envs.registration import register
import scipy.spatial

# Custom robot env
from rl_environments.vx300s.real.robot_envs import vx300s_robot_real

# core modules of the framework
from realros.utils import ros_common
from realros.utils import ros_markers

# Register your environment using the OpenAI register method to utilize gym.make("TaskEnv-v0").
# register(
#     id='VX300SReacherReal-v0',
#     entry_point='rl_environments.vx300s.real.task_envs.reach.vx300s_reach_real:VX300SReacherEnv',
#     max_episode_steps=100,
# )


[docs] class VX300SReacherEnv(vx300s_robot_real.VX300SRobotEnv): """ This Task env is for a simple Reach Task with the VX300S robot. The task is done if * The robot reached the goal Here * Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector) * Observation - Continuous (31/28 obs or rgb/depth image or a combination) Init Args: * new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running. * roscore_port: Port of the roscore to be launched. If None, a random port is chosen. * seed: Seed for the random number generator. * close_env_prompt: Whether to prompt the user to close the env or not. * reset_env_prompt: Whether to prompt the operator before each reset (defaults to True because reset homes the arm via MoveIt on real hardware). * reset_controllers_prompt: Whether to prompt before re-spawning controllers on reset (defaults to False because this env does not respawn controllers on reset). * reward_type: Type of reward to be used. It Can be "Sparse" or "Dense". * delta_action: Whether to use the delta actions or the absolute actions. * delta_coeff: Coefficient to be used for the delta actions. * ee_action_type: Whether to use the end effector action space or the joint action space. * environment_loop_rate: Rate at which the environment loop should run. (in Hz) * action_cycle_time: Time to wait between two consecutive actions. (in seconds) * use_smoothing: Whether to use smoothing for actions or not. * default_port: Whether to use the default port for the roscore or not. * rgb_obs_only: Whether to use only the RGB image as the observations or not. * normal_obs_only: Whether to use only the traditional observations or not. * rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not. * rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not. * debug: Whether to print debug messages or not. * action_speed: set the speed to complete the trajectory. default in 0.5 seconds * simple_dense_reward: Whether to use a simple dense reward or not. * log_internal_state: Whether to log the internal state of the environment or not. * use_kinect: Whether to use the kinect sensor or not. * use_zed2: Whether to use the ZED2 camera or not. """ def __init__(self, new_roscore: bool = False, roscore_port: str = None, seed: int = None, close_env_prompt: bool = True, reset_env_prompt: bool = True, reset_controllers_prompt: bool = False, reward_type: str = "Dense", delta_action: bool = True, delta_coeff: float = 0.05, environment_loop_rate: float = 10.0, action_cycle_time: float = 0.0, use_smoothing: bool = False, default_port=True, ee_action_type: bool = False, rgb_obs_only: bool = False, normal_obs_only: bool = True, rgb_plus_normal_obs: bool = False, rgb_plus_depth_plus_normal_obs: bool = False, debug: bool = False, action_speed: float = 0.5, simple_dense_reward: bool = True, log_internal_state: bool = False, use_kinect: bool = False, use_zed2: bool = False): """ variables to keep track of ros port """ ros_port = None # launch a new roscore with default port if default_port: ros_port = self._launch_roscore(default_port=default_port) # Launch new roscore elif new_roscore: ros_port = self._launch_roscore(port=roscore_port) # ros_port of the already running roscore elif roscore_port is not None: ros_port = roscore_port # change to new rosmaster ros_common.change_ros_master(ros_port) else: if ros_common.is_roscore_running() is False: print("roscore is not running! Launching a new roscore!") ros_port = self._launch_roscore(port=roscore_port) # init the ros node if ros_port is not None: self.node_name = "VX300SReacherEnvReal" + "_" + ros_port else: self.node_name = "VX300SReacherEnvReal" rospy.init_node(self.node_name, anonymous=True) rospy.loginfo(f"Starting {self.node_name}") # When action_cycle_time is 0 the env loop runs without an explicit # per-action sleep (deterministic / fast-step mode), so a strict # period-vs-cycle inequality does not apply. Only enforce the guard # when the caller actually requested a positive action cycle. if action_cycle_time > 0.0 and (1.0 / environment_loop_rate) > action_cycle_time: raise ValueError( f"environment_loop_rate=1/{1.0/environment_loop_rate:.3f}s exceeds " f"action_cycle_time={action_cycle_time:.3f}s; the cached observation " f"could be older than one action cycle. Either lower environment_loop_rate " f"or raise action_cycle_time." ) self.log_internal_state = log_internal_state if reward_type.lower() == "sparse": self.reward_arc = "Sparse" elif reward_type.lower() == "dense": self.reward_arc = "Dense" else: rospy.logwarn(f"The given reward architecture '{reward_type}' not found. Defaulting to Dense!") self.reward_arc = "Dense" # check if we are using simple dense reward # for simple, we only return the distance to the goal (negative Euclidean distance = -d) self.simple_dense_reward = simple_dense_reward self.delta_action = delta_action self.delta_coeff = delta_coeff self.use_smoothing = use_smoothing self.action_cycle_time = action_cycle_time self.action_speed = action_speed self.rgb_obs = rgb_obs_only self.normal_obs = normal_obs_only self.rgb_plus_normal_obs = rgb_plus_normal_obs self.rgb_plus_depth_plus_normal_obs = rgb_plus_depth_plus_normal_obs self.ee_action_type = ee_action_type self.debug = debug # add to ros parameter server ros_common.ros_load_yaml(pkg_name="rl_environments", file_name="vx300s_reach_task_config.yaml", ns="/") self._get_params() # Joint action space or End effector action space # ROS and Gazebo often use double-precision (64-bit), # but we are using single-precision (32-bit) as it is typical for RL implementations. if self.ee_action_type: # EE action space self.max_ee_values = np.array([self.position_ee_max["x"], self.position_ee_max["y"], self.position_ee_max["z"]]) self.min_ee_values = np.array([self.position_ee_min["x"], self.position_ee_min["y"], self.position_ee_min["z"]]) self.action_space = spaces.Box(low=np.array(self.min_ee_values), high=np.array(self.max_ee_values), dtype=np.float32) else: # Joint action space self.action_space = spaces.Box(low=np.array(self.min_joint_values), high=np.array(self.max_joint_values), dtype=np.float32) # ---- ee pos observations_high_ee_pos_range = np.array( np.array([self.position_ee_max["x"], self.position_ee_max["y"], self.position_ee_max["z"]])) observations_low_ee_pos_range = np.array( np.array([self.position_ee_min["x"], self.position_ee_min["y"], self.position_ee_min["z"]])) # ---- vector to the goal - normalized linear distance observations_high_vec_ee_goal = np.array([1.0, 1.0, 1.0]) observations_low_vec_ee_goal = np.array([-1.0, -1.0, -1.0]) # ---- Euclidian distance observations_high_dist = np.array([self.max_distance]) observations_low_dist = np.array([0.0]) # ---- joint values observations_high_joint_values = self.max_joint_angles.copy() observations_low_joint_values = self.min_joint_angles.copy() # ---- previous action if self.ee_action_type: observations_high_prev_action = self.max_ee_values.copy() observations_low_prev_action = self.min_ee_values.copy() else: observations_high_prev_action = self.max_joint_values.copy() observations_low_prev_action = self.min_joint_values.copy() # ---- joint velocities observations_high_joint_vel = self.max_joint_vel.copy() observations_low_joint_vel = self.min_joint_vel.copy() high = np.concatenate( [observations_high_ee_pos_range, observations_high_vec_ee_goal, observations_high_dist, observations_high_joint_values, observations_high_prev_action, observations_high_joint_vel, ]) low = np.concatenate( [observations_low_ee_pos_range, observations_low_vec_ee_goal, observations_low_dist, observations_low_joint_values, observations_low_prev_action, observations_low_joint_vel, ]) # Define the traditional observation space self.observations = spaces.Box(low=low, high=high, dtype=np.float32) # Define the depth image space (480x640 32FC1) - this uses 32-bit float self.depth_image_space = spaces.Box(low=0, high=1, shape=(480, 640), dtype=np.float32) # Define the image space (480x640X3 RGB images) - this uses 8-bit unsigned int self.rgb_image_space = spaces.Box(low=0, high=255, shape=(480, 640, 3), dtype=np.uint8) # Define the final observation space if self.normal_obs: use_kinect = False # to pass to the superclass use_zed2 = False self.observation_space = self.observations elif self.rgb_obs: self.observation_space = self.rgb_image_space elif self.rgb_plus_normal_obs: # Define a combined observation space self.observation_space = spaces.Dict({ "rgb_image": self.rgb_image_space, "observations": self.observations }) elif self.rgb_plus_depth_plus_normal_obs: # Define a combined observation space self.observation_space = spaces.Dict({ "depth_image": self.depth_image_space, "rgb_image": self.rgb_image_space, "observations": self.observations }) # if none of the above, use the traditional observation space else: use_kinect = False use_zed2 = False self.observation_space = self.observations # ---- Goal pos high_goal_pos_range = np.array( np.array([self.position_goal_max["x"], self.position_goal_max["y"], self.position_goal_max["z"]])) low_goal_pos_range = np.array( np.array([self.position_goal_min["x"], self.position_goal_min["y"], self.position_goal_min["z"]])) # -- goal space for sampling self.goal_space = spaces.Box(low=low_goal_pos_range, high=high_goal_pos_range, dtype=np.float32, seed=seed) # ---- Workspace high_workspace_range = np.array( np.array([self.workspace_max["x"], self.workspace_max["y"], self.workspace_max["z"]])) low_workspace_range = np.array( np.array([self.workspace_min["x"], self.workspace_min["y"], self.workspace_min["z"]])) # -- workspace space for checking # we don't need to set the seed here since we're not sampling from this space self.workspace_space = spaces.Box(low=low_workspace_range, high=high_workspace_range, dtype=np.float32) self.goal_marker = ros_markers.RosMarker(frame_id="world", ns="goal", marker_type=2, marker_topic="goal_pos", lifetime=30.0) super().__init__(ros_port=ros_port, seed=seed, close_env_prompt=close_env_prompt, reset_env_prompt=reset_env_prompt, reset_controllers_prompt=reset_controllers_prompt, action_cycle_time=action_cycle_time, use_kinect=use_kinect, use_zed2=use_zed2) # for smoothing if self.use_smoothing: if self.ee_action_type: self.action_vector = np.zeros(3, dtype=np.float32) else: self.action_vector = np.zeros(6, dtype=np.float32) # we can use this to set a time for ros_controllers to complete the action self.environment_loop_time = 1.0 / environment_loop_rate # in seconds self.prev_action = None # for observation if environment_loop_rate is not None: self.obs_r = None self.reward_r = None self.terminated_r = False self.truncated_r = False self.info_r = {} self.current_action = None self.init_done = False # we don't need to execute the loop until we reset the env # Debug if self.debug: self.loop_counter = 0 self.action_counter = 0 # create a timer to run the environment loop rospy.Timer(rospy.Duration(1.0 / environment_loop_rate), self.environment_loop) # for dense reward calculation - for more complex reward calculations self.action_not_in_limits = False self.lowest_z = self.position_goal_min["z"] self.movement_result = False self.within_goal_space = False rospy.loginfo(f"Finished Init of {self.node_name}") # ------------------------------------------------------- # Methods for interacting with the environment def _set_init_params(self, options: Optional[Dict[str, Any]] = None): """ Set initial parameters for the environment. Here we 1. Move the Robot to Home position 2. Find a valid random reach goal """ if self.log_internal_state: rospy.loginfo("Initialising the init params!") # Initial robot pose - Home self.init_pos = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0], dtype=np.float32) # make the current action None to stop execution for real time envs and also stop the env loop self.init_done = False # we don't need to execute the loop until we reset the env self.current_action = None # for smoothing if self.use_smoothing: if self.ee_action_type: self.action_vector = np.zeros(3, dtype=np.float32) else: self.action_vector = np.zeros(6, dtype=np.float32) # move the robot to the home pose # we need to wait for the movement to finish # we define the movement result here so that we can use it in the environment loop (we need it for dense reward) self.move_VX300S_object.stop_arm() self.movement_result = self.move_VX300S_object.set_trajectory_joints(self.init_pos) if not self.movement_result: if self.log_internal_state: rospy.logwarn("Homing failed!") # Get a random Reach goal - np.array # goal_found, goal_vector = self.get_random_goal() # this checks if the goal is reachable using moveit goal_found, goal_vector = self.get_random_goal_no_check() if goal_found: self.reach_goal = goal_vector if self.log_internal_state: rospy.loginfo("Reach Goal--->" + str(self.reach_goal)) else: # fake Reach goal - hard code one self.reach_goal = np.array([0.250, 0.000, 0.015], dtype=np.float32) if self.log_internal_state: rospy.logwarn("Hard Coded Reach Goal--->" + str(self.reach_goal)) # Publish the goal pos self.goal_marker.set_position(position=self.reach_goal) self.goal_marker.publish() # get initial ee pos and joint values (we need this for delta actions or when we have EE action space) ee_pos_tmp = self.get_ee_pose() # Get a geometry_msgs/PoseStamped msg self.ee_pos = np.array([ee_pos_tmp.pose.position.x, ee_pos_tmp.pose.position.y, ee_pos_tmp.pose.position.z]) self.ee_ori = np.array([ee_pos_tmp.pose.orientation.x, ee_pos_tmp.pose.orientation.y, ee_pos_tmp.pose.orientation.z, ee_pos_tmp.pose.orientation.w]) # for IK calculation - EE actions self.joint_values = self.get_joint_angles() # for dense reward calculation self.action_not_in_limits = False self.within_goal_space = True if self.ee_action_type: self.prev_action = self.ee_pos.copy() # for observation else: self.prev_action = self.init_pos.copy() # for observation # We can start the environment loop now if self.log_internal_state: rospy.loginfo("Start resetting the env loop!") # init the real time variables self.obs_r = None self.reward_r = None self.terminated_r = False self.truncated_r = False self.info_r = {} # debug if self.debug: self.loop_counter = 0 self.action_counter = 0 if self.log_internal_state: rospy.loginfo("Done resetting the env loop!") self.init_done = True # self.current_action = self.init_pos.copy() if self.log_internal_state: rospy.loginfo("Initialising init params done--->") def _set_action(self, action): """ Function to apply an action to the robot. Args: action: Joint positions (numpy array) or EE position (numpy array) """ # save the action for observation self.prev_action = action.copy() if self.log_internal_state: rospy.loginfo(f"Applying real-time action---> {action}") self.current_action = action.copy() # for debugging if self.debug: self.action_counter = 0 # reset the action counter def _get_observation(self): """ Function to get an observation from the environment. Returns: An observation representing the current state of the environment. """ obs = None # we cannot copy a None value if self.obs_r is not None: obs = self.obs_r.copy() # incase we don't have an observation yet if obs is None: obs = self.sample_observation() return obs.copy() def _get_reward(self, info: Optional[Dict[str, Any]] = None): """ Function to get a reward from the environment. Returns: A scalar reward value representing how well the agent is doing in the current episode. """ reward = None if self.reward_r is not None: reward = self.reward_r # incase we don't have a reward yet if reward is None: reward = self.calculate_reward() return reward def _compute_terminated(self, info: Optional[Dict[str, Any]] = None): """ Function to check if the episode is done. Returns: A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered) """ terminated = self.terminated_r self.info = self.info_r # we can use this to log the success rate in stable baselines3 and other packages # check if self.info have the is_success key # double-checking here if "is_success" not in self.info: if terminated: self.info["is_success"] = True else: self.info["is_success"] = False # incase we don't have a done yet for real time envs # unnecessary to check since we never set the terminated to None if terminated is None: terminated = self.check_if_done() if terminated: self.info["is_success"] = True # explicitly set the success rate else: self.info["is_success"] = False return terminated def _compute_truncated(self, info: Optional[Dict[str, Any]] = None): """ Function to check if the episode is truncated. Mainly hard coded here since we are using a wrapper that sets the max number of steps and truncates the episode. Returns: A boolean value indicating whether the episode has been truncated (e.g. because the maximum number of steps has been reached) """ truncated = self.truncated_r return truncated # ------------------------------------------------------- # Include any custom methods available for the MyTaskEnv class
[docs] def environment_loop(self, event): """ Function for Environment loop for real time RL envs """ # we don't need to execute the loop until we reset the env if self.init_done: # Close-race guard (see reach v3 0712f5a for the diagnosis): # rospy.Timer keeps firing during env.close() while MoveIt # cleanup runs its 1s wait_for_message timeouts. Controllers # get unspawned mid-close → joint_states stops → get_joint_angles # returns []. Next tick's execute_action crashes on the # delta-action broadcast (shape (0,) vs (6,)). Bail out cleanly # if ROS is shutting down or joint state is stale. if rospy.is_shutdown(): return jv = getattr(self, "joint_values", None) if jv is None or len(jv) < 6: return # Joint-state freshness gate (real-side only — sim has # Gazebo's guaranteed clock). The parent robot env stamps # ``_latest_joint_state_time`` on every /joint_states message; # if no message has arrived within joint_state_timeout_s, the # driver / cable is presumed dead. Stop the arm and skip the # rest of this tick — re-executing the cached action against # frozen state would publish stale targets indefinitely. _js_now = rospy.get_time() _js_last = getattr(self, "_latest_joint_state_time", None) _js_threshold = getattr(self, "joint_state_timeout_s", 0.5) if _js_last is None or (_js_now - _js_last) > _js_threshold: rospy.logwarn_throttle( 2.0, f"/joint_states stale (last update " f"{(_js_now - _js_last) if _js_last else 'never'}s ago, " f"threshold {_js_threshold}s). Stopping arm and " "skipping env_loop tick — check the interbotix " "driver + USB cable." ) try: self.move_VX300S_object.stop_arm() except Exception: pass return if self.debug: if self.log_internal_state: rospy.loginfo(f"Starting RL loop --->: {self.loop_counter}") self.loop_counter += 1 # start with the observation, reward, done and info self.info_r = {} self.obs_r = self.sample_observation() self.reward_r = self.calculate_reward() self.terminated_r = self.check_if_done() # Apply the action # we need this if we're done with the task we can break the loop in above done check if self.current_action is not None: self.execute_action(self.current_action) if self.debug: if self.log_internal_state: rospy.loginfo(f"Executing action --->: {self.action_counter}") self.action_counter += 1 else: self.move_VX300S_object.stop_arm() # stop the arm if there is no action
[docs] def execute_action(self, action): """ Function to apply an action to the robot. This method should be implemented here to apply the given action to the robot. The action could be a joint position command, a velocity command, or any other type of command that can be applied to the robot. Args: action: The action to be applied to the robot. """ if self.log_internal_state: rospy.loginfo(f"Action --->: {action}") # --- Set the action based on the action type # --- EE action if self.ee_action_type: # --- Make actions as deltas if self.delta_action: # we can use smoothing using the action_cycle_time or delta_coeff if self.use_smoothing: if self.action_cycle_time == 0.0: # first derivative of the action self.action_vector = self.action_vector + (self.delta_coeff * action) # clip the action vector to be within -1 and 1 self.action_vector = np.clip(self.action_vector, -1, 1) # add the action vector to the current ee pos action = self.ee_pos + (self.action_vector * self.delta_coeff) else: # first derivative of the action self.action_vector = self.action_vector + (self.action_cycle_time * action) # clip the action vector to be within -1 and 1 self.action_vector = np.clip(self.action_vector, -1, 1) # add the action vector to the current ee pos action = self.ee_pos + (self.action_vector * self.action_cycle_time) else: action = self.ee_pos + (action * self.delta_coeff) # check if the action is within the joint limits (for the reward calculation) min_ee_values = np.array(self.min_ee_values) max_ee_values = np.array(self.max_ee_values) self.action_not_in_limits = np.any(action <= (min_ee_values + 0.0001)) or np.any( action >= (max_ee_values - 0.0001)) # clip the action action = np.clip(action, self.min_ee_values, self.max_ee_values) # check if we can reach the goal and within the goal space # check if action is within the workspace if self.workspace_space.contains(action): # calculate IK IK_found, joint_positions = self.calculate_ik(target_pos=action, ee_ori=self.ee_ori) if IK_found: # Per-link FK safety: workspace + IK-feasible doesn't # mean every link stays above the table. The arm can # fold down with elbow/wrist below the surface even # when the EE target sits above. Reject before publishing # to the real driver — on hardware a single bad target # can ram the arm into the table. safe, reason = self._check_action_links_safe( joint_positions, current_joints=self.joint_values ) if not safe: if self.log_internal_state: rospy.logwarn(f"[SAFETY] EE action rejected: {reason}") self.movement_result = False self.within_goal_space = False else: # execute the trajectory - EE self.movement_result = self.move_arm_joints(q_positions=joint_positions, time_from_start=self.action_speed) self.within_goal_space = True else: if self.log_internal_state: rospy.logwarn(f"The action: {action} is not reachable!") rospy.logdebug(f"Set action failed for --->: {action}") self.movement_result = False self.within_goal_space = False else: if self.log_internal_state: rospy.logdebug(f"Set action failed for --->: {action}") self.movement_result = False self.within_goal_space = False # --- Joint action else: # --- Make actions as deltas if self.delta_action: # we can use smoothing using the action_cycle_time or delta_coeff if self.use_smoothing: if self.action_cycle_time == 0.0: # first derivative of the action self.action_vector = self.action_vector + (self.delta_coeff * action) # clip the action vector to be within -1 and 1 self.action_vector = np.clip(self.action_vector, -1, 1) # add the action vector to the current ee pos action = self.joint_values + (self.action_vector * self.delta_coeff) else: # first derivative of the action self.action_vector = self.action_vector + (self.action_cycle_time * action) # clip the action vector to be within -1 and 1 self.action_vector = np.clip(self.action_vector, -1, 1) # add the action vector to the current ee pos action = self.joint_values + (self.action_vector * self.action_cycle_time) else: action = self.joint_values + (action * self.delta_coeff) # check if the action is within the joint limits (for the reward calculation) min_joint_values = np.array(self.min_joint_values) max_joint_values = np.array(self.max_joint_values) self.action_not_in_limits = np.any(action <= (min_joint_values + 0.0001)) or np.any( action >= (max_joint_values - 0.0001)) # clip the action if self.debug: if self.log_internal_state: rospy.logwarn(f"Action + current joint_values before clip --->: {action}") action = np.clip(action, self.min_joint_values, self.max_joint_values) if self.debug: if self.log_internal_state: rospy.logwarn(f"Action + current joint_values after clip --->: {action}") # check if the action is within the workspace if self.check_action_within_workspace(action): # Per-link FK safety: see EE branch above. self.joint_values # is refreshed at env_loop tick, so the delta cap can use it. # On hardware this is the last guard before move_arm_joints # publishes to the interbotix driver. safe, reason = self._check_action_links_safe( action, current_joints=self.joint_values ) if not safe: if self.log_internal_state: rospy.logwarn(f"[SAFETY] joint action rejected: {reason}") self.movement_result = False self.within_goal_space = False else: # execute the trajectory - ros_controllers self.movement_result = self.move_arm_joints(q_positions=action, time_from_start=self.action_speed) self.within_goal_space = True else: if self.log_internal_state: rospy.logdebug(f"Set action failed for --->: {action}") self.movement_result = False self.within_goal_space = False
[docs] def sample_observation(self): """ Function to get an observation from the environment. # traditional observations 01. EE pos - 3 02. Vector to the goal (normalized linear distance) - 3 03. Euclidian distance (ee to reach goal)- 1 04. Current Joint values - 9 05. Previous action - 6 or 3 (joint or ee) 06. Joint velocities - 9 total: (3x2) + 1 + (6 or 3) + (9x2) = 31 or 28 # depth image 480x640 32FC1 # rgb image 480x640X3 RGB images Returns: An observation representing the current state of the environment. """ current_goal = self.reach_goal # --- 1. Get EE position ee = self.fk_pykdl(self.joint_pos_all) if ee is None: ee = self.ee_pos self.ee_pos = np.asarray(ee, dtype=np.float32) # --- Linear distance to the goal linear_dist_ee_goal = current_goal - self.ee_pos # goal is box dtype and ee_pos is numpy.array. It is okay # --- 2. Vector to goal (we are giving only the direction vector) vec_ee_goal = self._safe_unit_vector(linear_dist_ee_goal) # --- 3. Euclidian distance euclidean_distance_ee_goal = scipy.spatial.distance.euclidean(self.ee_pos, current_goal) # float # --- Get Current Joint values - only for the joints we are using # we need this for delta actions # self.joint_values = self.current_joint_positions.copy() # Get a float list self.joint_values = list(self.joint_pos_all) # Get a float list # we don't need to convert this to numpy array since we concat using numpy below if self.prev_action is None: # we can use the ee_pos as the previous action - for EE action type if self.ee_action_type: prev_action = self.ee_pos # we can use the joint values as the previous action - for Joint action type else: prev_action = self.joint_values.copy() else: prev_action = self.prev_action.copy() # our observations obs = np.concatenate((self.ee_pos, vec_ee_goal, euclidean_distance_ee_goal, self.joint_pos_all, prev_action, self.current_joint_velocities), axis=None, dtype=np.float32) if self.log_internal_state: rospy.loginfo(f"Observations --->: {obs}") if self.normal_obs: return obs.copy() elif self.rgb_obs: return self.cv_image_rgb.copy() elif self.rgb_plus_normal_obs: return {"rgb_image": self.cv_image_rgb.copy(), "observations": obs.copy()} elif self.rgb_plus_depth_plus_normal_obs: return {"depth_image": self.cv_image_depth.copy(), "rgb_image": self.cv_image_rgb.copy(), "observations": obs.copy()} else: return obs.copy()
[docs] def calculate_reward(self): """ Function to get a reward from the environment. Sparse Reward: float => 1.0 for success, -1.0 for failure Dense Reward: if reached: self.reached_goal_reward (positive reward) else: - self.mult_dist_reward * distance_to_the_goal and as always, negative rewards for each step, non-execution and actions not within joint limits Returns: A scalar reward value representing how well the agent is doing in the current episode. """ # - Init reward reward = 0.0 achieved_goal = self.ee_pos desired_goal = self.reach_goal # if it's "Sparse" reward structure if self.reward_arc == "Sparse": # initialise the sparse reward as negative reward = -1.0 # The marker only turns green if reach is done. Otherwise, it is red. self.goal_marker.set_color(r=1.0, g=0.0) self.goal_marker.set_duration(duration=5) # check if robot reached the goal reach_done = self.check_if_reach_done(achieved_goal, desired_goal) if reach_done: reward = 1.0 # done (green) goal_marker self.goal_marker.set_color(r=0.0, g=1.0) # publish the marker to the topic self.goal_marker.publish() # log the reward if self.log_internal_state: rospy.logwarn(">>>REWARD>>>" + str(reward)) # Since we only look for Sparse or Dense, we don't need to check if it's Dense else: # if it's "Dense" reward structure # in case of simple dense reward if self.simple_dense_reward: # - Distance from EE to goal reward dist2goal = scipy.spatial.distance.euclidean(achieved_goal, desired_goal) reward += - dist2goal # for normal dense reward else: # - Check if the EE reached the goal done = self.check_if_reach_done(achieved_goal, desired_goal) if done: # EE reached the goal reward += self.reached_goal_reward # done (green) goal_marker self.goal_marker.set_color(r=0.0, g=1.0) self.goal_marker.set_duration(duration=5) else: # not done (red) goal_marker self.goal_marker.set_color(r=1.0, g=0.0) self.goal_marker.set_duration(duration=5) # - Distance from EE to goal reward dist2goal = scipy.spatial.distance.euclidean(achieved_goal, desired_goal) reward += - self.mult_dist_reward * dist2goal # - Constant step reward reward += self.step_reward # publish the goal marker self.goal_marker.publish() # - Check if actions are in limits reward += self.action_not_in_limits * self.joint_limits_reward # - Check if the action is within the goal space reward += (not self.within_goal_space) * self.not_within_goal_space_reward # to punish for actions where we cannot execute if not self.movement_result: reward += self.none_exe_reward # log the reward if self.log_internal_state: rospy.logwarn(">>>REWARD>>>" + str(reward)) return reward
[docs] def check_if_done(self): """ Function to check if the episode is done. The Task is done if the EE is close enough to the goal Returns: A boolean value indicating whether the episode has ended (e.g., because a goal has been reached or a failure condition has been triggered) """ # this is for logging in different colours # Define ANSI escape codes for different colors RED = '\033[91m' GREEN = '\033[92m' YELLOW = '\033[93m' BLUE = '\033[94m' ENDC = '\033[0m' # --- Init done done = False # - Check if the ee reached the goal done_reach = self.check_if_reach_done(self.ee_pos, self.reach_goal) if done_reach: if self.log_internal_state: rospy.loginfo(GREEN + ">>>>>>>>>>>> Reached the Goal! >>>>>>>>>>>" + ENDC) done = True self.current_action = None # we don't need to execute any more actions self.init_done = False # we don't need to execute the loop until we reset the env self.info_r['is_success'] = True else: self.info_r['is_success'] = False return done
[docs] def check_if_reach_done(self, achieved_goal, desired_goal): """ Check if the reach is done """ done = False # distance between achieved goal and desired goal distance = scipy.spatial.distance.euclidean(achieved_goal, desired_goal) if self.debug: print("distance to the goal:", distance) if distance <= self.reach_tolerance: done = True return done
[docs] def test_goal_pos(self, goal): """ Function to check if the given goal is reachable """ if self.log_internal_state: rospy.logdebug(f"Goal to check: {str(goal)}") result = self.check_goal(goal) if not result: if self.log_internal_state: rospy.logdebug("The goal is not reachable") return result
# not used
[docs] def get_random_goal(self, max_tries: int = 100): """ Function to get a reachable goal """ for i in range(max_tries): goal = self._sample_box(self.goal_space) if self.test_goal_pos(goal): return True, goal if self.log_internal_state: rospy.logdebug("Getting a random goal failed!") return False, None
[docs] def get_random_goal_no_check(self): """ Function to get a random goal without checking """ return True, self._sample_box(self.goal_space)
# not used
[docs] def check_action_within_goal_space_fk(self, action): """ Function to check if the given action is within the goal space """ # check if the resulting ee pose is within the goal space - using FK ee_pos = self.calculate_fk(joint_positions=action) if ee_pos is not None: # check if the ee pose is within the goal space - using self.goal_space if self.goal_space.contains(ee_pos): if self.log_internal_state: rospy.logdebug(f"The ee pose of the {action} is within the goal space!") return True else: if self.log_internal_state: rospy.logdebug(f"The ee pose of the {ee_pos} is not within the goal space!") return False if self.log_internal_state: rospy.logwarn("Checking if the action is within the goal space failed!") return False
[docs] def check_action_within_workspace(self, action): """ Function to check if the given action is within the workspace Args: action: The action to be applied to the robot. Returns: A boolean value indicating whether the action is within the workspace """ BLUE = '\033[94m' ENDC = '\033[0m' # check if the resulting ee pose is within the workspace - using FK ee_pos = self.fk_pykdl(action=action) if self.debug: print("goal space", self.workspace_space) # for debugging if ee_pos is not None: # check if the ee pose is within the workspace - using self.workspace_space if self.workspace_space.contains(ee_pos): if self.log_internal_state: rospy.logdebug(f"The ee pose {ee_pos} of the {action} is within the workspace!") if self.debug: print(BLUE + f"The ee pose {ee_pos} of the {action} is within the workspace!" + ENDC) return True else: if self.log_internal_state: rospy.logdebug(f"The ee pose {ee_pos} is not within the workspace!") if self.debug: print(BLUE + f"The ee pose {ee_pos} is not within the workspace!" + ENDC) return False if self.log_internal_state: rospy.logwarn("Checking if the action is within the workspace failed!") if self.debug: print(BLUE + "Checking if the action is within the workspace failed!" + ENDC) return False
[docs] def check_if_z_within_limits(self, action): """ Function to check if the given ee_pos is within the limits """ # get the ee pose from the action using FK ee_pos = self.fk_pykdl(action=action) # The robot is mounted on a table. So, we need to check if the z is within the limits if ee_pos is not None: # check if the z is within the limits if ee_pos[2] > self.lowest_z: if self.log_internal_state: rospy.logdebug(f"The ee pose {ee_pos} of the {action} is within the z limit!") return True else: if self.log_internal_state: rospy.logdebug(f"The ee pose {ee_pos} is not within the z limit!") return False else: if self.log_internal_state: rospy.logwarn("Checking if the action is within the z limit failed!") return False
def _get_params(self): """ Function to get configuration parameters (optional) """ # Action Space self.min_joint_values = rospy.get_param('/vx300s/min_joint_pos') self.max_joint_values = rospy.get_param('/vx300s/max_joint_pos') # Observation Space self.position_ee_max = rospy.get_param('/vx300s/position_ee_max') self.position_ee_min = rospy.get_param('/vx300s/position_ee_min') self.rpy_ee_max = rospy.get_param('/vx300s/rpy_ee_max') self.rpy_ee_min = rospy.get_param('/vx300s/rpy_ee_min') self.linear_distance_max = rospy.get_param('/vx300s/linear_distance_max') self.linear_distance_min = rospy.get_param('/vx300s/linear_distance_min') self.max_distance = rospy.get_param('/vx300s/max_distance') self.min_joint_vel = rospy.get_param('/vx300s/min_joint_vel') self.max_joint_vel = rospy.get_param('/vx300s/max_joint_vel') self.min_joint_angles = rospy.get_param('/vx300s/min_joint_angles') self.max_joint_angles = rospy.get_param('/vx300s/max_joint_angles') # Goal space self.position_goal_max = rospy.get_param('/vx300s/position_goal_max') self.position_goal_min = rospy.get_param('/vx300s/position_goal_min') # Achieved goal self.position_achieved_goal_max = rospy.get_param('/vx300s/position_achieved_goal_max') self.position_achieved_goal_min = rospy.get_param('/vx300s/position_achieved_goal_min') # Desired goal self.position_desired_goal_max = rospy.get_param('/vx300s/position_desired_goal_max') self.position_desired_goal_min = rospy.get_param('/vx300s/position_desired_goal_min') # Tolerances self.reach_tolerance = rospy.get_param('/vx300s/reach_tolerance') # Real-side env_loop freshness threshold (seconds). If # /joint_states hasn't published within this window, env_loop # stops the arm and bails out of the current tick. self.joint_state_timeout_s = float( rospy.get_param('/vx300s/joint_state_timeout_s', 0.5) ) # Variables related to rewards self.step_reward = rospy.get_param('/vx300s/step_reward') self.mult_dist_reward = rospy.get_param('/vx300s/multiplier_dist_reward') self.reached_goal_reward = rospy.get_param('/vx300s/reached_goal_reward') self.joint_limits_reward = rospy.get_param('/vx300s/joint_limits_reward') self.none_exe_reward = rospy.get_param('/vx300s/none_exe_reward') self.not_within_goal_space_reward = rospy.get_param('/vx300s/not_within_goal_space_reward') # workspace self.workspace_max = rospy.get_param('/vx300s/workspace_max') self.workspace_min = rospy.get_param('/vx300s/workspace_min') # ------------------------------------------------------ # Task Methods for launching roscore @staticmethod def _launch_roscore(port=None, set_new_master_vars=False, default_port=False): """ Launches a new roscore with the specified port. Only updates the ros_port. Return: ros_port: port of launched roscore """ if port is None: port = 11311 # default port ros_port = ros_common.launch_roscore(port=port, set_new_master_vars=set_new_master_vars, default_port=default_port) # change to new rosmaster ros_common.change_ros_master(ros_port) return ros_port