Source code for multiros.utils.gazebo_physics

#! /usr/bin/env python

"""
Common functions for handling the physics of the Gazebo simulator.

Functions provided:

- ``get_gazebo_physics_properties`` — get the current physics properties.
- ``set_gazebo_physics_properties`` — set the physics properties.
- ``get_gazebo_max_update_rate`` — get the current maximum update rate.
- ``set_gazebo_max_update_rate`` — set the maximum update rate as a
  real-time factor.
- ``get_gazebo_time_step`` — get the current time step.
- ``set_gazebo_time_step`` — set the time step.
- ``get_gazebo_gravity`` — get the current gravity vector.
- ``set_gazebo_gravity`` — set the gravity vector.
- ``get_gazebo_ode_physics`` — get the current ODE physics properties.
- ``set_gazebo_ode_physics`` — set the ODE physics properties.

ROS service references:

- http://docs.ros.org/en/diamondback/api/gazebo/html/srv/GetPhysicsProperties.html
- http://docs.ros.org/en/diamondback/api/gazebo/html/srv/SetPhysicsProperties.html
- http://docs.ros.org/en/diamondback/api/gazebo/html/msg/ODEPhysics.html
"""

import rospy
from gazebo_msgs.srv import GetPhysicsProperties, SetPhysicsProperties, SetPhysicsPropertiesRequest
from gazebo_msgs.msg import ODEPhysics
from multiros.utils import ros_common
from typing import Any, List, Optional, Tuple


[docs] def get_gazebo_physics_properties(ros_port: Optional[str] = None, gazebo_port: Optional[str] = None): """ Function to get the current physics properties of Gazebo. Args: ros_port (str): The ROS_MASTER_URI port (optional). gazebo_port (str): The GAZEBO_MASTER_URI port (optional). Returns: physics_properties: The current physics properties of Gazebo. """ # Change the ROS and Gazebo master environment variables if provided if ros_port is not None: ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port) # Wait for the 'get_physics_properties' service to be available try: rospy.wait_for_service('/gazebo/get_physics_properties', timeout=30.0) except rospy.ROSException as e: rospy.logerr(f"Timeout (30s) waiting for service '/gazebo/get_physics_properties': {e}") return None try: # Create a service proxy for the 'get_physics_properties' service get_physics_properties = rospy.ServiceProxy('/gazebo/get_physics_properties', GetPhysicsProperties) # Call the 'get_physics_properties' service and get the current physics properties of Gazebo physics_properties = get_physics_properties() return physics_properties except rospy.ServiceException as e: rospy.logerr(f"Service call failed: {e}") return None
[docs] def set_gazebo_physics_properties(time_step: Optional[float] = None, max_update_rate: Optional[float] = None, gravity: Optional[List[float]] = None, ode_config: Optional[ODEPhysics] = None, ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> bool: """ Function to set the physics properties of Gazebo. Args: time_step (float): The desired time step for Gazebo (optional). max_update_rate (float): The desired maximum update rate for Gazebo (optional). gravity (list): The desired gravity vector for Gazebo (optional). ode_config (ODEPhysics): The desired ODE physics properties for Gazebo (optional). ros_port (str): The ROS_MASTER_URI port (optional). gazebo_port (str): The GAZEBO_MASTER_URI port (optional). Returns: bool: True if the service call was successful, False otherwise. """ # Change the ROS and Gazebo master environment variables if provided if ros_port is not None: ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port) # Get the current physics properties of Gazebo physics_properties = get_gazebo_physics_properties() # Create a SetPhysicsPropertiesRequest object set_physics_properties_request = SetPhysicsPropertiesRequest() set_physics_properties_request.time_step = time_step if time_step is not None else physics_properties.time_step set_physics_properties_request.max_update_rate = max_update_rate if max_update_rate is not None else physics_properties.max_update_rate set_physics_properties_request.gravity = gravity if gravity is not None else physics_properties.gravity set_physics_properties_request.ode_config = ode_config if ode_config is not None else physics_properties.ode_config # Wait for the 'set_physics_properties' service to be available try: rospy.wait_for_service('/gazebo/set_physics_properties', timeout=30.0) except rospy.ROSException as e: rospy.logerr(f"Timeout (30s) waiting for service '/gazebo/set_physics_properties': {e}") return False try: # Create a service proxy for the 'set_physics_properties' service set_physics_properties = rospy.ServiceProxy('/gazebo/set_physics_properties', SetPhysicsProperties) # Call the 'set_physics_properties' service and set the physics properties for Gazebo set_physics_properties(set_physics_properties_request) return True except rospy.ServiceException as e: rospy.logerr(f"Service call failed: {e}") return False
[docs] def get_gazebo_max_update_rate(ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> float: """ Function to get the current maximum update rate for Gazebo. Args: ros_port (str): The ROS_MASTER_URI port (optional). gazebo_port (str): The GAZEBO_MASTER_URI port (optional). Returns: float: The current maximum update rate for Gazebo. """ # Change the ROS and Gazebo master environment variables if provided if ros_port is not None: ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port) # Get the current physics properties of Gazebo physics_properties = get_gazebo_physics_properties() # Return the current maximum update rate for Gazebo return physics_properties.max_update_rate
[docs] def set_gazebo_max_update_rate(real_time_factor: float, ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> bool: """ Function to set the maximum update rate for Gazebo in real-time factor. 1 is real time, n is n times the rate of real time. Args: real_time_factor (float): The desired real-time factor for Gazebo. ros_port (str): The ROS_MASTER_URI port (optional). gazebo_port (str): The GAZEBO_MASTER_URI port (optional). Returns: bool: True if the service call was successful, False otherwise. """ # Change the ROS and Gazebo master environment variables if provided if ros_port is not None: ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port) # Get the current physics properties of Gazebo physics_properties = get_gazebo_physics_properties() # Calculate the maximum update rate for Gazebo in real-time factor max_update_rate = ((1.0 / physics_properties.time_step) * real_time_factor) # Create a SetPhysicsPropertiesRequest object set_physics_properties_request = SetPhysicsPropertiesRequest() set_physics_properties_request.time_step = physics_properties.time_step set_physics_properties_request.max_update_rate = max_update_rate set_physics_properties_request.gravity = physics_properties.gravity set_physics_properties_request.ode_config = physics_properties.ode_config # Wait for the 'set_physics_properties' service to be available try: rospy.wait_for_service('/gazebo/set_physics_properties', timeout=30.0) except rospy.ROSException as e: rospy.logerr(f"Timeout (30s) waiting for service '/gazebo/set_physics_properties': {e}") return False try: # Create a service proxy for the 'set_physics_properties' service set_physics_properties = rospy.ServiceProxy('/gazebo/set_physics_properties', SetPhysicsProperties) # Call the 'set_physics_properties' service and set the maximum update rate for Gazebo in real-time factor set_physics_properties(set_physics_properties_request) return True except rospy.ServiceException as e: rospy.logerr(f"Service call failed: {e}") return False
[docs] def get_gazebo_time_step(ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> float: """ Function to get the current time step for Gazebo. Args: ros_port (str): The ROS_MASTER_URI port (optional). gazebo_port (str): The GAZEBO_MASTER_URI port (optional). Returns: float: The current time step for Gazebo. """ # Change the ROS and Gazebo master environment variables if provided if ros_port is not None: ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port) # Get the current physics properties of Gazebo physics_properties = get_gazebo_physics_properties() # Return the current time step for Gazebo return physics_properties.time_step
[docs] def set_gazebo_time_step(time_step: float, ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> bool: """ Function to set the time step for Gazebo. Args: time_step (float): The desired time step for Gazebo. ros_port (str): The ROS_MASTER_URI port (optional). gazebo_port (str): The GAZEBO_MASTER_URI port (optional). Returns: bool: True if the service call was successful, False otherwise. """ # Change the ROS and Gazebo master environment variables if provided if ros_port is not None: ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port) # Get the current physics properties of Gazebo physics_properties = get_gazebo_physics_properties() # Create a SetPhysicsPropertiesRequest object set_physics_properties_request = SetPhysicsPropertiesRequest() set_physics_properties_request.time_step = time_step set_physics_properties_request.max_update_rate = physics_properties.max_update_rate set_physics_properties_request.gravity = physics_properties.gravity set_physics_properties_request.ode_config = physics_properties.ode_config # Wait for the 'set_physics_properties' service to be available try: rospy.wait_for_service('/gazebo/set_physics_properties', timeout=30.0) except rospy.ROSException as e: rospy.logerr(f"Timeout (30s) waiting for service '/gazebo/set_physics_properties': {e}") return False try: # Create a service proxy for the 'set_physics_properties' service set_physics_properties = rospy.ServiceProxy('/gazebo/set_physics_properties', SetPhysicsProperties) # Call the 'set_physics_properties' service and set the time step for Gazebo set_physics_properties(set_physics_properties_request) return True except rospy.ServiceException as e: rospy.logerr(f"Service call failed: {e}") return False
[docs] def get_gazebo_gravity(ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> List[float]: """ Function to get the current gravity vector for Gazebo. Args: ros_port (str): The ROS_MASTER_URI port (optional). gazebo_port (str): The GAZEBO_MASTER_URI port (optional). Returns: list[float]: The current gravity vector for Gazebo. """ # Change the ROS and Gazebo master environment variables if provided if ros_port is not None: ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port) # Get the current physics properties of Gazebo physics_properties = get_gazebo_physics_properties() # Return the current gravity vector for Gazebo return physics_properties.gravity
[docs] def set_gazebo_gravity(gravity: List[float], ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> bool: """ Function to set the gravity vector for Gazebo. Args: gravity (List[float]): The desired gravity vector for Gazebo. ros_port (str): The ROS_MASTER_URI port (optional). gazebo_port (str): The GAZEBO_MASTER_URI port (optional). Returns: bool: True if the service call was successful, False otherwise. """ # Change the ROS and Gazebo master environment variables if provided if ros_port is not None: ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port) # Get the current physics properties of Gazebo physics_properties = get_gazebo_physics_properties() # Create a SetPhysicsPropertiesRequest object set_physics_properties_request = SetPhysicsPropertiesRequest() set_physics_properties_request.time_step = physics_properties.time_step set_physics_properties_request.max_update_rate = physics_properties.max_update_rate set_physics_properties_request.gravity = gravity set_physics_properties_request.ode_config = physics_properties.ode_config # Wait for the 'set_physics_properties' service to be available try: rospy.wait_for_service('/gazebo/set_physics_properties', timeout=30.0) except rospy.ROSException as e: rospy.logerr(f"Timeout (30s) waiting for service '/gazebo/set_physics_properties': {e}") return False try: # Create a service proxy for the 'set_physics_properties' service set_physics_properties = rospy.ServiceProxy('/gazebo/set_physics_properties', SetPhysicsProperties) # Call the 'set_physics_properties' service and set the gravity vector for Gazebo set_physics_properties(set_physics_properties_request) return True except rospy.ServiceException as e: rospy.logerr(f"Service call failed: {e}") return False
[docs] def get_gazebo_ode_physics(ros_port: Optional[str] = None, gazebo_port: Optional[str] = None): """ Function to get the current ODE physics properties of Gazebo. Args: ros_port (str): The ROS_MASTER_URI port (optional). gazebo_port (str): The GAZEBO_MASTER_URI port (optional). Returns: ode_config: The current ODE physics properties of Gazebo. """ # Change the ROS and Gazebo master environment variables if provided if ros_port is not None: ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port) # Get the current physics properties of Gazebo physics_properties = get_gazebo_physics_properties() return physics_properties.ode_config
[docs] def set_gazebo_ode_physics(ode_config: ODEPhysics, ros_port: Optional[str] = None, gazebo_port: Optional[str] = None) -> bool: """ Function to set the ODE physics properties of Gazebo. Args: ode_config (ODEPhysics): The desired ODE physics properties for Gazebo. ros_port (str): The ROS_MASTER_URI port (optional). gazebo_port (str): The GAZEBO_MASTER_URI port (optional). Returns: bool: True if the service call was successful, False otherwise. """ # Change the ROS and Gazebo master environment variables if provided if ros_port is not None: ros_common.change_ros_gazebo_master(ros_port=ros_port, gazebo_port=gazebo_port) # Get the current physics properties of Gazebo physics_properties = get_gazebo_physics_properties() # Create a SetPhysicsPropertiesRequest object set_physics_properties_request = SetPhysicsPropertiesRequest() set_physics_properties_request.time_step = physics_properties.time_step set_physics_properties_request.max_update_rate = physics_properties.max_update_rate set_physics_properties_request.gravity = physics_properties.gravity set_physics_properties_request.ode_config = ode_config # Wait for the 'set_physics_properties' service to be available try: rospy.wait_for_service('/gazebo/set_physics_properties', timeout=30.0) except rospy.ROSException as e: rospy.logerr(f"Timeout (30s) waiting for service '/gazebo/set_physics_properties': {e}") return False try: # Create a service proxy for the 'set_physics_properties' service set_physics_properties = rospy.ServiceProxy('/gazebo/set_physics_properties', SetPhysicsProperties) # Call the 'set_physics_properties' service and set the ODE physics properties for Gazebo set_physics_properties(set_physics_properties_request) return True except rospy.ServiceException as e: rospy.logerr(f"Service call failed: {e}") return False