diff --git a/src/rai_core/rai/communication/ros2/connectors/base.py b/src/rai_core/rai/communication/ros2/connectors/base.py index 4f0accec1..d23f65ee3 100644 --- a/src/rai_core/rai/communication/ros2/connectors/base.py +++ b/src/rai_core/rai/communication/ros2/connectors/base.py @@ -58,6 +58,21 @@ class ROS2BaseConnector(ROS2ActionMixin, ROS2ServiceMixin, BaseConnector[T]): Type of executor to use for processing ROS2 callbacks, by default "multi_threaded". use_sim_time : bool, optional Whether to use simulation time or system time, by default False. + context : int or rclpy.Context or None, optional + Controls which ROS2 context (and therefore which DDS domain) this + connector participates in. Three forms are accepted: + + - ``None`` (default): use the process-global default context. + ``rclpy.init()`` must have been called before constructing the + connector, or the connector will auto-initialize and warn. + - ``int``: a domain ID shorthand. The connector creates and owns a + fresh ``rclpy.Context``, initialises it with the given domain ID, + and shuts it down when ``connector.shutdown()`` is called. Use + this to run multiple isolated connectors in the same process + (e.g. parallel simulation instances on different domains). + - ``rclpy.Context``: an already-initialised context supplied by the + caller. The connector uses it as-is and does **not** shut it down + — the caller retains ownership of the context lifecycle. Methods ------- @@ -96,6 +111,17 @@ class ROS2BaseConnector(ROS2ActionMixin, ROS2ServiceMixin, BaseConnector[T]): - False (default): Subscribers remain active after message reception - Pros: More stable operation, avoids potential crashes - Cons: May lead to memory/performance overhead from inactive subscribers + + Context Lifecycle: + Ownership of the underlying ``rclpy.Context`` follows a simple rule: + *the party that created it is responsible for shutting it down.* + + - ``context=None``: the global context is managed externally + (e.g. via ``ROS2Context``) — ``shutdown()`` does not touch it. + - ``context=``: the connector created the context, so + ``shutdown()`` calls ``rclpy.shutdown(context=...)``. + - ``context=``: the caller created the context, so + ``shutdown()`` leaves it alone. """ def __init__( @@ -104,6 +130,7 @@ def __init__( destroy_subscribers: bool = False, executor_type: Literal["single_threaded", "multi_threaded"] = "multi_threaded", use_sim_time: bool = False, + context: Optional[int | rclpy.Context] = None, ): """Initialize the ROS2BaseConnector. @@ -115,6 +142,22 @@ def __init__( Whether to destroy subscribers after receiving a message, by default False. executor_type : Literal["single_threaded", "multi_threaded"], optional Type of executor to use for processing ROS2 callbacks, by default "multi_threaded". + use_sim_time : bool, optional + Whether to use simulation time or system time, by default False. + context : int or rclpy.Context or None, optional + Controls which ROS2 context this connector uses. + + - ``None`` (default): uses the process-global default context. + Auto-initializes via ``rclpy.init()`` with a warning if not + already initialized. + - ``int``: domain ID shorthand — the connector creates a new + ``rclpy.Context``, calls ``rclpy.init(context=..., + domain_id=)``, and **owns** the context (shuts it down + on ``shutdown()``). Safe to use alongside other connectors + on different domain IDs in the same process. + - ``rclpy.Context``: an already-initialized context. The + connector uses it directly and does **not** call + ``rclpy.shutdown`` on it — the caller owns the lifecycle. Raises ------ @@ -124,14 +167,26 @@ def __init__( super().__init__() if node_name is None: node_name = f"rai_ros2_connector_{str(uuid.uuid4())[-12:]}" - if not rclpy.ok(): - rclpy.init() - self.logger.warning( - "Auto-initializing ROS2, but manual initialization is recommended. " - "For better control and predictability, call rclpy.init() or ROS2Context before creating this connector." - ) + + if isinstance(context, int): + self._rclpy_context: Optional[rclpy.Context] = rclpy.Context() + rclpy.init(context=self._rclpy_context, domain_id=context) + self._owns_context = True + elif isinstance(context, rclpy.Context): + self._rclpy_context = context + self._owns_context = False + else: + self._rclpy_context = None + self._owns_context = False + if not rclpy.ok(): + rclpy.init() + self.logger.warning( + "Auto-initializing ROS2, but manual initialization is recommended. " + "For better control and predictability, call rclpy.init() or ROS2Context before creating this connector." + ) + self._executor_type = executor_type - self._node = Node(node_name) + self._node = Node(node_name, context=self._rclpy_context) if use_sim_time: self._node.set_parameters( [Parameter("use_sim_time", Parameter.Type.BOOL, True)] @@ -152,9 +207,9 @@ def __init__( "SingleThreadedExecutor", } if self._executor_type == "multi_threaded": - self._executor = MultiThreadedExecutor() + self._executor = MultiThreadedExecutor(context=self._rclpy_context) elif self._executor_type == "single_threaded": - self._executor = SingleThreadedExecutor() + self._executor = SingleThreadedExecutor(context=self._rclpy_context) else: raise ValueError(f"Invalid executor type: {self._executor_type}") @@ -570,3 +625,5 @@ def shutdown(self): self._topic_api.shutdown() self._executor.shutdown() self._thread.join() + if self._owns_context and self._rclpy_context is not None: + rclpy.shutdown(context=self._rclpy_context) diff --git a/src/rai_sim/rai_sim/launch_manager.py b/src/rai_sim/rai_sim/launch_manager.py index abee90bd2..123ba0168 100644 --- a/src/rai_sim/rai_sim/launch_manager.py +++ b/src/rai_sim/rai_sim/launch_manager.py @@ -14,6 +14,7 @@ import asyncio import multiprocessing +import os from multiprocessing.synchronize import Event from typing import Optional @@ -25,11 +26,15 @@ def __init__(self) -> None: self._stop_event: Optional[Event] = None self._process: Optional[multiprocessing.Process] = None - def start(self, launch_description: LaunchDescription) -> None: + def start( + self, + launch_description: LaunchDescription, + domain_id: Optional[int] = None, + ) -> None: self._stop_event = multiprocessing.Event() self._process = multiprocessing.Process( target=self._run_process, - args=(self._stop_event, launch_description), + args=(self._stop_event, launch_description, domain_id), daemon=True, ) self._process.start() @@ -41,8 +46,13 @@ def shutdown(self) -> None: self._process.join() def _run_process( - self, stop_event: Event, launch_description: LaunchDescription + self, + stop_event: Event, + launch_description: LaunchDescription, + domain_id: Optional[int] = None, ) -> None: + if domain_id is not None: + os.environ["ROS_DOMAIN_ID"] = str(domain_id) loop = asyncio.get_event_loop() asyncio.set_event_loop(loop) launch_service = LaunchService() diff --git a/src/rai_sim/rai_sim/o3de/o3de_bridge.py b/src/rai_sim/rai_sim/o3de/o3de_bridge.py index 0de315941..de53509cd 100644 --- a/src/rai_sim/rai_sim/o3de/o3de_bridge.py +++ b/src/rai_sim/rai_sim/o3de/o3de_bridge.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +import os import signal import subprocess import time @@ -51,6 +52,7 @@ class O3DExROS2SimulationConfig(SimulationConfig): binary_path: Path level: Optional[str] = None + domain_id: Optional[int] = None required_simulation_ros2_interfaces: dict[str, List[str]] required_robotic_ros2_interfaces: dict[str, List[str]] @@ -393,8 +395,12 @@ def _launch_binary( if simulation_config.level: command.append(f"+LoadLevel {simulation_config.level}") self.logger.info(f"Running command: {command}") + env = {**os.environ} + if simulation_config.domain_id is not None: + env["ROS_DOMAIN_ID"] = str(simulation_config.domain_id) self.current_sim_process = subprocess.Popen( command, + env=env, ) if not self._has_process_started(process=self.current_sim_process): raise RuntimeError("Process did not start in time.") @@ -407,8 +413,9 @@ def launch_robotic_stack( self, required_robotic_ros2_interfaces: dict[str, List[str]], launch_description: LaunchDescription, + domain_id: Optional[int] = None, ): - self.manager.start(launch_description=launch_description) + self.manager.start(launch_description=launch_description, domain_id=domain_id) if not self._is_ros2_stack_ready( required_ros2_stack=required_robotic_ros2_interfaces diff --git a/tests/communication/ros2/test_connector_context.py b/tests/communication/ros2/test_connector_context.py new file mode 100644 index 000000000..eda39f9e0 --- /dev/null +++ b/tests/communication/ros2/test_connector_context.py @@ -0,0 +1,308 @@ +# Copyright (C) 2025 Robotec.AI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for the ``context`` parameter of ROS2BaseConnector / ROS2Connector. + +The parameter accepts three forms: + +- ``None`` – use the process-global default context (existing behaviour). +- ``int`` – domain-ID shorthand; connector creates and owns a fresh + ``rclpy.Context`` for that domain. +- ``rclpy.Context`` – caller-supplied, already-initialised context; connector + does not manage its lifecycle. + +Each form is tested for: + * correct internal state (``_rclpy_context``, ``_owns_context``) + * correct wiring of the Node and executor to the resolved context + * correct lifecycle behaviour on ``connector.shutdown()`` + * isolation / coexistence between multiple connectors +""" + +from typing import Generator +from unittest.mock import MagicMock, patch + +import pytest +import rclpy +import rclpy.context +from rai.communication.ros2 import ROS2Connector + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="module") +def ros_setup() -> Generator[None, None, None]: + """Module-scoped global rclpy initialisation for context=None tests.""" + rclpy.init() + yield + rclpy.shutdown() + + +_ = ros_setup # keep linters quiet about "unused import" + + +@pytest.fixture +def external_context() -> Generator[rclpy.Context, None, None]: + """Provides a function-scoped, already-initialised rclpy.Context on domain 60. + + The connector under test must NOT shut this down — the fixture owns it. + """ + ctx = rclpy.Context() + rclpy.init(context=ctx, domain_id=60) + yield ctx + if ctx.ok(): + rclpy.shutdown(context=ctx) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +# Patches applied when testing the auto-init code path. We replace all ROS2 +# infrastructure with mocks so the test does not need a live ROS2 environment. +_INFRA_PATCHES = [ + "rai.communication.ros2.connectors.base.Node", + "rai.communication.ros2.connectors.base.ROS2TopicAPI", + "rai.communication.ros2.connectors.base.ROS2ServiceAPI", + "rai.communication.ros2.connectors.base.ROS2ActionAPI", + "rai.communication.ros2.connectors.base.Buffer", + "rai.communication.ros2.connectors.base.TransformListener", + "rai.communication.ros2.connectors.base.MultiThreadedExecutor", + "threading.Thread", +] + + +# --------------------------------------------------------------------------- +# context=int — connector creates and OWNS the context +# --------------------------------------------------------------------------- + + +def test_context_int_creates_rclpy_context(): + """Passing an int creates a new rclpy.Context instance.""" + conn = ROS2Connector(context=40) + try: + assert isinstance(conn._rclpy_context, rclpy.Context) + finally: + conn.shutdown() + + +def test_context_int_owns_context(): + """Passing an int sets _owns_context=True.""" + conn = ROS2Connector(context=41) + try: + assert conn._owns_context is True + finally: + conn.shutdown() + + +def test_context_int_context_is_initialized(): + """The created context is live (ok() returns True) during connector lifetime.""" + conn = ROS2Connector(context=42) + try: + assert conn._rclpy_context.ok() is True + finally: + conn.shutdown() + + +def test_context_int_node_participates_in_created_context(): + """The connector's Node is created with the owned context.""" + conn = ROS2Connector(context=43) + try: + assert conn.node.context is conn._rclpy_context + finally: + conn.shutdown() + + +def test_context_int_executor_participates_in_created_context(): + """The executor is created with the owned context.""" + conn = ROS2Connector(context=44) + try: + assert conn._executor._context is conn._rclpy_context + finally: + conn.shutdown() + + +def test_context_int_shutdown_destroys_context(): + """shutdown() shuts down the owned context when _owns_context=True.""" + conn = ROS2Connector(context=45) + ctx = conn._rclpy_context + assert ctx.ok() is True + conn.shutdown() + assert ctx.ok() is False + + +def test_context_int_multiple_domains_coexist(): + """Two connectors on different domain IDs hold independent contexts, both live.""" + conn_a = ROS2Connector(context=46) + conn_b = ROS2Connector(context=47) + try: + assert conn_a._rclpy_context is not conn_b._rclpy_context + assert conn_a._rclpy_context.ok() is True + assert conn_b._rclpy_context.ok() is True + finally: + conn_a.shutdown() + conn_b.shutdown() + + +def test_context_int_independent_shutdown(): + """Shutting down one connector does not affect the other's context.""" + conn_a = ROS2Connector(context=48) + conn_b = ROS2Connector(context=49) + ctx_a = conn_a._rclpy_context + ctx_b = conn_b._rclpy_context + try: + conn_a.shutdown() + assert ctx_a.ok() is False # A's context gone + assert ctx_b.ok() is True # B's context untouched + finally: + conn_b.shutdown() + + +# --------------------------------------------------------------------------- +# context=rclpy.Context — connector BORROWS the context +# --------------------------------------------------------------------------- + + +def test_context_object_stores_provided_context(external_context): + """_rclpy_context is the exact object passed in.""" + conn = ROS2Connector(context=external_context) + try: + assert conn._rclpy_context is external_context + finally: + conn.shutdown() + + +def test_context_object_does_not_own_context(external_context): + """Passing an rclpy.Context sets _owns_context=False.""" + conn = ROS2Connector(context=external_context) + try: + assert conn._owns_context is False + finally: + conn.shutdown() + + +def test_context_object_node_uses_provided_context(external_context): + """The connector's Node is wired to the supplied context.""" + conn = ROS2Connector(context=external_context) + try: + assert conn.node.context is external_context + finally: + conn.shutdown() + + +def test_context_object_executor_uses_provided_context(external_context): + """The executor is wired to the supplied context.""" + conn = ROS2Connector(context=external_context) + try: + assert conn._executor._context is external_context + finally: + conn.shutdown() + + +def test_context_object_shutdown_preserves_context(external_context): + """shutdown() does NOT shut down a caller-owned context.""" + conn = ROS2Connector(context=external_context) + conn.shutdown() + assert external_context.ok() is True # fixture-owned context is still live + + +def test_context_object_multiple_connectors_share_one_context(): + """Two connectors can share the same rclpy.Context; it survives both shutdowns.""" + ctx = rclpy.Context() + rclpy.init(context=ctx, domain_id=61) + conn_a = ROS2Connector(context=ctx) + conn_b = ROS2Connector(context=ctx) + try: + assert conn_a._rclpy_context is ctx + assert conn_b._rclpy_context is ctx + assert conn_a.node.context is conn_b.node.context + conn_a.shutdown() + assert ctx.ok() is True # context survives first shutdown + conn_b.shutdown() + assert ctx.ok() is True # context survives second shutdown + finally: + if ctx.ok(): + rclpy.shutdown(context=ctx) + + +# --------------------------------------------------------------------------- +# context=None — connector uses the global default context +# --------------------------------------------------------------------------- + + +def test_context_none_rclpy_context_is_none(ros_setup): + """context=None leaves _rclpy_context as None (uses global default).""" + conn = ROS2Connector() + try: + assert conn._rclpy_context is None + finally: + conn.shutdown() + + +def test_context_none_does_not_own_context(ros_setup): + """context=None sets _owns_context=False.""" + conn = ROS2Connector() + try: + assert conn._owns_context is False + finally: + conn.shutdown() + + +def test_context_none_shutdown_does_not_affect_global_rclpy(ros_setup): + """shutdown() must not call rclpy.shutdown() when the context is not owned.""" + conn = ROS2Connector() + conn.shutdown() + assert rclpy.ok() is True # module-scoped rclpy still alive + + +# --------------------------------------------------------------------------- +# context=None — auto-init path (rclpy not yet initialised) +# +# These tests mock all ROS2 infrastructure so they run without a live ROS2 +# environment and can force the "rclpy not initialised" code path. +# --------------------------------------------------------------------------- + + +def test_context_none_auto_init_calls_rclpy_init(): + """When rclpy is not initialised, context=None triggers rclpy.init().""" + with ( + patch.object(rclpy, "ok", return_value=False), + patch.object(rclpy, "init") as mock_init, + ): + with patch.multiple( + "rai.communication.ros2.connectors.base", + **{p.split(".")[-1]: MagicMock() for p in _INFRA_PATCHES if "base" in p}, + ): + with patch("threading.Thread"): + ROS2Connector() + mock_init.assert_called_once_with() + + +def test_context_none_auto_init_logs_warning(capsys): + """When rclpy auto-initialises, a warning containing 'Auto-initializing' is written.""" + # The connector logger uses logging.lastResort (stderr) when no handlers are + # configured, so we capture stderr rather than using caplog. + with patch.object(rclpy, "ok", return_value=False), patch.object(rclpy, "init"): + with patch.multiple( + "rai.communication.ros2.connectors.base", + **{p.split(".")[-1]: MagicMock() for p in _INFRA_PATCHES if "base" in p}, + ): + with patch("threading.Thread"): + ROS2Connector() + + captured = capsys.readouterr() + assert "Auto-initializing" in captured.err, ( + f"Expected auto-init warning in stderr, got: {captured.err!r}" + ) diff --git a/tests/rai_sim/test_o3de_bridge.py b/tests/rai_sim/test_o3de_bridge.py index 9031c6efe..0526bdc08 100644 --- a/tests/rai_sim/test_o3de_bridge.py +++ b/tests/rai_sim/test_o3de_bridge.py @@ -141,7 +141,7 @@ def test_launch_robotic_stack(self): self.bridge.launch_robotic_stack(required_interfaces, mock_launch_description) self.mock_launch_manager.start.assert_called_once_with( - launch_description=mock_launch_description + launch_description=mock_launch_description, domain_id=None ) self.bridge._is_ros2_stack_ready.assert_called_once_with( required_ros2_stack=required_interfaces @@ -166,7 +166,7 @@ def test_launch_binary(self, mock_popen): self.bridge._launch_binary(self.test_config) - mock_popen.assert_called_once_with(["/path/to/binary"]) + mock_popen.assert_called_once_with(["/path/to/binary"], env=unittest.mock.ANY) self.assertEqual(self.bridge.current_sim_process, mock_process)