#!/bin/python3
from gymnasium import spaces
from gymnasium.envs.registration import register
import numpy as np
from multiros.envs import GazeboBaseEnv
import rospy
import rostopic
from sensor_msgs.msg import JointState, PointCloud2, Image
from geometry_msgs.msg import Pose
from std_msgs.msg import Float64
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from control_msgs.msg import FollowJointTrajectoryAction, FollowJointTrajectoryGoal
import actionlib
from cv_bridge import CvBridge
import cv2
# core modules of the framework
from multiros.utils import gazebo_core
from multiros.utils import gazebo_models
from multiros.utils import gazebo_physics
from multiros.utils.moveit_multiros import MoveitMultiros
from multiros.utils import ros_common
from multiros.utils import ros_controllers
from multiros.utils import ros_kinematics
from urdf_parser_py.urdf import URDF
from pykdl_utils.kdl_kinematics import KDLKinematics
from tf.transformations import euler_from_matrix, euler_from_quaternion
register(
id='NED2RobotEnv-v0',
entry_point='rl_environments.ned2.sim.robot_envs.ned2_robot_sim:NED2RobotEnv',
max_episode_steps=1000,
)
[docs]
class NED2RobotEnv(GazeboBaseEnv.GazeboBaseEnv):
"""
Superclass for all NED2 Robot environments - gazebo-based.
"""
def __init__(self, ros_port: str = None, gazebo_port: str = None, gazebo_pid=None, seed: int = None,
real_time: bool = False, action_cycle_time=0.0, load_cube: bool = False, load_table: bool = False,
use_camera: bool = False, use_wrist_camera: bool = False,
gripper: bool = False):
"""
Initializes a new Robot Environment
Describe the robot and the sensors used in the env.
Sensor Topic List:
MoveIt: To get the pose and rpy of the robot.
/joint_states: JointState received for the joints of the robot
/gazebo_camera/image_raw: RGB image from the robot camera.
Actuators Topic List:
MoveIt: Send the joint positions to the robot.
/ned2/niryo_robot_follow_joint_trajectory_controller/command: arm trajectory controller.
/ned2/gazebo_tool_commander/follow_joint_trajectory: gripper action server (sim-only; niryo_robot_tools_commander is bypassed in sim, see move_gripper_joints).
"""
rospy.loginfo("Start Init NED2RobotEnv Multiros")
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
self.gripper = gripper
self.real_time = real_time # if True, the simulation will run in real time
# we don't need to pause/unpause gazebo if we are running in real time
if self.real_time:
unpause_pause_physics = False
else:
unpause_pause_physics = True
if not self.real_time:
gazebo_core.unpause_gazebo()
spawn_robot = True
# Location of the robot URDF file.
# Loaded from niryo_ned2_description_extras (sibling sim package
# — see rl_environments/README.md §3a) which wraps Niryo's
# upstream URDF + adds head-mount Kinect v2 (so /head_mount_kinect2/*
# subscribers actually receive data in sim) + Niryo's built-in
# wrist camera. Two variants:
# ned2_kinect.urdf.xacro → arm + kinect2 + wrist cam.
# ned2_kinect_gripper.urdf.xacro → arm + adaptive gripper +
# kinect2 + wrist cam +
# mors transmissions.
urdf_pkg_name = "niryo_ned2_description_extras"
urdf_file_name = "ned2_kinect.urdf.xacro" if not gripper else "ned2_kinect_gripper.urdf.xacro"
urdf_folder = "/urdf"
# extra urdf args
urdf_xacro_args = None
# namespace of the robot. Keep the leading slash — without it,
# rostopic.get_topic_type("ned2/joint_states", blocking=True)
# does an exact string match against the master's "/ned2/joint_states"
# and the readiness check spins forever. Matches goal_sim + RX200.
namespace = "/ned2"
# robot state publisher
robot_state_publisher_max_freq = None
new_robot_state_term = False
robot_model_name = "ned2"
robot_ref_frame = "world"
# Set the initial pose of the robot model
robot_pos_x = 0.0
robot_pos_y = 0.0
robot_pos_z = 0.0 if not load_table else 0.78
robot_ori_w = 1.0
robot_ori_x = 0.0
robot_ori_y = 0.0
robot_ori_z = 0.0
# Controllers config — loaded from niryo_ned2_description_extras
# (single source of truth for both standalone roslaunch testing
# and RL env runtime). Two variants:
# ned2_controllers.yaml → joint_state + arm trajectory
# ned2_controllers_w_gripper.yaml → + gazebo_tool_commander + mors PID
controller_package_name = "niryo_ned2_description_extras"
controllers_file = "ned2_controllers.yaml" if not gripper else "ned2_controllers_w_gripper.yaml"
controllers_list = ["joint_state_controller", "niryo_robot_follow_joint_trajectory_controller"] if not gripper else \
["joint_state_controller", "niryo_robot_follow_joint_trajectory_controller", "gazebo_tool_commander"]
# spawn a table
self.load_table = load_table
if load_table:
# we can use the model from reactorx200_description package
gazebo_models.spawn_sdf_model_gazebo(pkg_name="reactorx200_description", file_name="model.sdf",
model_folder="/models/table",
model_name="table", namespace=namespace,
pos_x=0.2)
# above function pauses the simulation, so we need to unpause it for real-time
if self.real_time:
gazebo_core.unpause_gazebo()
# spawn a cube
if load_cube:
gazebo_models.spawn_sdf_model_gazebo(pkg_name="reactorx200_description", file_name="block.sdf",
model_folder="/models/block",
model_name="red_cube", namespace=namespace,
pos_x=0.35,
pos_z=0.795 if load_table else 0.020)
# above function pauses the simulation, so we need to unpause it for real-time
if self.real_time:
gazebo_core.unpause_gazebo()
reset_controllers = False
reset_mode = "world"
sim_step_mode = 1
num_gazebo_steps = 1
gazebo_max_update_rate = None
gazebo_timestep = None
if rospy.has_param('/ned2/gazebo_update_rate_multiplier'):
gazebo_max_update_rate = rospy.get_param('/ned2/gazebo_update_rate_multiplier')
rospy.loginfo(f"Applied Gazebo update_rate_multiplier = {gazebo_max_update_rate}")
if rospy.has_param('/ned2/gazebo_time_step'):
gazebo_timestep = rospy.get_param('/ned2/gazebo_time_step')
rospy.loginfo(f"Applied Gazebo time_step = {gazebo_timestep}")
kill_rosmaster = True
kill_gazebo = True
clean_logs = False
# Pre-load controllers YAML (with PID gains) under /ned2 BEFORE
# gazebo spawns the model. The gazebo_ros_control plugin reads
# /ned2/gazebo_ros_control/pid_gains/joint_* at plugin-init time
# (when the model is spawned). multiros's spawn_robot_in_gazebo
# loads the controllers YAML AFTER spawn, which is too late —
# the plugin has already given up and logged "No p gain
# specified for pid". Without PIDs the arm sags under gravity
# and MoveIt can't converge, taking down the third xterm.
ros_common.ros_load_yaml(
pkg_name=controller_package_name,
file_name=controllers_file,
ns="/" + namespace.lstrip("/"),
)
super().__init__(
spawn_robot=spawn_robot, urdf_pkg_name=urdf_pkg_name, urdf_file_name=urdf_file_name,
urdf_folder=urdf_folder, urdf_xacro_args=urdf_xacro_args, namespace=namespace,
robot_state_publisher_max_freq=robot_state_publisher_max_freq, new_robot_state_term=new_robot_state_term,
robot_model_name=robot_model_name, robot_ref_frame=robot_ref_frame,
robot_pos_x=robot_pos_x, robot_pos_y=robot_pos_y, robot_pos_z=robot_pos_z, robot_ori_w=robot_ori_w,
robot_ori_x=robot_ori_x, robot_ori_y=robot_ori_y, robot_ori_z=robot_ori_z,
controllers_file=controllers_file, controllers_list=controllers_list,
reset_controllers=reset_controllers, reset_mode=reset_mode, sim_step_mode=sim_step_mode,
num_gazebo_steps=num_gazebo_steps, gazebo_max_update_rate=gazebo_max_update_rate,
gazebo_timestep=gazebo_timestep, kill_rosmaster=kill_rosmaster, kill_gazebo=kill_gazebo,
clean_logs=clean_logs, ros_port=ros_port, gazebo_port=gazebo_port, gazebo_pid=gazebo_pid, seed=seed,
unpause_pause_physics=unpause_pause_physics, action_cycle_time=action_cycle_time,
controller_package_name=controller_package_name)
# ---------- joint state
if namespace is not None and namespace != '/':
self.joint_state_topic = namespace + "/joint_states"
else:
self.joint_state_topic = "/joint_states"
self.joint_state_sub = rospy.Subscriber(self.joint_state_topic, JointState, self.joint_state_callback)
self.joint_state = JointState()
# ---------- Moveit
# Use the description-extras wrapper, which includes Niryo's
# move_group.launch under <group ns="ned2"> so move_group ends
# up at /ned2/move_group/... and MoveitMultiros(ns="ned2") can
# find its action servers. Without the wrap, Niryo's launch
# runs at root and the env hangs on the readiness check.
ros_common.ros_launch_launcher(
pkg_name="niryo_ned2_description_extras",
launch_file_name="ned2_move_group.launch",
args=[
"hardware_version:=ned2",
"simulation_mode:=true",
"load_robot_description:=false",
f"gripper:={'true' if gripper else 'false'}",
],
)
# ---------- kinect
self.use_camera = use_camera
if self.use_camera:
# depth image subscriber
self.kinect_depth_sub = rospy.Subscriber("/head_mount_kinect2/depth/image_raw", Image,
self.kinect_depth_callback)
self.kinect_depth = Image()
self.cv_image_depth = None
# rgb image subscriber
self.kinect_rgb_sub = rospy.Subscriber("/head_mount_kinect2/rgb/image_raw", Image,
self.kinect_rgb_callback)
self.kinect_rgb = Image()
self.cv_image_rgb = None
# ---------- Niryo's built-in wrist camera (opt-in, default off).
# niryo_ned2_description_extras loads niryo_ned2_camera.urdf.xacro
# which puts a Gazebo camera plugin on `camera_link` (wrist-
# mounted). Topics published by the plugin: /gazebo_camera/image_raw
# (sensor_msgs/Image) and /gazebo_camera/camera_info.
# Useful for EE-mounted perception studies / PnP close-up obs.
self.use_wrist_camera = use_wrist_camera
if self.use_wrist_camera:
self.wrist_camera_rgb_sub = rospy.Subscriber("/gazebo_camera/image_raw", Image,
self.wrist_camera_rgb_callback)
self.wrist_camera_rgb = Image()
self.cv_image_wrist = None
self._check_connection_and_readiness()
self.arm_joint_names = ["joint_1",
"joint_2",
"joint_3",
"joint_4",
"joint_5",
"joint_6"]
self.gripper_joint_names = ["joint_base_to_mors_1",
"joint_base_to_mors_2"]
if self.real_time:
# we don't need to pause/unpause gazebo if we are running in real time
self.move_NED2_object = MoveitMultiros(arm_name='arm',
robot_description="ned2/robot_description",
ns="ned2", pause_gazebo=False)
else:
self.move_NED2_object = MoveitMultiros(arm_name='arm',
robot_description="ned2/robot_description",
ns="ned2")
# low-level control
# rostopic for arm trajectory controller
self.arm_controller_pub = rospy.Publisher('/ned2/niryo_robot_follow_joint_trajectory_controller/command',
JointTrajectory,
queue_size=10)
# parameters for calculating FK, IK
# tool_link, not wrist_link: the action vector has 6 joint values
# (NED2 is 6-DOF), but base_link → wrist_link is a 5-joint chain
# (joint_1..joint_5). KDL needs len(q) == subchain joints, so an
# ee_link with 6 joints in the chain is required. tool_link is
# also what Niryo's SRDF declares as the planning-group EE
# (group "arm" ends at tool_link), so this aligns FK with the
# MoveIt planning target.
self.ee_link = "tool_link"
self.ref_frame = "base_link"
# Fk with pykdl_utils - old method
self.pykdl_robot = URDF.from_parameter_server(key='ned2/robot_description')
self.kdl_kin = KDLKinematics(urdf=self.pykdl_robot, base_link=self.ref_frame, end_link=self.ee_link)
# with ros_kinematics
self.ros_kin = ros_kinematics.Kinematics_pyrobot(robot_description_parm="ned2/robot_description",
base_link=self.ref_frame,
end_link=self.ee_link)
# Per-link FK chains for the safety check in _check_action_links_safe.
# See RX200RobotEnv (d8b5517) for the full rationale — short version:
# PyKDL's ChainFkSolverPos_recursive expects len(q) to match the
# *subchain* joint count, not the full arm DOF. Caching one
# KDLKinematics per check-link lets us slice q[:n] correctly.
#
# Diagnostic logging: print BEFORE each KDLKinematics build so a
# hang in the pykdl C extension surfaces the offending link name
# in the log (try/except can't catch a C-level hang). Removed
# once the link list is verified working on each robot.
self._safety_kin = {}
for _link in self.SAFETY_CHECK_LINKS:
rospy.loginfo(f"[SAFETY] building kinematics for {_link} ...")
try:
_kin = KDLKinematics(urdf=self.pykdl_robot,
base_link=self.ref_frame,
end_link=_link)
self._safety_kin[_link] = (_kin, int(_kin.num_joints))
rospy.loginfo(f"[SAFETY] {_link} ok ({_kin.num_joints} joints)")
except Exception as _e:
rospy.logwarn(f"[SAFETY] kinematics setup failed for {_link}: {_e}")
if not self.real_time:
gazebo_core.pause_gazebo()
else:
gazebo_core.unpause_gazebo() # this is because loading models will pause the simulation
rospy.loginfo("End Init NED2RobotEnv")
# ---------------------------------------------------
# Custom methods for the Custom Robot Environment
[docs]
def get_model_pose(self, model_name="red_cube", rpy=True):
"""
Get the pose of an object in Gazebo
Args:
model_name: name of the object whose pose is to be retrieved
rpy: True if the orientation is to be returned as euler angles (default: True)
Returns:
success: True if the pose is retrieved successful
position: position of the object as a numpy array
orientation: orientation of the object as a numpy array (rpy or quaternion)
"""
if not self.real_time:
gazebo_core.unpause_gazebo()
# pose is a geometry_msgs/Pose Message
# relative_entity_name uses Gazebo's "<model>/<link>" convention
# (mirrors RX200's "rx200/base_link"). NOT a URDF link name — the
# URDF links are bare (base_link, wrist_link, ...) but Gazebo's
# link-state lookup needs the model-qualified form.
header, pose, twist, success = gazebo_models.gazebo_get_model_state(model_name=model_name,
relative_entity_name="ned2/base_link")
if not self.real_time:
gazebo_core.pause_gazebo()
if success:
if rpy:
# The previous implementation passed an arbitrary 3x3 grid of
# quaternion components to euler_from_matrix, which is NOT a
# rotation matrix; the returned angles were meaningless.
# Use the proper quaternion -> euler conversion instead.
orientation = euler_from_quaternion(
[pose.orientation.x, pose.orientation.y,
pose.orientation.z, pose.orientation.w])
orientation = np.array(orientation, dtype=np.float32)
else:
orientation = np.array([pose.orientation.x, pose.orientation.y, pose.orientation.z,
pose.orientation.w], dtype=np.float32)
position = np.array([pose.position.x, pose.position.y, pose.position.z], dtype=np.float32)
# return position and orientation as numpy arrays
return success, position, orientation
# if the pose is not retrieved successfully
return success, None, None
[docs]
def spawn_cube_in_gazebo(self, model_pos_x, model_pos_y):
"""
Spawn a cube in Gazebo
Args:
model_pos_x: x-coordinate of the cube
model_pos_y: y-coordinate of the cube
Returns:
done: True if the cube is spawned successfully
"""
if self.load_table:
model_pos_z = 0.795
else:
model_pos_z = 0.020
# spawn a cube
done = gazebo_models.spawn_sdf_model_gazebo(pkg_name="reactorx200_description", file_name="block.sdf",
model_folder="/models/block",
model_name="red_cube", namespace="/ned2",
pos_x=model_pos_x,
pos_y=model_pos_y,
pos_z=model_pos_z)
# above function pauses the simulation, so we need to unpause it
if self.real_time:
gazebo_core.unpause_gazebo()
return done
[docs]
def remove_cube_in_gazebo(self):
"""
Remove the cube from Gazebo
"""
done = gazebo_models.remove_model_gazebo(model_name="red_cube")
# above function pauses the simulation, so we need to unpause it
if self.real_time:
gazebo_core.unpause_gazebo()
return done
[docs]
def fk_pykdl(self, action):
"""
Function to calculate the forward kinematics of the robot arm. We are using pykdl_utils.
Args:
action: joint positions of the robot arm (in radians)
Returns:
ee_position: end-effector position as a numpy array
"""
# Defensive: callers (sample_observation) hit this before the
# first /joint_states callback can populate self.joint_pos_all
# in some env-loop startup races. Return None so the caller's
# if ee is None: ee = self.ee_pos fallback keeps the obs
# in a defined state instead of raising from KDL.
if action is None or len(action) == 0:
return None
# Calculate forward kinematics
pose = self.kdl_kin.forward(action)
# Extract position
ee_position = np.array([pose[0, 3], pose[1, 3], pose[2, 3]], dtype=np.float32)
# print("ee pos:", ee_position) # for debugging
# print("ee pos dtype:", type(ee_position)) # for debugging
# Extract rotation matrix and convert to euler angles
# ee_orientation = euler_from_matrix(pose[:3, :3], 'sxyz')
return ee_position
[docs]
def calculate_fk(self, joint_positions, euler=True):
"""
Calculate the forward kinematics of the robot arm using the ros_kinematics package.
Args:
joint_positions: joint positions of the robot arm (in radians)
euler: True if the orientation is to be returned as euler angles (default: True)
Returns:
done: True if the FK calculation is successful
ee_position: end-effector position as a numpy array
ee_rpy: end-effector orientation as a list of rpy or quaternion values
"""
done, ee_position, ee_ori = self.ros_kin.calculate_fk(joint_positions, des_frame=self.ee_link, euler=euler)
return done, ee_position, ee_ori
# Arm links whose world z must stay above the table/ground for the
# action to be safe. Order matches the URDF chain shoulder→tool.
# joint5_motor (small fixed mounting link) and hand_link (downstream
# of wrist via fixed joints) are skipped — checking wrist_link and
# tool_link covers everything they bracket.
SAFETY_CHECK_LINKS = (
"shoulder_link",
"arm_link",
"elbow_link",
"forearm_link",
"wrist_link",
"tool_link",
)
def _check_action_links_safe(self, joint_targets, current_joints=None):
"""
Predict each arm link's world z under ``joint_targets`` and reject
the action if any link would dip below ``table_z + safety_z_margin``.
Also caps |target - current| per joint at ``max_joint_delta``.
Mirrors RX200RobotEnv._check_action_links_safe (d8b5517) for Ned2.
See that method's docstring for the design rationale.
Rosparams (all under ``/ned2/``, with sim/real variants where the
real value should be tighter):
table_z, safety_z_margin[_real], max_joint_delta[_real]
Returns
-------
(safe, reason) : (bool, Optional[str])
"""
strict = bool(getattr(self, "enable_strict_safety", False))
table_z = float(rospy.get_param("/ned2/table_z", -0.005))
if strict:
margin = float(rospy.get_param("/ned2/safety_z_margin_strict", 0.030))
max_delta = float(rospy.get_param("/ned2/max_joint_delta_strict", 0.15))
else:
margin = float(rospy.get_param("/ned2/safety_z_margin", 0.015))
max_delta = float(rospy.get_param("/ned2/max_joint_delta", 0.5))
floor = table_z + margin
q = np.asarray(joint_targets, dtype=np.float64)
# Per-joint delta cap (skipped when current pose is unknown).
if current_joints is not None:
cur = np.asarray(current_joints, dtype=np.float64)
if cur.shape == q.shape:
deltas = np.abs(q - cur)
if np.any(deltas > max_delta):
idx = int(np.argmax(deltas))
return False, f"joint[{idx}] delta {deltas[idx]:.3f} > {max_delta}"
# Per-link z check via cached pykdl subchains.
per_link_z = []
for link, (kin, n) in self._safety_kin.items():
try:
pose = kin.forward(q[:n])
except Exception as e:
return False, f"FK failed for {link}: {e}"
z = float(pose[2, 3])
per_link_z.append((link, z))
if z < floor:
return False, f"{link} predicted z={z:.3f} < floor={floor:.3f}"
# First-3-calls debug log — same idiom as RX200 (cd5d930).
if not hasattr(self, "_safety_log_count"):
self._safety_log_count = 0
if self._safety_log_count < 3:
self._safety_log_count += 1
zs = ", ".join(f"{l.rsplit('/', 1)[-1]}={z:.3f}" for l, z in per_link_z)
rospy.loginfo(f"[SAFETY] call #{self._safety_log_count}: floor={floor:.3f}, {zs}")
return True, None
[docs]
def calculate_ik(self, target_pos, ee_ori=np.array([0.0, 0.0, 0.0, 1.0])):
"""
Calculate the inverse kinematics of the robot arm using the ros_kinematics package.
Args:
target_pos: target end-effector position as a numpy array
ee_ori: end-effector orientation as a list of quaternion values (default: [0.0, 0.0, 0.0, 1.0])
Returns:
done: True if the IK calculation is successful
joint_positions: joint positions of the robot arm (in radians)
"""
# define the pose in 1D array [x, y, z, qx, qy, qz, qw]
target_pose = np.concatenate((target_pos, ee_ori))
# get the current joint positions
ee_position = self.get_joint_angles()
done, joint_positions = self.ros_kin.calculate_ik(target_pose=target_pose, tolerance=[1e-3] * 6,
init_joint_positions=ee_position)
return done, joint_positions
[docs]
def joint_state_callback(self, joint_state):
"""
Function to get the joint state of the robot.
"""
if joint_state is not None:
self.joint_state = joint_state
# joint names — used below to pull positions/velocities by
# name rather than trusting the driver's publish order.
self.joint_state_names = list(joint_state.name)
# Build the obs-facing joint vectors by NAME lookup so a
# driver change that re-orders /joint_states (or adds an
# extra finger / mimic joint) doesn't silently scramble the
# observation. The expected joint set is arm + (gripper
# when the gripper URDF is loaded).
wanted = list(self.arm_joint_names) + list(self.gripper_joint_names)
name_to_idx = {n: i for i, n in enumerate(joint_state.name)}
indices = [name_to_idx[n] for n in wanted if n in name_to_idx]
self.joint_pos_all = [joint_state.position[i] for i in indices]
self.current_joint_velocities = [joint_state.velocity[i] for i in indices]
self.current_joint_efforts = [joint_state.effort[i] for i in indices]
[docs]
def move_arm_joints(self, q_positions: np.ndarray, time_from_start: float = 0.5) -> bool:
"""
Set a joint position target only for the arm joints using low-level ros controllers.
Args:
q_positions: joint positions of the robot arm
time_from_start: time from start of the trajectory (set the speed to complete the trajectory)
Returns:
True if the action is successful
"""
# create a JointTrajectory object
trajectory = JointTrajectory()
trajectory.joint_names = self.arm_joint_names
trajectory.points.append(JointTrajectoryPoint())
trajectory.points[0].positions = q_positions
trajectory.points[0].velocities = [0.0] * len(self.arm_joint_names)
trajectory.points[0].accelerations = [0.0] * len(self.arm_joint_names)
trajectory.points[0].time_from_start = rospy.Duration(time_from_start)
# send the trajectory to the controller
self.arm_controller_pub.publish(trajectory)
return True
# Mors prismatic limits (URDF: ±0.01 m). Used to construct the
# "fully open" / "fully closed" trajectory points for sim, where
# we drive gazebo_tool_commander directly instead of routing
# through Niryo's tool_commander_node (which we don't launch in
# sim — keeps the bring-up to gazebo + ros_control only).
_MORS_OPEN_POS = 0.01
_MORS_CLOSED_POS = -0.01
_MORS_MOVE_SECS = 1.0 # time_from_start for the trajectory point
[docs]
def move_gripper_joints(self, action: str) -> bool:
"""
Drive the gripper to "open" or "close" (binary, same API the
real env exposes via niryo_robot_tools_commander).
Sim path: publish a JointTrajectory to
``/gazebo_tool_commander/follow_joint_trajectory`` (the
effort-based controller our description-extras package brings
up). Niryo's tool_commander_node isn't running in sim, so we
bypass it and command the mors joints directly. The action
server contract is identical to what the real env's
tool_commander → effort controller would resolve to internally.
Args:
action: "open" or "close".
Returns:
True if the action server returned a result (success or
otherwise — the env treats "trajectory dispatched" as the
success condition, matching the real-side semantics).
"""
target = self._MORS_OPEN_POS if action == "open" else self._MORS_CLOSED_POS
# Namespace must match where multiros spawned the controller —
# controllers are loaded under /ned2 (see controllers_list), so
# gazebo_tool_commander's action server lives at
# /ned2/gazebo_tool_commander/follow_joint_trajectory, NOT at
# /gazebo_tool_commander/... wait_for_server with no timeout
# would hang env.reset() forever if the path is wrong.
client = actionlib.SimpleActionClient(
'/ned2/gazebo_tool_commander/follow_joint_trajectory',
FollowJointTrajectoryAction,
)
if not client.wait_for_server(timeout=rospy.Duration(10.0)):
rospy.logerr(
"[NED2] gazebo_tool_commander action server not reachable at "
"/ned2/gazebo_tool_commander/follow_joint_trajectory after 10 s. "
"Check that controllers_list included gazebo_tool_commander "
"and that ned2_controllers_w_gripper.yaml loaded under /ned2."
)
return False
goal = FollowJointTrajectoryGoal()
goal.trajectory = JointTrajectory()
goal.trajectory.joint_names = list(self.gripper_joint_names)
point = JointTrajectoryPoint()
point.positions = [target, target]
point.velocities = [0.0, 0.0]
point.accelerations = [0.0, 0.0]
point.time_from_start = rospy.Duration(self._MORS_MOVE_SECS)
goal.trajectory.points.append(point)
client.send_goal(goal)
# Bounded wait so a stuck controller surfaces in logs rather than
# silently freezing env.step / env.reset.
if not client.wait_for_result(timeout=rospy.Duration(self._MORS_MOVE_SECS + 3.0)):
rospy.logwarn(
f"[NED2] gripper '{action}' trajectory did not return a "
f"result within {self._MORS_MOVE_SECS + 3.0:.1f} s; continuing."
)
return True
[docs]
def smooth_trajectory(self, q_positions, time_from_start, multiplier=100):
"""
Smooth the trajectory by interpolating between the current and target positions.
Args:
q_positions: target joint positions
time_from_start: time from start of the trajectory (set the speed to complete the trajectory)
multiplier: number of steps to interpolate between the current and target positions
"""
num_steps = int(time_from_start * multiplier) # Adjust the multiplier for more or fewer steps
current_positions = self.joint_values
delta_positions = (q_positions - current_positions) / num_steps
trajectory_points = []
for step in range(1, num_steps + 1):
intermediate_positions = current_positions + step * delta_positions
trajectory_points.append((intermediate_positions, time_from_start / num_steps * step))
self.publish_trajectory(trajectory_points)
return True
[docs]
def publish_trajectory(self, trajectory_points):
"""
Publish the entire trajectory at once.
Args:
trajectory_points: List of tuples containing joint positions and time_from_start
"""
trajectory = JointTrajectory()
trajectory.joint_names = self.arm_joint_names
for positions, time_from_start in trajectory_points:
point = JointTrajectoryPoint()
point.positions = positions
point.velocities = [0.0] * len(self.arm_joint_names)
point.accelerations = [0.0] * len(self.arm_joint_names)
point.time_from_start = rospy.Duration(time_from_start)
trajectory.points.append(point)
# send the trajectory to the controller
self.arm_controller_pub.publish(trajectory)
[docs]
def set_trajectory_joints(self, q_positions: np.ndarray) -> bool:
"""
Set a joint position target only for the arm joints using moveit.
"""
if self.real_time:
# do not wait for the action to finish
return self.move_NED2_object.set_trajectory_joints(q_positions, async_move=True)
else:
return self.move_NED2_object.set_trajectory_joints(q_positions)
[docs]
def set_trajectory_ee(self, pos: np.ndarray) -> bool:
"""
Set a pose target for the end effector of the robot arm using moveit.
"""
if self.real_time:
# do not wait for the action to finish
return self.move_NED2_object.set_trajectory_ee(position=pos, async_move=True)
else:
return self.move_NED2_object.set_trajectory_ee(position=pos)
[docs]
def get_ee_pose(self):
"""
Returns the end-effector pose as a geometry_msgs/PoseStamped message
This gives us the best pose if we are using the moveit config of the ReactorX repo
They are getting the pose with ee_gripper_link
"""
return self.move_NED2_object.get_robot_pose()
[docs]
def get_ee_rpy(self):
"""
Returns the end-effector orientation as a list of roll, pitch, and yaw angles.
"""
return self.move_NED2_object.get_robot_rpy()
[docs]
def get_joint_angles(self):
"""
get current joint angles of the robot arm - 6 elements
Returns a list
"""
return self.move_NED2_object.get_joint_angles_robot_arm()
[docs]
def check_goal(self, goal):
"""
Check if the goal is reachable
"""
return self.move_NED2_object.check_goal(goal)
[docs]
def check_goal_reachable_joint_pos(self, joint_pos):
"""
Check if the goal is reachable with joint positions
"""
return self.move_NED2_object.check_goal_joint_pos(joint_pos)
[docs]
def kinect_depth_callback(self, data):
"""
Callback function for kinect depth sensor
"""
self.kinect_depth = data
# Convert ROS image message to OpenCV format (32FC1)
bridge = CvBridge()
cv_image_depth = bridge.imgmsg_to_cv2(data, desired_encoding="32FC1")
self.cv_image_depth = cv_image_depth
# print("Shape of depth:", cv_image_depth.shape) # for debugging
# (480, 640) - for pytorch, this needs to be converted to (1, 480, 640)
[docs]
def kinect_rgb_callback(self, img_msg):
"""
Callback function for kinect rgb sensor
"""
self.kinect_rgb = img_msg
bridge = CvBridge()
# Convert ROS image message to OpenCV format (BGR)
cv_image_bgr = bridge.imgmsg_to_cv2(img_msg, desired_encoding="bgr8")
# Convert from BGR to RGB (required for pytorch or tensorflow CNNs) - (480, 640, 3)
self.cv_image_rgb = cv2.cvtColor(cv_image_bgr, cv2.COLOR_BGR2RGB)
# print("Shape of rgb:", cv_image_rgb.shape) # for debugging
# (480, 640, 3) - for pytorch, this needs to be converted to (3, 480, 640)
[docs]
def wrist_camera_rgb_callback(self, img_msg):
"""
Callback for Niryo's built-in wrist camera (sim, on `camera_link`).
Topic: /gazebo_camera/image_raw — sensor_msgs/Image. Converts to
an OpenCV RGB numpy array on self.cv_image_wrist.
"""
self.wrist_camera_rgb = img_msg
bridge = CvBridge()
cv_image_bgr = bridge.imgmsg_to_cv2(img_msg, desired_encoding="bgr8")
self.cv_image_wrist = cv2.cvtColor(cv_image_bgr, cv2.COLOR_BGR2RGB)
# helper fn for _check_connection_and_readiness
def _check_joint_states_ready(self):
"""
Function to check if the joint states are received
"""
if not self.real_time:
gazebo_core.unpause_gazebo() # Unpause Gazebo physics
# Wait for the service to be available
rospy.logdebug(rostopic.get_topic_type(self.joint_state_topic, blocking=True))
return True
# helper fn for _check_connection_and_readiness
def _check_moveit_ready(self):
"""
Function to check if moveit services are running
"""
rospy.wait_for_service("/ned2/move_group/trajectory_execution/set_parameters")
rospy.logdebug(rostopic.get_topic_type("/ned2/planning_scene", blocking=True))
rospy.logdebug(rostopic.get_topic_type("/ned2/move_group/status", blocking=True))
return True
# helper fn for _check_connection_and_readiness
def _check_ros_controllers_ready(self):
"""
Function to check if ros controllers are running
"""
rospy.logdebug(rostopic.get_topic_type("/ned2/niryo_robot_follow_joint_trajectory_controller/state", blocking=True))
return True
def _check_camera_ready(self):
"""
Function to check if kinect sensor is running
"""
rospy.logdebug(rostopic.get_topic_type("/head_mount_kinect2/depth/points", blocking=True))
return True
def _check_connection_and_readiness(self):
"""
Function to check the connection status of subscribers, publishers and services, as well as the readiness of
all systems.
"""
self._check_moveit_ready()
self._check_joint_states_ready()
self._check_ros_controllers_ready()
if self.use_camera:
self._check_camera_ready()
rospy.loginfo("All system are ready!")
return True