#! /usr/bin/env python
"""
Common functions for handling the physics of the Gazebo simulator.
Functions provided:
- ``get_gazebo_physics_properties`` — get the current physics properties.
- ``set_gazebo_physics_properties`` — set the physics properties.
- ``get_gazebo_max_update_rate`` — get the current maximum update rate.
- ``set_gazebo_max_update_rate`` — set the maximum update rate as a
real-time factor.
- ``get_gazebo_time_step`` — get the current time step.
- ``set_gazebo_time_step`` — set the time step.
- ``get_gazebo_gravity`` — get the current gravity vector.
- ``set_gazebo_gravity`` — set the gravity vector.
- ``get_gazebo_ode_physics`` — get the current ODE physics properties.
- ``set_gazebo_ode_physics`` — set the ODE physics properties.
ROS service references:
- http://docs.ros.org/en/diamondback/api/gazebo/html/srv/GetPhysicsProperties.html
- http://docs.ros.org/en/diamondback/api/gazebo/html/srv/SetPhysicsProperties.html
- http://docs.ros.org/en/diamondback/api/gazebo/html/msg/ODEPhysics.html
"""
import rospy
from gazebo_msgs.srv import GetPhysicsProperties, SetPhysicsProperties, SetPhysicsPropertiesRequest
from gazebo_msgs.msg import ODEPhysics
from multiros.utils import ros_common
from typing import Any, List, Optional, Tuple
[docs]
def get_gazebo_physics_properties(ros_port: Optional[str] = None, gazebo_port: Optional[str] = None):
"""
Function to get the current physics properties of Gazebo.
Args:
ros_port (str): The ROS_MASTER_URI port (optional).
gazebo_port (str): The GAZEBO_MASTER_URI port (optional).
Returns:
physics_properties: The current physics properties of Gazebo.
"""
# Change the ROS and Gazebo master environment variables if provided
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
# Wait for the 'get_physics_properties' service to be available
try:
rospy.wait_for_service('/gazebo/get_physics_properties', timeout=30.0)
except rospy.ROSException as e:
rospy.logerr(f"Timeout (30s) waiting for service '/gazebo/get_physics_properties': {e}")
return None
try:
# Create a service proxy for the 'get_physics_properties' service
get_physics_properties = rospy.ServiceProxy('/gazebo/get_physics_properties', GetPhysicsProperties)
# Call the 'get_physics_properties' service and get the current physics properties of Gazebo
physics_properties = get_physics_properties()
return physics_properties
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
return None
[docs]
def set_gazebo_physics_properties(time_step: Optional[float] = None,
max_update_rate: Optional[float] = None,
gravity: Optional[List[float]] = None,
ode_config: Optional[ODEPhysics] = None,
ros_port: Optional[str] = None,
gazebo_port: Optional[str] = None) -> bool:
"""
Function to set the physics properties of Gazebo.
Args:
time_step (float): The desired time step for Gazebo (optional).
max_update_rate (float): The desired maximum update rate for Gazebo (optional).
gravity (list): The desired gravity vector for Gazebo (optional).
ode_config (ODEPhysics): The desired ODE physics properties for Gazebo (optional).
ros_port (str): The ROS_MASTER_URI port (optional).
gazebo_port (str): The GAZEBO_MASTER_URI port (optional).
Returns:
bool: True if the service call was successful, False otherwise.
"""
# Change the ROS and Gazebo master environment variables if provided
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
# Get the current physics properties of Gazebo
physics_properties = get_gazebo_physics_properties()
# Create a SetPhysicsPropertiesRequest object
set_physics_properties_request = SetPhysicsPropertiesRequest()
set_physics_properties_request.time_step = time_step if time_step is not None else physics_properties.time_step
set_physics_properties_request.max_update_rate = max_update_rate if max_update_rate is not None else physics_properties.max_update_rate
set_physics_properties_request.gravity = gravity if gravity is not None else physics_properties.gravity
set_physics_properties_request.ode_config = ode_config if ode_config is not None else physics_properties.ode_config
# Wait for the 'set_physics_properties' service to be available
try:
rospy.wait_for_service('/gazebo/set_physics_properties', timeout=30.0)
except rospy.ROSException as e:
rospy.logerr(f"Timeout (30s) waiting for service '/gazebo/set_physics_properties': {e}")
return False
try:
# Create a service proxy for the 'set_physics_properties' service
set_physics_properties = rospy.ServiceProxy('/gazebo/set_physics_properties', SetPhysicsProperties)
# Call the 'set_physics_properties' service and set the physics properties for Gazebo
set_physics_properties(set_physics_properties_request)
return True
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
return False
[docs]
def get_gazebo_max_update_rate(ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> float:
"""
Function to get the current maximum update rate for Gazebo.
Args:
ros_port (str): The ROS_MASTER_URI port (optional).
gazebo_port (str): The GAZEBO_MASTER_URI port (optional).
Returns:
float: The current maximum update rate for Gazebo.
"""
# Change the ROS and Gazebo master environment variables if provided
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
# Get the current physics properties of Gazebo
physics_properties = get_gazebo_physics_properties()
# Return the current maximum update rate for Gazebo
return physics_properties.max_update_rate
[docs]
def set_gazebo_max_update_rate(real_time_factor: float, ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> bool:
"""
Function to set the maximum update rate for Gazebo in real-time factor.
1 is real time, n is n times the rate of real time.
Args:
real_time_factor (float): The desired real-time factor for Gazebo.
ros_port (str): The ROS_MASTER_URI port (optional).
gazebo_port (str): The GAZEBO_MASTER_URI port (optional).
Returns:
bool: True if the service call was successful, False otherwise.
"""
# Change the ROS and Gazebo master environment variables if provided
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
# Get the current physics properties of Gazebo
physics_properties = get_gazebo_physics_properties()
# Calculate the maximum update rate for Gazebo in real-time factor
max_update_rate = ((1.0 / physics_properties.time_step) * real_time_factor)
# Create a SetPhysicsPropertiesRequest object
set_physics_properties_request = SetPhysicsPropertiesRequest()
set_physics_properties_request.time_step = physics_properties.time_step
set_physics_properties_request.max_update_rate = max_update_rate
set_physics_properties_request.gravity = physics_properties.gravity
set_physics_properties_request.ode_config = physics_properties.ode_config
# Wait for the 'set_physics_properties' service to be available
try:
rospy.wait_for_service('/gazebo/set_physics_properties', timeout=30.0)
except rospy.ROSException as e:
rospy.logerr(f"Timeout (30s) waiting for service '/gazebo/set_physics_properties': {e}")
return False
try:
# Create a service proxy for the 'set_physics_properties' service
set_physics_properties = rospy.ServiceProxy('/gazebo/set_physics_properties', SetPhysicsProperties)
# Call the 'set_physics_properties' service and set the maximum update rate for Gazebo in real-time factor
set_physics_properties(set_physics_properties_request)
return True
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
return False
[docs]
def get_gazebo_time_step(ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> float:
"""
Function to get the current time step for Gazebo.
Args:
ros_port (str): The ROS_MASTER_URI port (optional).
gazebo_port (str): The GAZEBO_MASTER_URI port (optional).
Returns:
float: The current time step for Gazebo.
"""
# Change the ROS and Gazebo master environment variables if provided
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
# Get the current physics properties of Gazebo
physics_properties = get_gazebo_physics_properties()
# Return the current time step for Gazebo
return physics_properties.time_step
[docs]
def set_gazebo_time_step(time_step: float, ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> bool:
"""
Function to set the time step for Gazebo.
Args:
time_step (float): The desired time step for Gazebo.
ros_port (str): The ROS_MASTER_URI port (optional).
gazebo_port (str): The GAZEBO_MASTER_URI port (optional).
Returns:
bool: True if the service call was successful, False otherwise.
"""
# Change the ROS and Gazebo master environment variables if provided
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
# Get the current physics properties of Gazebo
physics_properties = get_gazebo_physics_properties()
# Create a SetPhysicsPropertiesRequest object
set_physics_properties_request = SetPhysicsPropertiesRequest()
set_physics_properties_request.time_step = time_step
set_physics_properties_request.max_update_rate = physics_properties.max_update_rate
set_physics_properties_request.gravity = physics_properties.gravity
set_physics_properties_request.ode_config = physics_properties.ode_config
# Wait for the 'set_physics_properties' service to be available
try:
rospy.wait_for_service('/gazebo/set_physics_properties', timeout=30.0)
except rospy.ROSException as e:
rospy.logerr(f"Timeout (30s) waiting for service '/gazebo/set_physics_properties': {e}")
return False
try:
# Create a service proxy for the 'set_physics_properties' service
set_physics_properties = rospy.ServiceProxy('/gazebo/set_physics_properties', SetPhysicsProperties)
# Call the 'set_physics_properties' service and set the time step for Gazebo
set_physics_properties(set_physics_properties_request)
return True
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
return False
[docs]
def get_gazebo_gravity(ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> List[float]:
"""
Function to get the current gravity vector for Gazebo.
Args:
ros_port (str): The ROS_MASTER_URI port (optional).
gazebo_port (str): The GAZEBO_MASTER_URI port (optional).
Returns:
list[float]: The current gravity vector for Gazebo.
"""
# Change the ROS and Gazebo master environment variables if provided
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
# Get the current physics properties of Gazebo
physics_properties = get_gazebo_physics_properties()
# Return the current gravity vector for Gazebo
return physics_properties.gravity
[docs]
def set_gazebo_gravity(gravity: List[float], ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> bool:
"""
Function to set the gravity vector for Gazebo.
Args:
gravity (List[float]): The desired gravity vector for Gazebo.
ros_port (str): The ROS_MASTER_URI port (optional).
gazebo_port (str): The GAZEBO_MASTER_URI port (optional).
Returns:
bool: True if the service call was successful, False otherwise.
"""
# Change the ROS and Gazebo master environment variables if provided
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
# Get the current physics properties of Gazebo
physics_properties = get_gazebo_physics_properties()
# Create a SetPhysicsPropertiesRequest object
set_physics_properties_request = SetPhysicsPropertiesRequest()
set_physics_properties_request.time_step = physics_properties.time_step
set_physics_properties_request.max_update_rate = physics_properties.max_update_rate
set_physics_properties_request.gravity = gravity
set_physics_properties_request.ode_config = physics_properties.ode_config
# Wait for the 'set_physics_properties' service to be available
try:
rospy.wait_for_service('/gazebo/set_physics_properties', timeout=30.0)
except rospy.ROSException as e:
rospy.logerr(f"Timeout (30s) waiting for service '/gazebo/set_physics_properties': {e}")
return False
try:
# Create a service proxy for the 'set_physics_properties' service
set_physics_properties = rospy.ServiceProxy('/gazebo/set_physics_properties', SetPhysicsProperties)
# Call the 'set_physics_properties' service and set the gravity vector for Gazebo
set_physics_properties(set_physics_properties_request)
return True
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
return False
[docs]
def get_gazebo_ode_physics(ros_port: Optional[str] = None, gazebo_port: Optional[str] = None):
"""
Function to get the current ODE physics properties of Gazebo.
Args:
ros_port (str): The ROS_MASTER_URI port (optional).
gazebo_port (str): The GAZEBO_MASTER_URI port (optional).
Returns:
ode_config: The current ODE physics properties of Gazebo.
"""
# Change the ROS and Gazebo master environment variables if provided
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
# Get the current physics properties of Gazebo
physics_properties = get_gazebo_physics_properties()
return physics_properties.ode_config
[docs]
def set_gazebo_ode_physics(ode_config: ODEPhysics, ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> bool:
"""
Function to set the ODE physics properties of Gazebo.
Args:
ode_config (ODEPhysics): The desired ODE physics properties for Gazebo.
ros_port (str): The ROS_MASTER_URI port (optional).
gazebo_port (str): The GAZEBO_MASTER_URI port (optional).
Returns:
bool: True if the service call was successful, False otherwise.
"""
# Change the ROS and Gazebo master environment variables if provided
if ros_port is not None:
ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port)
# Get the current physics properties of Gazebo
physics_properties = get_gazebo_physics_properties()
# Create a SetPhysicsPropertiesRequest object
set_physics_properties_request = SetPhysicsPropertiesRequest()
set_physics_properties_request.time_step = physics_properties.time_step
set_physics_properties_request.max_update_rate = physics_properties.max_update_rate
set_physics_properties_request.gravity = physics_properties.gravity
set_physics_properties_request.ode_config = ode_config
# Wait for the 'set_physics_properties' service to be available
try:
rospy.wait_for_service('/gazebo/set_physics_properties', timeout=30.0)
except rospy.ROSException as e:
rospy.logerr(f"Timeout (30s) waiting for service '/gazebo/set_physics_properties': {e}")
return False
try:
# Create a service proxy for the 'set_physics_properties' service
set_physics_properties = rospy.ServiceProxy('/gazebo/set_physics_properties', SetPhysicsProperties)
# Call the 'set_physics_properties' service and set the ODE physics properties for Gazebo
set_physics_properties(set_physics_properties_request)
return True
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
return False