Source code for uniros.utils.ros_controllers

#! /usr/bin/env python

"""
Functions for handling ROS controllers via the ``controller_manager``
services.

Functions provided:

- ``load_ros_controller`` — load a ROS controller.
- ``load_controller_list`` — load a list of controllers.
- ``list_loaded_controllers`` — list all loaded controllers.
- ``unload_ros_controller`` — unload a controller.
- ``unload_controller_list`` — unload a list of controllers.
- ``switch_controllers`` — switch between two sets of controllers.
- ``start_controllers`` — start a list of controllers.
- ``stop_controllers`` — stop a list of controllers.
- ``reset_controllers`` — stop then re-start a list of controllers.
- ``spawn_controllers`` — load then start a list of controllers.
- ``unspawn_controllers`` — stop then unload a list of controllers.

Every helper wraps its underlying service call with a 30-second
``rospy.wait_for_service`` timeout so a hung ``controller_manager``
surfaces as a logged failure rather than an indefinite hang.

ROS service reference:
http://docs.ros.org/en/noetic/api/controller_manager_msgs/html/index-msg.html
"""

from typing import List, Optional

import rospy
from controller_manager_msgs.srv import *

"""
    01. load_ros_controller: To load a ROS controller.
"""


[docs] def load_ros_controller(controller_name: str, ns: Optional[str] = None, max_retries: int = 5) -> bool: """ Function to load a ROS controller using the controller_manager_msgs/LoadController service. (Need to have this controller loaded in the parameter server) Args: controller_name (str): The name of the controller to load. ns (str): The namespace of the controller_manager service (optional). max_retries (int): The maximum number of times to retry loading the controller (optional). Returns: bool: True if the controller was successfully loaded, False otherwise. """ # Create a service proxy for the LoadController service if ns is not None: # Remove any trailing slashes from ns ns = ns.rstrip('/') load_controller_service = f'{ns}/controller_manager/load_controller' else: load_controller_service = '/controller_manager/load_controller' # wait for service to be available try: rospy.wait_for_service(load_controller_service, timeout=30.0) except rospy.ROSException as e: rospy.logerr(f"Timeout (30s) waiting for service '{load_controller_service}': {e}") return False # create proxy to the service load_controller = rospy.ServiceProxy(load_controller_service, LoadController) # Attempt to load the controller for i in range(max_retries): try: response = load_controller(name=controller_name) if response.ok: return True except rospy.ServiceException as e: rospy.logerr(f'Try {i}: Failed to call service {load_controller_service}: {e}') # Failed to load the controller after max_retries attempts return False
""" 02. load_controller_list: To load a list of ROS controllers. """
[docs] def load_controller_list(controller_list: List[str], ns: Optional[str] = None, max_retries: int = 5) -> bool: """ Function to load a list of ROS controllers. Args: controller_list (list): A list of controller names to load. ns (str): The namespace of the controller_manager service (optional). max_retries (int): The maximum number of times to retry loading each controller (optional). Returns: bool: True if all controllers were successfully loaded, False otherwise. """ # Attempt to load each controller in the list for controller_name in controller_list: if not load_ros_controller(controller_name, ns, max_retries): return False # All controllers were successfully loaded return True
""" 03. list_loaded_controllers: To list all loaded ROS controllers. """
[docs] def list_loaded_controllers(ns: Optional[str] = None, max_retries: int = 5) -> List[str]: """ Function to list all loaded ROS controllers using the controller_manager_msgs/ListControllers service. Args: ns (str): The namespace of the controller_manager service (optional). max_retries (int): The maximum number of times to retry calling the service (optional). Returns: list: A list of loaded controller names. If an error occurred while calling the service, an empty list is returned. """ # Create a service proxy for the ListControllers service if ns is not None: # Remove any trailing slashes from ns ns = ns.rstrip('/') list_controllers_service = f'{ns}/controller_manager/list_controllers' else: list_controllers_service = '/controller_manager/list_controllers' # wait for service to be available try: rospy.wait_for_service(list_controllers_service, timeout=30.0) except rospy.ROSException as e: rospy.logerr(f"Timeout (30s) waiting for service '{list_controllers_service}': {e}") return [] # create proxy to the service list_controllers = rospy.ServiceProxy(list_controllers_service, ListControllers) # Attempt to call the ListControllers service for i in range(max_retries): try: response = list_controllers() return [controller.name for controller in response.controller] except rospy.ServiceException as e: rospy.logerr(f'Try {i}: Failed to call service {list_controllers_service}: {e}') # Failed to call the ListControllers service after max_retries attempts return []
""" 04. unload_ros_controller: To unload a ROS controller. """
[docs] def unload_ros_controller(controller_name: str, ns: Optional[str] = None, max_retries: int = 5) -> bool: """ Function to unload a ROS controller using the controller_manager_msgs/UnloadController service. Args: controller_name (str): The name of the controller to unload. ns (str): The namespace of the controller_manager service (optional). max_retries (int): The maximum number of times to retry unloading the controller (optional). Returns: bool: True if the controller was successfully unloaded, False otherwise. """ # Create a service proxy for the UnloadController service if ns is not None: # Remove any trailing slashes from ns ns = ns.rstrip('/') unload_controller_service = f'{ns}/controller_manager/unload_controller' else: unload_controller_service = '/controller_manager/unload_controller' # wait for service to be available try: rospy.wait_for_service(unload_controller_service, timeout=30.0) except rospy.ROSException as e: rospy.logerr(f"Timeout (30s) waiting for service '{unload_controller_service}': {e}") return False # create proxy to the service unload_controller = rospy.ServiceProxy(unload_controller_service, UnloadController) # Attempt to unload the controller for i in range(max_retries): try: response = unload_controller(name=controller_name) if response.ok: return True except rospy.ServiceException as e: rospy.logerr(f'Try {i}: Failed to call service {unload_controller_service}: {e}') # Failed to unload the controller after max_retries attempts return False
""" 05. unload_controller_list: To unload a list of ROS controllers. """
[docs] def unload_controller_list(controller_list: List[str], ns: Optional[str] = None, max_retries: int = 5) -> bool: """ Function to unload a list of ROS controllers using the unload_ros_controller function. Args: controller_list (list): A list of controller names to unload. ns (str): The namespace of the controller_manager service (optional). max_retries (int): The maximum number of times to retry unloading each controller (optional). Returns: bool: True if all controllers were successfully unloaded, False otherwise. """ # Attempt to unload each controller in the list for controller_name in controller_list: if not unload_ros_controller(controller_name, ns, max_retries): return False # All controllers were successfully unloaded return True
""" 06. switch_controllers: To switch between two sets of ROS controllers. """
[docs] def switch_controllers(start_controllers_list: List[str], stop_controllers_list: List[str], ns: Optional[str] = None, strictness: int = 1, start_asap: bool = False, timeout: float = 0.0, max_retries: int = 5) -> bool: """ Function to switch between two sets of ROS controllers using the controller_manager_msgs/SwitchController service. Args: start_controllers_list (list): A list of controller names to start. stop_controllers_list (list): A list of controller names to stop. ns (str): The namespace of the controller_manager service (optional). strictness (int): The level of strictness to use when switching controllers (optional). 1 = BEST_EFFORT, 2 = STRICT. start_asap (bool): Whether to start controllers in the start_controllers list as soon as possible (optional). timeout (float): The maximum amount of time to wait for the controller switch to complete (optional). max_retries (int): The maximum number of times to retry switching controllers (optional). Returns: bool: True if the controllers were successfully switched, False otherwise. """ # Create a service proxy for the SwitchController service if ns is not None: # Remove any trailing slashes from ns ns = ns.rstrip('/') switch_controller_service = f'{ns}/controller_manager/switch_controller' else: switch_controller_service = '/controller_manager/switch_controller' try: rospy.wait_for_service(switch_controller_service, timeout=30.0) except rospy.ROSException as e: rospy.logerr(f"Timeout (30s) waiting for service '{switch_controller_service}': {e}") return False switch_controller = rospy.ServiceProxy(switch_controller_service, SwitchController) # Attempt to switch controllers for i in range(max_retries): try: response = switch_controller(start_controllers=start_controllers_list, stop_controllers=stop_controllers_list, strictness=strictness, start_asap=start_asap, timeout=timeout) if response.ok: return True except rospy.ServiceException as e: rospy.logerr(f'ry {i}: Failed to call service {switch_controller_service}: {e}') # Failed to switch controllers after max_retries attempts return False
""" 07. start_controllers: To start a list of ROS controllers. """
[docs] def start_controllers(controller_list: List[str], ns: Optional[str] = None, strictness: int = 1, start_asap: bool = False, timeout: float = 0.0, max_retries: int = 5) -> bool: """ Function to start a list of ROS controllers using the switch_controllers function. Args: controller_list (list): A list of controller names to start. ns (str): The namespace of the controller_manager service (optional). strictness (int): The level of strictness to use when starting controllers (optional). 1 = BEST_EFFORT, 2 = STRICT. start_asap (bool): Whether to start controllers in the controller_list as soon as possible (optional). timeout (float): The maximum amount of time to wait for the controllers to start (optional). max_retries (int): The maximum number of times to retry starting controllers (optional). Returns: bool: True if all controllers were successfully started, False otherwise. """ return switch_controllers(controller_list, [], ns, strictness, start_asap, timeout, max_retries)
""" 08. stop_controllers: To stop a list of ROS controllers. """
[docs] def stop_controllers(controller_list: List[str], ns: Optional[str] = None, strictness: int = 1, start_asap: bool = False, timeout: float = 0.0, max_retries: int = 5) -> bool: """ Function to stop a list of ROS controllers using the switch_controllers function. Args: controller_list (list): A list of controller names to stop. ns (str): The namespace of the controller_manager service (optional). strictness (int): The level of strictness to use when stopping controllers (optional). 1 = BEST_EFFORT, 2 = STRICT. start_asap (bool): Whether to stop controllers in the controller_list as soon as possible (optional). timeout (float): The maximum amount of time to wait for the controllers to stop (optional). max_retries (int): The maximum number of times to retry stopping controllers (optional). Returns: bool: True if all controllers were successfully stopped, False otherwise. """ return switch_controllers([], controller_list, ns, strictness, start_asap, timeout, max_retries)
""" 09. reset_controllers: To "reset" a list of ROS controllers by stopping and then starting them again. """
[docs] def reset_controllers(controller_list: List[str], ns: Optional[str] = None, strictness: int = 1, start_asap: bool = False, timeout: float = 0.0, max_retries: int = 5) -> bool: """ Function to "reset" a list of ROS controllers by stopping and then starting them again. Args: controller_list (list): A list of controller names to reset. ns (str): The namespace of the controller_manager service (optional). strictness (int): The level of strictness to use when resetting controllers (optional). 1 = BEST_EFFORT, 2 = STRICT. start_asap (bool): Whether to start controllers in the controller_list as soon as possible (optional). timeout (float): The maximum amount of time to wait for the controllers to reset (optional). max_retries (int): The maximum number of times to retry resetting controllers (optional). Returns: bool: True if all controllers were successfully reset, False otherwise. """ # Stop the controllers if not stop_controllers(controller_list, ns, strictness, start_asap, timeout, max_retries): return False # Start the controllers if not start_controllers(controller_list, ns, strictness, start_asap, timeout, max_retries): return False # All controllers were successfully reset return True
""" 10. spawn_controllers: To spawn a list of ROS controllers by loading and then starting them. """
[docs] def spawn_controllers(controller_list: List[str], ns: Optional[str] = None, strictness: int = 1, start_asap: bool = False, timeout: float = 0.0, max_retries: int = 5) -> bool: """ Function to spawn a list of ROS controllers by loading and then starting them. Args: controller_list (list): A list of controller names to spawn. ns (str): The namespace of the controller_manager service (optional). strictness (int): The level of strictness to use when spawning controllers (optional). 1 = BEST_EFFORT, 2 = STRICT. start_asap (bool): Whether to start controllers in the controller_list as soon as possible (optional). timeout (float): The maximum amount of time to wait for the controllers to spawn (optional). max_retries (int): The maximum number of times to retry spawning controllers (optional). Returns: bool: True if all controllers were successfully spawned, False otherwise. """ # Load the controllers if not load_controller_list(controller_list, ns, max_retries): return False # Start the controllers if not start_controllers(controller_list, ns, strictness, start_asap, timeout, max_retries): return False # All controllers were successfully spawned return True
""" 11. unspawn_controllers: To unspawn a list of ROS controllers by stopping and then unloading them. """
[docs] def unspawn_controllers(controller_list: List[str], ns: Optional[str] = None, strictness: int = 1, start_asap: bool = False, timeout: float = 0.0, max_retries: int = 5) -> bool: """ Function to unspawn a list of ROS controllers by stopping and then unloading them. Args: controller_list (list): A list of controller names to unspawn. ns (str): The namespace of the controller_manager service (optional). strictness (int): The level of strictness to use when unspawning controllers (optional). 1 = BEST_EFFORT, 2 = STRICT. start_asap (bool): Whether to stop controllers in the controller_list as soon as possible (optional). timeout (float): The maximum amount of time to wait for the controllers to unspawn (optional). max_retries (int): The maximum number of times to retry unspawning controllers (optional). Returns: bool: True if all controllers were successfully unspawned, False otherwise. """ # Stop the controllers if not stop_controllers(controller_list, ns, strictness, start_asap, timeout, max_retries): return False # Unload the controllers if not unload_controller_list(controller_list, ns, max_retries): return False # All controllers were successfully unspawned return True