#!/bin/python3
from typing import Any, Optional, Dict
import rospy
import numpy as np
from gymnasium import spaces
from gymnasium.envs.registration import register
import scipy.spatial
# Custom robot env
from rl_environments.ned2.real.robot_envs import ned2_robot_real
# core modules of the framework
from realros.utils import ros_common
from realros.utils import ros_markers
# Register your environment using the OpenAI register method to utilize gym.make("TaskEnv-v0").
# register(
# id='NED2ReacherReal-v0',
# entry_point='rl_environments.ned2.real.task_envs.reach.ned2_reach_real:NED2ReacherEnv',
# max_episode_steps=100,
# )
[docs]
class NED2ReacherEnv(ned2_robot_real.NED2RobotEnv):
"""
This Task env is for a simple Reach Task with the NED2 robot.
The task is done if
* The robot reached the goal
Here
* Action Space - Continuous (6 actions for joints or 3 xyz position of the end effector)
* Observation - Continuous (simp obs or rgb/depth image or a combination)
Init Args:
* new_roscore: Whether to launch a new roscore or not. If False, it is assumed that a roscore is already running.
* roscore_port: Port of the roscore to be launched. If None, a random port is chosen.
* seed: Seed for the random number generator.
* close_env_prompt: Whether to prompt the user to close the env or not.
* reset_env_prompt: Whether to prompt the operator before each reset (defaults to True because reset homes the arm via MoveIt on real hardware).
* reset_controllers_prompt: Whether to prompt before re-spawning controllers on reset (defaults to False because this env does not respawn controllers on reset).
* reward_type: Type of reward to be used. It Can be "Sparse" or "Dense".
* delta_action: Whether to use the delta actions or the absolute actions.
* delta_coeff: Coefficient to be used for the delta actions.
* ee_action_type: Whether to use the end effector action space or the joint action space.
* environment_loop_rate: Rate at which the environment loop should run. (in Hz)
* action_cycle_time: Time to wait between two consecutive actions. (in seconds)
* use_smoothing: Whether to use smoothing for actions or not.
* default_port: Whether to use the default port for the roscore or not.
* rgb_obs_only: Whether to use only the RGB image as the observations or not.
* normal_obs_only: Whether to use only the traditional observations or not.
* rgb_plus_normal_obs: Whether to use RGB image and traditional observations or not.
* rgb_plus_depth_plus_normal_obs: Whether to use RGB image, Depth image and traditional observations or not.
* debug: Whether to print debug messages or not.
* action_speed: set the speed to complete the trajectory. default in 0.5 seconds
* simple_dense_reward: Whether to use a simple dense reward or not.
* log_internal_state: Whether to log the internal state of the environment or not.
* use_kinect: Whether to use the kinect sensor or not.
* use_zed2: Whether to use the ZED2 camera or not.
* remote_ip: IP address of the remote machine where the ROS master is running.
* local_ip: IP address of the local machine where the ROS node is running.
* multi_device_mode: When True (and remote_ip + local_ip + roscore_port are set) the env attaches to a roscore already running on the remote (e.g. the Niryo Raspberry Pi master). Defaults to False, so by default the env launches its own roscore on the local machine via default_port/new_roscore — set this True when running the published Pi multi-device topology.
"""
def __init__(self, new_roscore: bool = False, roscore_port: str = None, seed: int = None,
close_env_prompt: bool = True, reset_env_prompt: bool = True,
reset_controllers_prompt: bool = False, reward_type: str = "Dense",
delta_action: bool = True, delta_coeff: float = 0.05,
environment_loop_rate: float = 25.0, action_cycle_time: float = 0.0,
use_smoothing: bool = False, default_port=True, ee_action_type: bool = False,
rgb_obs_only: bool = False, normal_obs_only: bool = True, rgb_plus_normal_obs: bool = False,
rgb_plus_depth_plus_normal_obs: bool = False, debug: bool = False, action_speed: float = 0.5,
simple_dense_reward: bool = True, log_internal_state: bool = False, use_kinect: bool = False,
use_zed2: bool = False, use_wrist_camera: bool = False, remote_ip: str = None, local_ip:str = None, multi_device_mode: bool = False,
):
"""
variables to keep track of ros port
"""
ros_port = roscore_port
if multi_device_mode:
if remote_ip is not None and local_ip is not None and ros_port is not None:
ros_common.change_ros_master_multi_device(remote_ip=remote_ip,
local_ip=local_ip, remote_ros_port=ros_port)
else:
rospy.logerr("Remote IP and Local IP must be provided for multi-device mode.")
# launch a new roscore with default port
elif default_port:
ros_port = self._launch_roscore(default_port=default_port)
# Launch new roscore
elif new_roscore:
ros_port = self._launch_roscore(port=roscore_port)
# ros_port of the already running roscore
elif roscore_port is not None:
ros_port = roscore_port
# change to new rosmaster
ros_common.change_ros_master(ros_port)
else:
if ros_common.is_roscore_running() is False:
print("roscore is not running! Launching a new roscore!")
ros_port = self._launch_roscore(port=roscore_port)
# init the ros node
if ros_port is not None:
self.node_name = "NED2ReacherEnvReal" + "_" + ros_port
else:
self.node_name = "NED2ReacherEnvReal"
rospy.init_node(self.node_name, anonymous=True)
rospy.loginfo(f"Starting {self.node_name}")
# When action_cycle_time is 0 the env loop runs without an explicit
# per-action sleep (deterministic / fast-step mode), so a strict
# period-vs-cycle inequality does not apply. Only enforce the guard
# when the caller actually requested a positive action cycle.
if action_cycle_time > 0.0 and (1.0 / environment_loop_rate) > action_cycle_time:
raise ValueError(
f"environment_loop_rate=1/{1.0/environment_loop_rate:.3f}s exceeds "
f"action_cycle_time={action_cycle_time:.3f}s; the cached observation "
f"could be older than one action cycle. Either lower environment_loop_rate "
f"or raise action_cycle_time."
)
self.log_internal_state = log_internal_state
if reward_type.lower() == "sparse":
self.reward_arc = "Sparse"
elif reward_type.lower() == "dense":
self.reward_arc = "Dense"
else:
rospy.logwarn(f"The given reward architecture '{reward_type}' not found. Defaulting to Dense!")
self.reward_arc = "Dense"
# check if we are using simple dense reward
# for simple, we only return the distance to the goal (negative Euclidean distance = -d)
self.simple_dense_reward = simple_dense_reward
self.delta_action = delta_action
self.delta_coeff = delta_coeff
self.use_smoothing = use_smoothing
self.action_cycle_time = action_cycle_time
self.action_speed = action_speed
self.rgb_obs = rgb_obs_only
self.normal_obs = normal_obs_only
self.rgb_plus_normal_obs = rgb_plus_normal_obs
self.rgb_plus_depth_plus_normal_obs = rgb_plus_depth_plus_normal_obs
self.ee_action_type = ee_action_type
self.debug = debug
# add to ros parameter server
ros_common.ros_load_yaml(pkg_name="rl_environments", file_name="ned2_reach_task_config.yaml", ns="/")
self._get_params()
# Joint action space or End effector action space
# ROS and Gazebo often use double-precision (64-bit),
# but we are using single-precision (32-bit) as it is typical for RL implementations.
if self.ee_action_type:
# EE action space
self.max_ee_values = np.array([self.position_ee_max["x"], self.position_ee_max["y"],
self.position_ee_max["z"]])
self.min_ee_values = np.array([self.position_ee_min["x"], self.position_ee_min["y"],
self.position_ee_min["z"]])
self.action_space = spaces.Box(low=np.array(self.min_ee_values), high=np.array(self.max_ee_values),
dtype=np.float32)
else:
# Joint action space
self.action_space = spaces.Box(low=np.array(self.min_joint_values), high=np.array(self.max_joint_values),
dtype=np.float32)
# ---- ee pos
observations_high_ee_pos_range = np.array(
np.array([self.position_ee_max["x"], self.position_ee_max["y"], self.position_ee_max["z"]]))
observations_low_ee_pos_range = np.array(
np.array([self.position_ee_min["x"], self.position_ee_min["y"], self.position_ee_min["z"]]))
# ---- vector to the goal - normalized linear distance
observations_high_vec_ee_goal = np.array([1.0, 1.0, 1.0])
observations_low_vec_ee_goal = np.array([-1.0, -1.0, -1.0])
# ---- Euclidian distance
observations_high_dist = np.array([self.max_distance])
observations_low_dist = np.array([0.0])
# ---- joint values
observations_high_joint_values = self.max_joint_angles.copy()
observations_low_joint_values = self.min_joint_angles.copy()
# ---- previous action
if self.ee_action_type:
observations_high_prev_action = self.max_ee_values.copy()
observations_low_prev_action = self.min_ee_values.copy()
else:
observations_high_prev_action = self.max_joint_values.copy()
observations_low_prev_action = self.min_joint_values.copy()
# ---- joint velocities
observations_high_joint_vel = self.max_joint_vel.copy()
observations_low_joint_vel = self.min_joint_vel.copy()
high = np.concatenate(
[observations_high_ee_pos_range, observations_high_vec_ee_goal, observations_high_dist,
observations_high_joint_values, observations_high_prev_action, observations_high_joint_vel, ])
low = np.concatenate(
[observations_low_ee_pos_range, observations_low_vec_ee_goal, observations_low_dist,
observations_low_joint_values, observations_low_prev_action, observations_low_joint_vel, ])
# Define the traditional observation space
self.observations = spaces.Box(low=low, high=high, dtype=np.float32)
# Define the depth image space (480x640 32FC1) - this uses 32-bit float
self.depth_image_space = spaces.Box(low=0, high=1, shape=(480, 640), dtype=np.float32)
# Define the image space (480x640X3 RGB images) - this uses 8-bit unsigned int
self.rgb_image_space = spaces.Box(low=0, high=255, shape=(480, 640, 3), dtype=np.uint8)
# Define the final observation space
if self.normal_obs:
use_kinect = False # to pass to the superclass
use_zed2 = False
self.observation_space = self.observations
elif self.rgb_obs:
self.observation_space = self.rgb_image_space
elif self.rgb_plus_normal_obs:
# Define a combined observation space
self.observation_space = spaces.Dict({
"rgb_image": self.rgb_image_space,
"observations": self.observations
})
elif self.rgb_plus_depth_plus_normal_obs:
# Define a combined observation space
self.observation_space = spaces.Dict({
"depth_image": self.depth_image_space,
"rgb_image": self.rgb_image_space,
"observations": self.observations
})
# if none of the above, use the traditional observation space
else:
use_kinect = False
use_zed2 = False
self.observation_space = self.observations
# ---- Goal pos
high_goal_pos_range = np.array(
np.array([self.position_goal_max["x"], self.position_goal_max["y"], self.position_goal_max["z"]]))
low_goal_pos_range = np.array(
np.array([self.position_goal_min["x"], self.position_goal_min["y"], self.position_goal_min["z"]]))
# -- goal space for sampling
self.goal_space = spaces.Box(low=low_goal_pos_range, high=high_goal_pos_range, dtype=np.float32,
seed=seed)
# ---- Workspace
high_workspace_range = np.array(
np.array([self.workspace_max["x"], self.workspace_max["y"], self.workspace_max["z"]]))
low_workspace_range = np.array(
np.array([self.workspace_min["x"], self.workspace_min["y"], self.workspace_min["z"]]))
# -- workspace space for checking
# we don't need to set the seed here since we're not sampling from this space
self.workspace_space = spaces.Box(low=low_workspace_range, high=high_workspace_range, dtype=np.float32)
self.goal_marker = ros_markers.RosMarker(frame_id="world", ns="goal", marker_type=2, marker_topic="goal_pos",
lifetime=30.0)
super().__init__(ros_port=ros_port, seed=seed, close_env_prompt=close_env_prompt,
reset_env_prompt=reset_env_prompt,
reset_controllers_prompt=reset_controllers_prompt,
action_cycle_time=action_cycle_time, use_kinect=use_kinect, use_zed2=use_zed2, use_wrist_camera=use_wrist_camera,
remote_ip=remote_ip, local_ip=local_ip, multi_device_mode=multi_device_mode)
# for smoothing
if self.use_smoothing:
if self.ee_action_type:
self.action_vector = np.zeros(3, dtype=np.float32)
else:
self.action_vector = np.zeros(6, dtype=np.float32)
# we can use this to set a time for ros_controllers to complete the action
self.environment_loop_time = 1.0 / environment_loop_rate # in seconds
self.prev_action = None # for observation
if environment_loop_rate is not None:
self.obs_r = None
self.reward_r = None
self.terminated_r = False
self.truncated_r = False
self.info_r = {}
self.current_action = None
self.init_done = False # we don't need to execute the loop until we reset the env
# Debug
if self.debug:
self.loop_counter = 0
self.action_counter = 0
# create a timer to run the environment loop
rospy.Timer(rospy.Duration(1.0 / environment_loop_rate), self.environment_loop)
# for dense reward calculation - for more complex reward calculations
self.action_not_in_limits = False
self.lowest_z = self.position_goal_min["z"]
self.movement_result = False
self.within_goal_space = False
rospy.loginfo(f"Finished Init of {self.node_name}")
# -------------------------------------------------------
# Methods for interacting with the environment
def _set_init_params(self, options: Optional[Dict[str, Any]] = None):
"""
Set initial parameters for the environment.
Here we
1. Move the Robot to Home position
2. Find a valid random reach goal
"""
if self.log_internal_state:
rospy.loginfo("Initialising the init params!")
# Initial robot pose - Home
self.init_pos = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0], dtype=np.float32)
# make the current action None to stop execution for real time envs and also stop the env loop
self.init_done = False # we don't need to execute the loop until we reset the env
self.current_action = None
# for smoothing
if self.use_smoothing:
if self.ee_action_type:
self.action_vector = np.zeros(3, dtype=np.float32)
else:
self.action_vector = np.zeros(6, dtype=np.float32)
# move the robot to the home pose
# we need to wait for the movement to finish
# we define the movement result here so that we can use it in the environment loop (we need it for dense reward)
self.move_NED2_object.stop_arm()
self.movement_result = self.move_NED2_object.set_trajectory_joints(self.init_pos)
if not self.movement_result:
if self.log_internal_state:
rospy.logwarn("Homing failed!")
# Get a random Reach goal - np.array
# goal_found, goal_vector = self.get_random_goal() # this checks if the goal is reachable using moveit
goal_found, goal_vector = self.get_random_goal_no_check()
if goal_found:
self.reach_goal = goal_vector
if self.log_internal_state:
rospy.loginfo("Reach Goal--->" + str(self.reach_goal))
else:
# fake Reach goal - hard code one
self.reach_goal = np.array([0.250, 0.000, 0.015], dtype=np.float32)
if self.log_internal_state:
rospy.logwarn("Hard Coded Reach Goal--->" + str(self.reach_goal))
# Publish the goal pos
self.goal_marker.set_position(position=self.reach_goal)
self.goal_marker.publish()
# get initial ee pos and joint values (we need this for delta actions or when we have EE action space)
ee_pos_tmp = self.get_ee_pose() # Get a geometry_msgs/PoseStamped msg
self.ee_pos = np.array([ee_pos_tmp.pose.position.x, ee_pos_tmp.pose.position.y, ee_pos_tmp.pose.position.z])
self.ee_ori = np.array([ee_pos_tmp.pose.orientation.x, ee_pos_tmp.pose.orientation.y,
ee_pos_tmp.pose.orientation.z,
ee_pos_tmp.pose.orientation.w]) # for IK calculation - EE actions
self.joint_values = self.get_joint_angles()
# for dense reward calculation
self.action_not_in_limits = False
self.within_goal_space = True
if self.ee_action_type:
self.prev_action = self.ee_pos.copy() # for observation
else:
self.prev_action = self.init_pos.copy() # for observation
# We can start the environment loop now
if self.log_internal_state:
rospy.loginfo("Start resetting the env loop!")
# init the real time variables
self.obs_r = None
self.reward_r = None
self.terminated_r = False
self.truncated_r = False
self.info_r = {}
# debug
if self.debug:
self.loop_counter = 0
self.action_counter = 0
if self.log_internal_state:
rospy.loginfo("Done resetting the env loop!")
self.init_done = True
# self.current_action = self.init_pos.copy()
if self.log_internal_state:
rospy.loginfo("Initialising init params done--->")
def _set_action(self, action):
"""
Function to apply an action to the robot.
Args:
action: Joint positions (numpy array) or EE position (numpy array)
"""
# save the action for observation
self.prev_action = action.copy()
if self.log_internal_state:
rospy.loginfo(f"Applying real-time action---> {action}")
self.current_action = action.copy()
# for debugging
if self.debug:
self.action_counter = 0 # reset the action counter
def _get_observation(self):
"""
Function to get an observation from the environment.
Returns:
An observation representing the current state of the environment.
"""
obs = None
# we cannot copy a None value
if self.obs_r is not None:
obs = self.obs_r.copy()
# incase we don't have an observation yet
if obs is None:
obs = self.sample_observation()
return obs.copy()
def _get_reward(self, info: Optional[Dict[str, Any]] = None):
"""
Function to get a reward from the environment.
Returns:
A scalar reward value representing how well the agent is doing in the current episode.
"""
reward = None
if self.reward_r is not None:
reward = self.reward_r
# incase we don't have a reward yet
if reward is None:
reward = self.calculate_reward()
return reward
def _compute_terminated(self, info: Optional[Dict[str, Any]] = None):
"""
Function to check if the episode is done.
Returns:
A boolean value indicating whether the episode has ended
(e.g., because a goal has been reached or a failure condition has been triggered)
"""
terminated = self.terminated_r
self.info = self.info_r # we can use this to log the success rate in stable baselines3 and other packages
# check if self.info have the is_success key
# double-checking here
if "is_success" not in self.info:
if terminated:
self.info["is_success"] = True
else:
self.info["is_success"] = False
# incase we don't have a done yet for real time envs
# unnecessary to check since we never set the terminated to None
if terminated is None:
terminated = self.check_if_done()
if terminated:
self.info["is_success"] = True # explicitly set the success rate
else:
self.info["is_success"] = False
return terminated
def _compute_truncated(self, info: Optional[Dict[str, Any]] = None):
"""
Function to check if the episode is truncated.
Mainly hard coded here since we are using a wrapper that sets the max number of steps and truncates the episode.
Returns:
A boolean value indicating whether the episode has been truncated
(e.g. because the maximum number of steps has been reached)
"""
truncated = self.truncated_r
return truncated
# -------------------------------------------------------
# Include any custom methods available for the MyTaskEnv class
[docs]
def environment_loop(self, event):
"""
Function for Environment loop for real time RL envs
"""
# we don't need to execute the loop until we reset the env
if self.init_done:
# Close-race guard: rospy.Timer keeps firing during env.close()
# while MoveIt cleanup runs its 1s wait_for_message timeouts.
# Controllers get unspawned mid-close → joint_states stops →
# get_joint_angles returns []. Next tick's execute_action
# crashes on the delta-action broadcast (shape (0,) vs (6,)).
# Bail out cleanly if ROS is shutting down or joint state is
# stale.
if rospy.is_shutdown():
return
jv = getattr(self, "joint_values", None)
if jv is None or len(jv) < 6:
return
# Joint-state freshness gate (real-side only — sim has
# Gazebo's guaranteed clock). The parent robot env stamps
# ``_latest_joint_state_time`` on every /joint_states message;
# if no message has arrived within joint_state_timeout_s, the
# driver / cable is presumed dead. Stop the arm and skip the
# rest of this tick — re-executing the cached action against
# frozen state would publish stale targets indefinitely.
_js_now = rospy.get_time()
_js_last = getattr(self, "_latest_joint_state_time", None)
_js_threshold = getattr(self, "joint_state_timeout_s", 0.5)
if _js_last is None or (_js_now - _js_last) > _js_threshold:
rospy.logwarn_throttle(
2.0,
f"/joint_states stale (last update "
f"{(_js_now - _js_last) if _js_last else 'never'}s ago, "
f"threshold {_js_threshold}s). Stopping arm and "
"skipping env_loop tick — check the niryo driver."
)
try:
self.move_NED2_object.stop_arm()
except Exception:
pass
return
if self.debug:
if self.log_internal_state:
rospy.loginfo(f"Starting RL loop --->: {self.loop_counter}")
self.loop_counter += 1
# start with the observation, reward, done and info
self.info_r = {}
self.obs_r = self.sample_observation()
self.reward_r = self.calculate_reward()
self.terminated_r = self.check_if_done()
# Apply the action
# we need this if we're done with the task we can break the loop in above done check
if self.current_action is not None:
self.execute_action(self.current_action)
if self.debug:
if self.log_internal_state:
rospy.loginfo(f"Executing action --->: {self.action_counter}")
self.action_counter += 1
else:
self.move_NED2_object.stop_arm() # stop the arm if there is no action
[docs]
def execute_action(self, action):
"""
Function to apply an action to the robot.
This method should be implemented here to apply the given action to the robot. The action could be a
joint position command, a velocity command, or any other type of command that can be applied to the robot.
Args:
action: The action to be applied to the robot.
"""
if self.log_internal_state:
rospy.loginfo(f"Action --->: {action}")
# --- Set the action based on the action type
# --- EE action
if self.ee_action_type:
# --- Make actions as deltas
if self.delta_action:
# we can use smoothing using the action_cycle_time or delta_coeff
if self.use_smoothing:
if self.action_cycle_time == 0.0:
# first derivative of the action
self.action_vector = self.action_vector + (self.delta_coeff * action)
# clip the action vector to be within -1 and 1
self.action_vector = np.clip(self.action_vector, -1, 1)
# add the action vector to the current ee pos
action = self.ee_pos + (self.action_vector * self.delta_coeff)
else:
# first derivative of the action
self.action_vector = self.action_vector + (self.action_cycle_time * action)
# clip the action vector to be within -1 and 1
self.action_vector = np.clip(self.action_vector, -1, 1)
# add the action vector to the current ee pos
action = self.ee_pos + (self.action_vector * self.action_cycle_time)
else:
action = self.ee_pos + (action * self.delta_coeff)
# check if the action is within the joint limits (for the reward calculation)
min_ee_values = np.array(self.min_ee_values)
max_ee_values = np.array(self.max_ee_values)
self.action_not_in_limits = np.any(action <= (min_ee_values + 0.0001)) or np.any(
action >= (max_ee_values - 0.0001))
# clip the action
action = np.clip(action, self.min_ee_values, self.max_ee_values)
# check if we can reach the goal and within the goal space
# check if action is within the workspace
if self.workspace_space.contains(action):
# calculate IK
IK_found, joint_positions = self.calculate_ik(target_pos=action, ee_ori=self.ee_ori)
if IK_found:
# Per-link FK safety: workspace + IK-feasible doesn't
# mean every link stays above the table/ground. The
# arm can fold down with elbow/wrist below the
# surface even when the EE target sits above. Reject
# before publishing to the niryo driver — on hardware
# a single bad target can ram the arm into the table.
safe, reason = self._check_action_links_safe(
joint_positions, current_joints=self.joint_values
)
if not safe:
if self.log_internal_state:
rospy.logwarn(f"[SAFETY] EE action rejected: {reason}")
self.movement_result = False
self.within_goal_space = False
else:
# execute the trajectory - EE
self.movement_result = self.move_arm_joints(q_positions=joint_positions,
time_from_start=self.action_speed)
self.within_goal_space = True
else:
if self.log_internal_state:
rospy.logwarn(f"The action: {action} is not reachable!")
rospy.logdebug(f"Set action failed for --->: {action}")
self.movement_result = False
self.within_goal_space = False
else:
if self.log_internal_state:
rospy.logdebug(f"Set action failed for --->: {action}")
self.movement_result = False
self.within_goal_space = False
# --- Joint action
else:
# --- Make actions as deltas
if self.delta_action:
# we can use smoothing using the action_cycle_time or delta_coeff
if self.use_smoothing:
if self.action_cycle_time == 0.0:
# first derivative of the action
self.action_vector = self.action_vector + (self.delta_coeff * action)
# clip the action vector to be within -1 and 1
self.action_vector = np.clip(self.action_vector, -1, 1)
# add the action vector to the current ee pos
action = self.joint_values + (self.action_vector * self.delta_coeff)
else:
# first derivative of the action
self.action_vector = self.action_vector + (self.action_cycle_time * action)
# clip the action vector to be within -1 and 1
self.action_vector = np.clip(self.action_vector, -1, 1)
# add the action vector to the current ee pos
action = self.joint_values + (self.action_vector * self.action_cycle_time)
else:
action = self.joint_values + (action * self.delta_coeff)
# check if the action is within the joint limits (for the reward calculation)
min_joint_values = np.array(self.min_joint_values)
max_joint_values = np.array(self.max_joint_values)
self.action_not_in_limits = np.any(action <= (min_joint_values + 0.0001)) or np.any(
action >= (max_joint_values - 0.0001))
# clip the action
if self.debug:
if self.log_internal_state:
rospy.logwarn(f"Action + current joint_values before clip --->: {action}")
action = np.clip(action, self.min_joint_values, self.max_joint_values)
if self.debug:
if self.log_internal_state:
rospy.logwarn(f"Action + current joint_values after clip --->: {action}")
# check if the action is within the workspace
if self.check_action_within_workspace(action):
# Per-link FK safety: see EE branch above. self.joint_values
# is refreshed at env_loop tick, so the delta cap can use it.
# On hardware this is the last guard before move_arm_joints
# publishes to the niryo driver.
safe, reason = self._check_action_links_safe(
action, current_joints=self.joint_values
)
if not safe:
if self.log_internal_state:
rospy.logwarn(f"[SAFETY] joint action rejected: {reason}")
self.movement_result = False
self.within_goal_space = False
else:
# execute the trajectory - ros_controllers
self.movement_result = self.move_arm_joints(q_positions=action, time_from_start=self.action_speed)
self.within_goal_space = True
else:
if self.log_internal_state:
rospy.logdebug(f"Set action failed for --->: {action}")
self.movement_result = False
self.within_goal_space = False
[docs]
def sample_observation(self):
"""
Function to get an observation from the environment.
# traditional observations
01. EE pos - 3
02. Vector to the goal (normalized linear distance) - 3
03. Euclidian distance (ee to reach goal)- 1
04. Current Joint values - 6
05. Previous action - 6 or 3 (joint or ee)
06. Joint velocities - 8
total: (3x2) + 1 + (6 or 3) + (6x2)
# depth image
480x640 32FC1
# rgb image
480x640X3 RGB images
Returns:
An observation representing the current state of the environment.
"""
current_goal = self.reach_goal
# --- 1. Get EE position
ee = self.fk_pykdl(self.joint_pos_all)
if ee is None:
ee = self.ee_pos
self.ee_pos = np.asarray(ee, dtype=np.float32)
# --- Linear distance to the goal
linear_dist_ee_goal = current_goal - self.ee_pos # goal is box dtype and ee_pos is numpy.array. It is okay
# --- 2. Vector to goal (we are giving only the direction vector)
vec_ee_goal = self._safe_unit_vector(linear_dist_ee_goal)
# --- 3. Euclidian distance
euclidean_distance_ee_goal = scipy.spatial.distance.euclidean(self.ee_pos, current_goal) # float
# --- Get Current Joint values - only for the joints we are using
# NED2 always publishes 8 joint states (6 arm + 2 mors prismatic fingers),
# so joint_pos_all and current_joint_velocities are 8-dim. The declared
# bounds (min/max_joint_angles, min/max_joint_vel in the task config)
# cover all 8 to match.
self.joint_values = list(self.joint_pos_all)
# we don't need to convert this to numpy array since we concat using numpy below
if self.prev_action is None:
# we can use the ee_pos as the previous action - for EE action type
if self.ee_action_type:
prev_action = self.ee_pos
# we can use the joint values as the previous action - for Joint action type
else:
prev_action = self.joint_values.copy()
else:
prev_action = self.prev_action.copy()
# our observations
obs = np.concatenate((self.ee_pos, vec_ee_goal, euclidean_distance_ee_goal, self.joint_pos_all,
prev_action, self.current_joint_velocities), axis=None, dtype=np.float32)
if self.log_internal_state:
rospy.loginfo(f"Observations --->: {obs}")
if self.normal_obs:
return obs.copy()
elif self.rgb_obs:
return self.cv_image_rgb.copy()
elif self.rgb_plus_normal_obs:
return {"rgb_image": self.cv_image_rgb.copy(),
"observations": obs.copy()}
elif self.rgb_plus_depth_plus_normal_obs:
return {"depth_image": self.cv_image_depth.copy(),
"rgb_image": self.cv_image_rgb.copy(),
"observations": obs.copy()}
else:
return obs.copy()
[docs]
def calculate_reward(self):
"""
Function to get a reward from the environment.
Sparse Reward: float => 1.0 for success, -1.0 for failure
Dense Reward:
if reached: self.reached_goal_reward (positive reward)
else: - self.mult_dist_reward * distance_to_the_goal
and as always, negative rewards for each step, non-execution and actions not within joint limits
Returns:
A scalar reward value representing how well the agent is doing in the current episode.
"""
# - Init reward
reward = 0.0
achieved_goal = self.ee_pos
desired_goal = self.reach_goal
# if it's "Sparse" reward structure
if self.reward_arc == "Sparse":
# initialise the sparse reward as negative
reward = -1.0
# The marker only turns green if reach is done. Otherwise, it is red.
self.goal_marker.set_color(r=1.0, g=0.0)
self.goal_marker.set_duration(duration=5)
# check if robot reached the goal
reach_done = self.check_if_reach_done(achieved_goal, desired_goal)
if reach_done:
reward = 1.0
# done (green) goal_marker
self.goal_marker.set_color(r=0.0, g=1.0)
# publish the marker to the topic
self.goal_marker.publish()
# log the reward
if self.log_internal_state:
rospy.logwarn(">>>REWARD>>>" + str(reward))
# Since we only look for Sparse or Dense, we don't need to check if it's Dense
else:
# if it's "Dense" reward structure
# in case of simple dense reward
if self.simple_dense_reward:
# - Distance from EE to goal reward
dist2goal = scipy.spatial.distance.euclidean(achieved_goal, desired_goal)
reward += - dist2goal
# for normal dense reward
else:
# - Check if the EE reached the goal
done = self.check_if_reach_done(achieved_goal, desired_goal)
if done:
# EE reached the goal
reward += self.reached_goal_reward
# done (green) goal_marker
self.goal_marker.set_color(r=0.0, g=1.0)
self.goal_marker.set_duration(duration=5)
else:
# not done (red) goal_marker
self.goal_marker.set_color(r=1.0, g=0.0)
self.goal_marker.set_duration(duration=5)
# - Distance from EE to goal reward
dist2goal = scipy.spatial.distance.euclidean(achieved_goal, desired_goal)
reward += - self.mult_dist_reward * dist2goal
# - Constant step reward
reward += self.step_reward
# publish the goal marker
self.goal_marker.publish()
# - Check if actions are in limits
reward += self.action_not_in_limits * self.joint_limits_reward
# - Check if the action is within the goal space
reward += (not self.within_goal_space) * self.not_within_goal_space_reward
# to punish for actions where we cannot execute
if not self.movement_result:
reward += self.none_exe_reward
# log the reward
if self.log_internal_state:
rospy.logwarn(">>>REWARD>>>" + str(reward))
return reward
[docs]
def check_if_done(self):
"""
Function to check if the episode is done.
The Task is done if the EE is close enough to the goal
Returns:
A boolean value indicating whether the episode has ended
(e.g., because a goal has been reached or a failure condition has been triggered)
"""
# this is for logging in different colours
# Define ANSI escape codes for different colors
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
ENDC = '\033[0m'
# --- Init done
done = False
# - Check if the ee reached the goal
done_reach = self.check_if_reach_done(self.ee_pos, self.reach_goal)
if done_reach:
if self.log_internal_state:
rospy.loginfo(GREEN + ">>>>>>>>>>>> Reached the Goal! >>>>>>>>>>>" + ENDC)
done = True
self.current_action = None # we don't need to execute any more actions
self.init_done = False # we don't need to execute the loop until we reset the env
self.info_r['is_success'] = True
else:
self.info_r['is_success'] = False
return done
[docs]
def check_if_reach_done(self, achieved_goal, desired_goal):
"""
Check if the reach is done
"""
done = False
# distance between achieved goal and desired goal
distance = scipy.spatial.distance.euclidean(achieved_goal, desired_goal)
if self.debug:
print("distance to the goal:", distance)
if distance <= self.reach_tolerance:
done = True
return done
[docs]
def test_goal_pos(self, goal):
"""
Function to check if the given goal is reachable
"""
if self.log_internal_state:
rospy.logdebug(f"Goal to check: {str(goal)}")
result = self.check_goal(goal)
if not result:
if self.log_internal_state:
rospy.logdebug("The goal is not reachable")
return result
# not used
[docs]
def get_random_goal(self, max_tries: int = 100):
"""
Function to get a reachable goal
"""
for i in range(max_tries):
goal = self._sample_box(self.goal_space)
if self.test_goal_pos(goal):
return True, goal
if self.log_internal_state:
rospy.logdebug("Getting a random goal failed!")
return False, None
[docs]
def get_random_goal_no_check(self):
"""
Function to get a random goal without checking
"""
return True, self._sample_box(self.goal_space)
# not used
[docs]
def check_action_within_goal_space_fk(self, action):
"""
Function to check if the given action is within the goal space
"""
# check if the resulting ee pose is within the goal space - using FK
ee_pos = self.calculate_fk(joint_positions=action)
if ee_pos is not None:
# check if the ee pose is within the goal space - using self.goal_space
if self.goal_space.contains(ee_pos):
if self.log_internal_state:
rospy.logdebug(f"The ee pose of the {action} is within the goal space!")
return True
else:
if self.log_internal_state:
rospy.logdebug(f"The ee pose of the {ee_pos} is not within the goal space!")
return False
if self.log_internal_state:
rospy.logwarn("Checking if the action is within the goal space failed!")
return False
[docs]
def check_action_within_workspace(self, action):
"""
Function to check if the given action is within the workspace
Args:
action: The action to be applied to the robot.
Returns:
A boolean value indicating whether the action is within the workspace
"""
BLUE = '\033[94m'
ENDC = '\033[0m'
# check if the resulting ee pose is within the workspace - using FK
ee_pos = self.fk_pykdl(action=action)
if self.debug:
print("goal space", self.workspace_space) # for debugging
if ee_pos is not None:
# check if the ee pose is within the workspace - using self.workspace_space
if self.workspace_space.contains(ee_pos):
if self.log_internal_state:
rospy.logdebug(f"The ee pose {ee_pos} of the {action} is within the workspace!")
if self.debug:
print(BLUE + f"The ee pose {ee_pos} of the {action} is within the workspace!" + ENDC)
return True
else:
if self.log_internal_state:
rospy.logdebug(f"The ee pose {ee_pos} is not within the workspace!")
if self.debug:
print(BLUE + f"The ee pose {ee_pos} is not within the workspace!" + ENDC)
return False
if self.log_internal_state:
rospy.logwarn("Checking if the action is within the workspace failed!")
if self.debug:
print(BLUE + "Checking if the action is within the workspace failed!" + ENDC)
return False
[docs]
def check_if_z_within_limits(self, action):
"""
Function to check if the given ee_pos is within the limits
"""
# get the ee pose from the action using FK
ee_pos = self.fk_pykdl(action=action)
# The robot is mounted on a table. So, we need to check if the z is within the limits
if ee_pos is not None:
# check if the z is within the limits
if ee_pos[2] > self.lowest_z:
if self.log_internal_state:
rospy.logdebug(f"The ee pose {ee_pos} of the {action} is within the z limit!")
return True
else:
if self.log_internal_state:
rospy.logdebug(f"The ee pose {ee_pos} is not within the z limit!")
return False
else:
if self.log_internal_state:
rospy.logwarn("Checking if the action is within the z limit failed!")
return False
def _get_params(self):
"""
Function to get configuration parameters (optional)
"""
# Action Space
self.min_joint_values = rospy.get_param('/ned2/min_joint_pos')
self.max_joint_values = rospy.get_param('/ned2/max_joint_pos')
# Observation Space
self.position_ee_max = rospy.get_param('/ned2/position_ee_max')
self.position_ee_min = rospy.get_param('/ned2/position_ee_min')
self.rpy_ee_max = rospy.get_param('/ned2/rpy_ee_max')
self.rpy_ee_min = rospy.get_param('/ned2/rpy_ee_min')
self.linear_distance_max = rospy.get_param('/ned2/linear_distance_max')
self.linear_distance_min = rospy.get_param('/ned2/linear_distance_min')
self.max_distance = rospy.get_param('/ned2/max_distance')
self.min_joint_vel = rospy.get_param('/ned2/min_joint_vel')
self.max_joint_vel = rospy.get_param('/ned2/max_joint_vel')
self.min_joint_angles = rospy.get_param('/ned2/min_joint_angles')
self.max_joint_angles = rospy.get_param('/ned2/max_joint_angles')
# Goal space
self.position_goal_max = rospy.get_param('/ned2/position_goal_max')
self.position_goal_min = rospy.get_param('/ned2/position_goal_min')
# Achieved goal
self.position_achieved_goal_max = rospy.get_param('/ned2/position_achieved_goal_max')
self.position_achieved_goal_min = rospy.get_param('/ned2/position_achieved_goal_min')
# Desired goal
self.position_desired_goal_max = rospy.get_param('/ned2/position_desired_goal_max')
self.position_desired_goal_min = rospy.get_param('/ned2/position_desired_goal_min')
# Tolerances
self.reach_tolerance = rospy.get_param('/ned2/reach_tolerance')
# Real-side env_loop freshness threshold (seconds). If
# /joint_states hasn't published within this window, env_loop
# stops the arm and bails out of the current tick.
self.joint_state_timeout_s = float(
rospy.get_param('/ned2/joint_state_timeout_s', 0.5)
)
# Variables related to rewards
self.step_reward = rospy.get_param('/ned2/step_reward')
self.mult_dist_reward = rospy.get_param('/ned2/multiplier_dist_reward')
self.reached_goal_reward = rospy.get_param('/ned2/reached_goal_reward')
self.joint_limits_reward = rospy.get_param('/ned2/joint_limits_reward')
self.none_exe_reward = rospy.get_param('/ned2/none_exe_reward')
self.not_within_goal_space_reward = rospy.get_param('/ned2/not_within_goal_space_reward')
# workspace
self.workspace_max = rospy.get_param('/ned2/workspace_max')
self.workspace_min = rospy.get_param('/ned2/workspace_min')
# ------------------------------------------------------
# Task Methods for launching roscore
@staticmethod
def _launch_roscore(port=None, set_new_master_vars=False, default_port=False):
"""
Launches a new roscore with the specified port. Only updates the ros_port.
Return:
ros_port: port of launched roscore
"""
if port is None:
port = 11311 # default port
ros_port = ros_common.launch_roscore(port=port, set_new_master_vars=set_new_master_vars,
default_port=default_port)
# change to new rosmaster
ros_common.change_ros_master(ros_port)
return ros_port