#!/bin/python3
"""
Canonical gym-proxy implementation for the UniROS ecosystem.
This is the single source of truth for the multiprocessing
gym-proxy class used by multiros, realros, and uniros. Each of
those packages re-exports it under a historical name
(``MultirosGym``, ``RealrosGym``, ``uniros_gym``) so existing user
code continues to work — every alias points at the class defined
here, so a fix landed here automatically reaches all three
packages.
The proxy spawns a worker process that holds the actual ``gym.Env``.
Parent-side method calls (``step`` / ``reset`` / ``close`` /
attribute access) flow over a ``multiprocessing.Pipe`` to the worker
and back. Worker-side exceptions are caught and shipped back as
:class:`_RemoteException`, so the parent re-raises with the
worker's traceback instead of hanging on the next ``recv()``.
"""
import traceback
from multiprocessing.connection import Connection
from typing import Any, Mapping, NoReturn, Optional, Tuple
import gymnasium as gym
from multiprocessing import Process, Pipe
class _RemoteException:
    """
    String-only record of an exception raised in the worker process.

    The worker builds one of these from the exception it caught plus the
    formatted traceback and sends it through the pipe in place of a normal
    reply. The receiving side calls :meth:`reraise` to surface the failure
    as a ``RuntimeError`` that embeds the worker-side traceback, rather
    than waiting on a reply that will never arrive.

    Only strings are stored (type name, ``repr`` and traceback text), not
    the exception object itself, because some user-defined exception
    classes do not pickle cleanly across process boundaries.
    """

    __slots__ = ('exc_type_name', 'exc_repr', 'tb_string')

    def __init__(self, exc: BaseException, tb_string: str) -> None:
        """
        Capture ``exc`` as plain strings.

        Args:
            exc: The exception instance that was caught.
            tb_string: The already-formatted traceback text for ``exc``.
        """
        # Flatten to text immediately so nothing but ``str`` objects ever
        # needs to cross the pipe.
        self.tb_string = tb_string
        self.exc_repr = repr(exc)
        self.exc_type_name = type(exc).__name__

    def reraise(self) -> NoReturn:
        """
        Raise a ``RuntimeError`` describing the captured failure.

        Raises:
            RuntimeError: Always; the message carries the original type
                name, the original ``repr`` and the stored traceback.
        """
        message = (
            f"Exception in GymProxy worker process "
            f"({self.exc_type_name}: {self.exc_repr}):\n{self.tb_string}"
        )
        raise RuntimeError(message)
class GymProxy:
    """
    Parent-side proxy for a ``gym.Env`` running inside a worker process.

    Equivalent aliases re-exported by other packages in the ecosystem:

    - ``multiros.core.MultirosGym``
    - ``realros.core.RealrosGym``
    - ``uniros.core.uniros_gym``

    All four names refer to the same class object.

    Usage::

        from uniros.core import uniros_gym as gym  # or any alias
        env = gym.make("env_name", **kwargs)
        env.reset()

    Wire protocol (parent -> worker): ``(command, payload)`` tuples where
    ``command`` is one of ``'step'``, ``'reset'``, ``'close'``,
    ``'get_attribute'`` or ``'call_method'``. Every command except
    ``'close'`` gets exactly one reply: the result, or a
    ``_RemoteException`` when the worker-side call raised.
    """

    # Attributes that live on the proxy itself. ``__getattr__`` only runs
    # when normal lookup fails, so seeing one of these names there means the
    # instance is only partially constructed (``__new__`` without
    # ``__init__``, copy/unpickle, or ``__init__`` raising early). They must
    # never be forwarded: forwarding needs ``self.parent_conn``, which would
    # re-enter ``__getattr__`` and recurse until ``RecursionError``.
    _PROXY_ATTRS = frozenset({
        'parent_conn',
        'child_conn',
        'process',
        '_closed',
        'observation_space',
        'action_space',
    })

    def __init__(self, env_name: str, *args: Any, **kwargs: Any) -> None:
        """
        Create the pipe and start the worker process.

        The spaces are not populated here; :meth:`make` receives them from
        the worker once ``gym.make`` has completed.

        Args:
            env_name: Environment id handed to ``gym.make`` in the worker.
            *args: Extra positional arguments forwarded to ``gym.make``.
            **kwargs: Extra keyword arguments forwarded to ``gym.make``.
        """
        # Set _closed first so close() / __del__ can be idempotent even if
        # the rest of __init__ raises before completing.
        self._closed: bool = False
        # Filled in by make() from the worker's first message.
        self.observation_space: Optional[gym.Space] = None
        self.action_space: Optional[gym.Space] = None
        # Duplex pipe: parent_conn stays here, child_conn goes to the worker.
        self.parent_conn, self.child_conn = Pipe()
        self.process = Process(
            target=self.worker,
            args=(env_name, self.child_conn, *args),
            kwargs=kwargs,
        )
        try:
            self.process.start()
        except BaseException:
            # No worker exists, so there is nothing to reap later; release
            # both pipe ends now and mark the proxy closed so __del__ does
            # not try to talk to (or join) a process that never started.
            self._closed = True
            self.parent_conn.close()
            self.child_conn.close()
            raise
        # The worker owns its end of the pipe from here on. Drop the
        # parent's copy: while the parent keeps it open, recv() on
        # parent_conn can never observe EOF, so a worker that dies without
        # replying would leave the parent blocked forever.
        self.child_conn.close()

    @staticmethod
    def worker(env_name: str, conn: Connection, *args: Any, **kwargs: Any) -> None:
        """
        Entry point of the worker process: build the env and serve commands.

        Args:
            env_name: Environment id passed to ``gym.make``.
            conn: Worker end of the pipe.
            *args: Extra positional arguments for ``gym.make``.
            **kwargs: Extra keyword arguments for ``gym.make``.
        """
        # --- Startup phase --------------------------------------------------
        # If gym.make() raises, the parent is blocked on recv() in make().
        # Send a _RemoteException so the parent can re-raise it and close,
        # rather than hanging forever.
        try:
            env = gym.make(env_name, *args, **kwargs)
            conn.send((env.observation_space, env.action_space))
        except Exception as e:
            try:
                conn.send(_RemoteException(e, traceback.format_exc()))
            except (BrokenPipeError, OSError):
                pass
            try:
                conn.close()
            except Exception:
                pass
            return

        # --- Command loop ---------------------------------------------------
        # Every command is wrapped: any user-side raise (bad action, NaN
        # observation, controller_manager timeout, MoveIt crash, etc.)
        # surfaces to the parent as a _RemoteException instead of killing
        # the worker silently.
        while True:
            try:
                cmd, data = conn.recv()
            except (EOFError, BrokenPipeError, ConnectionResetError):
                # Parent has gone away; exit cleanly.
                return
            try:
                # `result` carries env-dependent types between branches:
                # tuples from step/reset, arbitrary values from getattr,
                # AttributeError-as-value from "attr not found", etc.
                # Explicit Any so mypy doesn't lock it to the first branch.
                result: Any
                if cmd == 'step':
                    result = env.step(data)
                elif cmd == 'reset':
                    seed, options = data
                    result = env.reset(seed=seed, options=options)
                elif cmd == 'close':
                    # 'close' is the only command without a reply; the
                    # parent waits on process exit instead.
                    env.close()
                    try:
                        conn.close()
                    except Exception:
                        pass
                    return
                elif cmd == 'get_attribute':
                    # AttributeError is a normal API response here ("attr
                    # not found"); pass it through as a value so the
                    # parent's __getattr__ can re-raise it client-side.
                    # Any OTHER exception still flows through the outer
                    # except and becomes a _RemoteException.
                    try:
                        attr = getattr(env, data)
                    except AttributeError:
                        result = AttributeError(f"{data} not found")
                    else:
                        # Callables are not sent; the parent builds a stub
                        # that issues 'call_method' when invoked.
                        result = 'callable' if callable(attr) else attr
                elif cmd == 'call_method':
                    method_name, m_args, m_kwargs = data
                    result = getattr(env, method_name)(*m_args, **m_kwargs)
                else:
                    raise ValueError(f"Unknown command from parent: {cmd!r}")
                conn.send(result)
            except Exception as e:
                try:
                    conn.send(_RemoteException(e, traceback.format_exc()))
                except (BrokenPipeError, OSError):
                    # Cannot even report the failure: the pipe is gone.
                    return

    @property
    def unwrapped(self) -> "GymProxy":
        """
        Return self as the unwrapped env.

        The proxy IS the user-visible env from the parent's perspective;
        whatever lives in the worker process is opaque and not accessible
        across the pipe (gym.Env instances typically hold non-picklable
        state such as _thread.RLock from rospy's spinner threads, so
        sending them through the pipe raises TypeError).

        Without this handler, env.unwrapped would route through
        __getattr__, IPC to the worker, and fail to pickle. SB3's
        DummyVecEnv calls `id(env.unwrapped)` for uniqueness checking
        during construction, so this property is required for SB3
        compatibility.
        """
        return self

    def _reap_worker(self, grace: float) -> None:
        """
        Wait for the worker to exit, escalating to SIGTERM and then SIGKILL.

        Shared by :meth:`close` and :meth:`_recv` so that both teardown
        paths end with the worker actually gone and the parent's pipe end
        released.

        Args:
            grace: Seconds to wait for a voluntary exit before escalating.
        """
        proc = self.process
        # Bounded join so a hung worker can't lock the parent indefinitely.
        proc.join(timeout=grace)
        if proc.is_alive():
            proc.terminate()
            proc.join(timeout=1.0)
        if proc.is_alive():
            # SIGTERM didn't take. Typical when the worker is blocked in a
            # non-responsive C call or a background thread keeps spinning
            # after a failed gym.make. Without SIGKILL the subprocess can
            # outlive the parent and keep burning CPU; this is the
            # last-resort cleanup.
            proc.kill()
            proc.join(timeout=1.0)
        # The conversation is over either way; release our pipe end.
        try:
            self.parent_conn.close()
        except OSError:
            pass

    def _recv(self) -> Any:
        """
        Receive a message from the worker, re-raising any remote failure.

        Every parent-side recv() goes through this so a worker-side raise
        becomes a parent-side RuntimeError (with the worker traceback)
        rather than a silent hang on the next recv().

        Returns:
            The worker's reply.

        Raises:
            RuntimeError: If the worker replied with a ``_RemoteException``
                or exited without replying. In both cases the proxy is
                marked closed and the worker is reaped before raising.
        """
        try:
            msg = self.parent_conn.recv()
        except (EOFError, ConnectionResetError) as exc:
            # The worker's end of the pipe is gone and nothing is left to
            # read: it died (or was killed) without sending a reply.
            self._closed = True
            self._reap_worker(grace=2.0)
            raise RuntimeError(
                "GymProxy worker process exited without sending a reply "
                f"(exit code: {self.process.exitcode})"
            ) from exc
        if isinstance(msg, _RemoteException):
            # Tear down our side so the caller doesn't have to remember to
            # call close() after the exception. close() becomes a no-op
            # once _closed is set, so the full join/terminate/kill
            # escalation has to happen here.
            self._closed = True
            self._reap_worker(grace=2.0)
            msg.reraise()
        return msg

    @classmethod
    def make(cls, env_name: str, *args: Any, **kwargs: Any) -> "GymProxy":
        """
        Start a worker for ``env_name`` and return the ready proxy.

        Args:
            env_name: Environment id handed to ``gym.make`` in the worker.
            *args: Extra positional arguments forwarded to ``gym.make``.
            **kwargs: Extra keyword arguments forwarded to ``gym.make``.

        Returns:
            A proxy whose ``observation_space`` and ``action_space`` have
            been received from the worker.

        Raises:
            RuntimeError: If the worker failed while creating the env.
        """
        env = cls(env_name, *args, **kwargs)
        # The worker's first message is the pair of spaces. If it raised
        # during gym.make(), _recv() re-raises here.
        env.observation_space, env.action_space = env._recv()
        return env

    def step(self, action: Any) -> Tuple[Any, float, bool, bool, Mapping[str, Any]]:
        """Send ``action`` to the worker's ``env.step`` and return its result."""
        self.parent_conn.send(('step', action))
        return self._recv()

    def reset(self, seed: Optional[int] = None,
              options: Optional[Mapping[str, Any]] = None,
              ) -> Tuple[Any, Mapping[str, Any]]:
        """Call the worker's ``env.reset(seed=..., options=...)`` and return its result."""
        self.parent_conn.send(('reset', (seed, options)))
        return self._recv()

    def close(self) -> None:
        """
        Ask the worker to close the env, then make sure the process is gone.

        Idempotent: safe to call multiple times and from ``__del__``.
        """
        if self._closed:
            return
        self._closed = True
        try:
            self.parent_conn.send(('close', None))
        except (BrokenPipeError, OSError, EOFError):
            # Worker already exited or pipe already torn down.
            pass
        self._reap_worker(grace=5.0)

    def __enter__(self) -> "GymProxy":
        return self

    def __exit__(self, exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Optional[Any]) -> None:
        # Always close, even on exception. close() is idempotent so a later
        # __del__ won't double-tear-down the worker. Propagate the exception
        # by returning None (truthy return would swallow it).
        self.close()

    def __getattr__(self, name: str) -> Any:
        """
        Resolve attributes the proxy does not define by asking the worker.

        Args:
            name: Attribute name that normal lookup failed to find.

        Returns:
            The attribute's value, or, when the worker-side attribute is
            callable, a local stub that forwards calls to the worker.

        Raises:
            AttributeError: If the worker's env has no such attribute, or
                if ``name`` is one of the proxy's own attributes.
            RuntimeError: If the worker-side lookup raised anything else.
        """
        if name in self._PROXY_ATTRS:
            # Proxy-owned state that has not been set yet; answering
            # locally is what prevents infinite recursion through
            # self.parent_conn below.
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )
        self.parent_conn.send(('get_attribute', name))
        # _recv() unwraps any _RemoteException; a normal AttributeError
        # (attr not found) still flows through as a value and is re-raised
        # below for backwards compatibility.
        attr = self._recv()
        if isinstance(attr, str) and attr == 'callable':
            # The worker reports callables by this marker string instead of
            # sending them; build a stub that forwards the call.
            def method(*args: Any, **kwargs: Any) -> Any:
                self.parent_conn.send(('call_method', (name, args, kwargs)))
                return self._recv()
            return method
        if isinstance(attr, Exception):
            # e.g. AttributeError for "attr not found": raise client-side.
            raise attr
        return attr

    def __del__(self) -> None:
        # Destructors must never raise. Swallow everything; close() already
        # absorbs the common cases (pipe closed, process gone) on its own.
        try:
            self.close()
        except Exception:
            pass