#! /usr/bin/env python
"""
THE GAZEBO BRIDGE SCRIPT PROVIDES ALL THE MAIN FUNCTIONS FOR ROS TO COMMUNICATE WITH GAZEBO.
MOST OF THE ROS TOPICS & SERVICES FOR GAZEBO CAN BE FOUND HERE
http://wiki.ros.org/gazebo
AND HERE
https://classic.gazebosim.org/tutorials?tut=ros_comm&cat=connect_ros
WITH THE SCRIPT HAVE THE FOLLOWING FUNCTIONALITY,
1. launch_gazebo: Launch Gazebo using the ROS
2. close_gazebo: Close a gazebo instance. This function is to close both gzclient and the gzserver
3. reset_gazebo: Reset the Gazebo simulation or world.
4. pause_gazebo: Pause the Gazebo simulation.
5. unpause_gazebo: Unpause the Gazebo simulation.
6. gazebo_step: Step gazebo simulation multiple iteration.
"""
import rospy
import rospkg
import os
import subprocess
import time
from std_srvs.srv import Empty
from typing import List, Optional, Tuple
from multiros.utils import ros_common
"""
1. launch_gazebo: Launch Gazebo using the ROS
"""
[docs]
def launch_gazebo(launch_roscore: bool = True,
port: Optional[int] = None,
paused: bool = False,
use_sim_time: bool = True,
extra_gazebo_args: Optional[str] = None,
gui: bool = False,
recording: bool = False,
debug: bool = False,
physics: str = "ode",
verbose: bool = False,
output: str = 'screen',
respawn_gazebo: bool = False,
pub_clock_frequency: int = 100,
server_required: bool = False,
gui_required: bool = False,
custom_world_path: Optional[str] = None,
custom_world_pkg: Optional[str] = None,
custom_world_name: Optional[str] = None,
launch_new_term: bool = True) -> Tuple[str, str, subprocess.Popen]:
"""
Launch Gazebo using the ROS.
All the available options when launching gazebo with ros can be found in the
/opt/ros/noetic/share/gazebo_ros/launch/empty_world.launch
https://classic.gazebosim.org/tutorials?tut=ros_roslaunch&cat=connect_ros
Args:
launch_roscore (bool): if True, launch the roscore together with gazebo. Defaults to True.
port (str): if the "launch_roscore" is "True", it launches using the the the given port.
paused (bool): Whether to start Gazebo in a paused state. Defaults to False.
use_sim_time (bool): Gazebo-published simulation time, published over the ROS topic /clock (default true)
extra_gazebo_args (str): Use this argument to add extra options to Gazebo
gui (bool): Launch the user interface window of Gazebo (default true) (gzclient).
recording (bool): if True, record the data from gazebo.
debug (bool): Start gzserver (Gazebo Server) in debug mode using gdb (default false)
physics (str): Set the physics engine (ode, dart, bullet, simbody). Currently, have support for "ode" only.
verbose (bool): Printing errors and warnings to the terminal (default false)
output (str): The output method for Gazebo (screen or log). Defaults to 'screen'.
respawn_gazebo (bool): Whether to respawn Gazebo if it is killed. Defaults to False.
pub_clock_frequency (int): The frequency of the clock publisher (in Hz). Defaults to 100.
server_required (bool): Terminate launch script when gzserver (Gazebo Server) exits (default false)
gui_required (bool): Terminate launch script when gzclient (user interface window) exits (default false)
custom_world_path (str): Absolute path to the custom world file. (optional)
custom_world_pkg (str): if the custom_world_path is None, package of the custom world file (optional)
custom_world_name (str): if the custom_world_pkg is not None, name of the custom world file (optional)
launch_new_term (bool): Launch the gazebo node in a new terminal (Xterm).
Returns:
Tuple: A tuple containing the ROS port, the Gazebo port, and the process object for the launched Gazebo instance.
"""
rospack = rospkg.RosPack()
try:
rospack.get_path('gazebo_ros')
except rospkg.common.ResourceNotFound:
rospy.logerr("The package gazebo_ros was not found!")
return None, None, None
# Term command to start gazebo
term_cmd = "roslaunch gazebo_ros empty_world.launch "
# Start Gazebo in a paused
term_cmd += " paused:=" + str(paused)
# Tells ROS nodes asking for time to get the Gazebo-published simulation time, published over the ROS topic /clock
term_cmd += " use_sim_time:=" + str(use_sim_time)
# Add extra options to gazebo
if extra_gazebo_args is not None:
term_cmd += " extra_gazebo_args:=" + extra_gazebo_args
# Launch gzclient
term_cmd += " gui:=" + str(gui)
# Record the data from gazebo
term_cmd += " recording:=" + str(recording)
# Start gzserver (Gazebo Server) in debug mode using gdb
term_cmd += " debug:=" + str(debug)
# change the physics engine. Currently, the gazebo_ros supports only "ode"
if physics != "ode":
term_cmd += " physics:=" + physics
# if True, print debug information.
term_cmd += " verbose:=" + str(verbose)
# choose the output method for gazebo (screen, log).
term_cmd += " output:=" + str(output)
# if True, gazebo will be respawned if it is killed.
term_cmd += " respawn_gazebo:=" + str(respawn_gazebo)
# the frequency of the clock publisher (in Hz)
term_cmd += " pub_clock_frequency:=" + str(pub_clock_frequency)
# if True, the launch file will wait until gzserver is running.
term_cmd += " server_required:=" + str(server_required)
# if True, the launch file will wait until gzclient is running.
term_cmd += " gui_required:=" + str(gui_required)
# Select world
if custom_world_path is not None:
if os.path.exists(custom_world_path) is False:
rospy.logerr("Custom World file in " + custom_world_path + " does not exist!")
return None, None, None
term_cmd += " world_name:=" + str(custom_world_path)
elif custom_world_pkg and custom_world_name is not None:
try:
world_pkg_path = rospack.get_path(custom_world_pkg)
except rospkg.common.ResourceNotFound:
rospy.logwarn("Package where world file is located was NOT FOUND!")
return None, None, None
world_file_path = world_pkg_path + "/worlds/" + custom_world_name
if os.path.exists(world_file_path) is False:
rospy.logerr("Custom World file in " + world_file_path + " does not exist!")
return None, None, None
term_cmd += " world_name:=" + str(world_file_path)
# initializing the var for "launch_roscore"
ros_port = None
gazebo_port = None
if launch_roscore:
if port is not None:
ros_port, gazebo_port = ros_common.launch_roscore(port=int(port))
else:
ros_port, gazebo_port = ros_common.launch_roscore()
# Snapshot existing gzserver/gzclient PIDs so we can identify the
# ones THIS launch creates (and only kill those on Ctrl+C).
pre_gazebo_pids = _gazebo_pids()
# Launch gazebo
if launch_new_term:
term_cmd = f"xterm -e '{term_cmd}'"
# print("term_cmd:", term_cmd)
process = subprocess.Popen(term_cmd, shell=True)
else:
# print("term_cmd:", term_cmd)
process = subprocess.Popen(term_cmd, shell=True)
time.sleep(5.0)
# waiting for gazebo to done launching
try:
rospy.wait_for_service('/gazebo/pause_physics', timeout=30.0)
except rospy.ROSException as e:
rospy.logerr(f"Timeout (30s) waiting for service '/gazebo/pause_physics' "
f"after launching Gazebo on port {gazebo_port}: {e}")
return None, None, None
# Identify gzserver/gzclient PIDs spawned by THIS launch (set diff
# vs. the snapshot taken before Popen). Register them with the
# managed-process registry so SIGINT / atexit can clean them up.
new_gazebo_pids = sorted(_gazebo_pids() - pre_gazebo_pids)
ros_common.register_managed_process(
process,
gazebo_pids=new_gazebo_pids,
gazebo_port=gazebo_port,
kind="gazebo",
)
# if launch_roscore in False, ignore the first two returns
return ros_port, gazebo_port, process
def _gazebo_pids() -> set:
"""Return PIDs of currently running gzserver / gzclient processes."""
pids: set = set()
for name in ("gzserver", "gzclient"):
try:
out = subprocess.run(
["pgrep", "-x", name],
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
timeout=5,
)
pids.update(int(p) for p in out.stdout.decode().split() if p)
except Exception:
pass
return pids
"""
2. close_gazebo: close a gazebo instance. This function is to close both gzclient and the gzserver
"""
[docs]
def close_gazebo(process: subprocess.Popen, ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> bool:
"""
Function to close a gazebo instance. This function is to close both gzclient and the gzserver
Args:
process: A subprocess.Popen object representing the running Gazebo instance.
ros_port (str): The ROS port used by the Gazebo instance. Defaults to None. (optional).
gazebo_port (str): The Gazebo port used by the Gazebo instance. Defaults to None. (optional).
Returns:
bool: True if Gazebo was closed successfully, False otherwise.
"""
# change the rosmaster
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
process.terminate()
rospy.logdebug("Closing Gazebo!")
return True
"""
3. reset_gazebo: Reset the Gazebo simulation or world.
"""
[docs]
def reset_gazebo(reset_type: str = "simulation", max_tries: int = 5, ros_port: Optional[str] = None,
gazebo_port: Optional[str] = None) -> bool:
"""
Function to reset the Gazebo simulation or world.
Args:
reset_type (str): The type of reset to perform ("simulation" or "world") (optional).
max_tries (int): The maximum number of tries to reset the simulation or world (optional).
ros_port (str): The ROS_MASTER_URI port (optional).
gazebo_port (str): The GAZEBO_MASTER_URI port (optional).
Returns:
bool: True if the service call was successful, False otherwise.
"""
# Change the ROS and Gazebo master environment variables if provided
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
# Determine the service name based on the reset_type
service_name = "/gazebo/reset_simulation" if reset_type == "simulation" else "/gazebo/reset_world"
# Wait for the service to be available
try:
rospy.wait_for_service(service_name, timeout=30.0)
except rospy.ROSException as e:
rospy.logerr(f"Timeout (30s) waiting for service '{service_name}': {e}")
return False
# Try to reset the simulation or world up to max_tries times
for i in range(max_tries):
try:
# Create a service proxy for the service
reset_service = rospy.ServiceProxy(service_name, Empty)
# Call the service to reset the simulation or world
reset_service()
rospy.loginfo(f"Reset {reset_type} successful!")
return True
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
return False
"""
4. pause_gazebo: Pause the Gazebo simulation.
"""
[docs]
def pause_gazebo(max_tries: int = 5, ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> bool:
"""
Function to pause the Gazebo simulation.
Args:
max_tries (int): The maximum number of tries to pause the simulation (optional).
ros_port (str): The ROS_MASTER_URI port (optional).
gazebo_port (str): The GAZEBO_MASTER_URI port (optional).
Returns:
bool: True if the service call was successful, False otherwise.
"""
# Change the ROS and Gazebo master environment variables if provided
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
# Wait for the 'pause_physics' service to be available
try:
rospy.wait_for_service('/gazebo/pause_physics', timeout=30.0)
except rospy.ROSException as e:
rospy.logerr(f"Timeout (30s) waiting for service '/gazebo/pause_physics': {e}")
return False
# Try to pause the simulation up to max_tries times
for i in range(max_tries):
try:
# Create a service proxy for the 'pause_physics' service
pause_physics = rospy.ServiceProxy('/gazebo/pause_physics', Empty)
# Call the 'pause_physics' service to pause the simulation
pause_physics()
rospy.logdebug(f"Pause successful!")
return True
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
return False
"""
5. unpause_gazebo: Unpause the Gazebo simulation.
"""
[docs]
def unpause_gazebo(max_tries: int = 5, ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> bool:
"""
Function to unpause the Gazebo simulation.
Args:
max_tries (int): The maximum number of tries to unpause the simulation (optional).
ros_port (str): The ROS_MASTER_URI port (optional).
gazebo_port (str): The GAZEBO_MASTER_URI port (optional).
Returns:
bool: True if the service call was successful, False otherwise.
"""
# Change the ROS and Gazebo master environment variables if provided
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
# Wait for the 'unpause_physics' service to be available
try:
rospy.wait_for_service('/gazebo/unpause_physics', timeout=30.0)
except rospy.ROSException as e:
rospy.logerr(f"Timeout (30s) waiting for service '/gazebo/unpause_physics': {e}")
return False
# Try to unpause the simulation up to max_tries times
for i in range(max_tries):
try:
# Create a service proxy for the 'unpause_physics' service
unpause_physics = rospy.ServiceProxy('/gazebo/unpause_physics', Empty)
# Call the 'unpause_physics' service to unpause the simulation
unpause_physics()
rospy.logdebug(f"Unpause successful!")
return True
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
return False
[docs]
def gazebo_step(steps: int, ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> bool:
"""
Function to Step gazebo simulation multiple iteration.
https://manpages.ubuntu.com/manpages/focal/man1/gz.1.html
Args:
steps (int): The number of steps to advance the simulation.
ros_port (str): The ROS_MASTER_URI port (optional).
gazebo_port (str): The GAZEBO_MASTER_URI port (optional).
Returns:
bool: True if the task is completed successfully, False otherwise.
"""
# Change the ROS and Gazebo master environment variables if provided
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
try:
term_cmd = "gz world -m " + str(steps)
subprocess.Popen(term_cmd, shell=True).wait()
return True
except Exception as e:
rospy.logerr(f"An error occurred while stepping the physics in Gazebo: {e}")
return False