Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 66 additions & 9 deletions src/rai_core/rai/communication/ros2/connectors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ class ROS2BaseConnector(ROS2ActionMixin, ROS2ServiceMixin, BaseConnector[T]):
Type of executor to use for processing ROS2 callbacks, by default "multi_threaded".
use_sim_time : bool, optional
Whether to use simulation time or system time, by default False.
context : int or rclpy.Context or None, optional
Controls which ROS2 context (and therefore which DDS domain) this
connector participates in. Three forms are accepted:

- ``None`` (default): use the process-global default context.
``rclpy.init()`` must have been called before constructing the
connector, or the connector will auto-initialize and warn.
- ``int``: a domain ID shorthand. The connector creates and owns a
fresh ``rclpy.Context``, initialises it with the given domain ID,
and shuts it down when ``connector.shutdown()`` is called. Use
this to run multiple isolated connectors in the same process
(e.g. parallel simulation instances on different domains).
- ``rclpy.Context``: an already-initialised context supplied by the
caller. The connector uses it as-is and does **not** shut it down
— the caller retains ownership of the context lifecycle.

Methods
-------
Expand Down Expand Up @@ -96,6 +111,17 @@ class ROS2BaseConnector(ROS2ActionMixin, ROS2ServiceMixin, BaseConnector[T]):
- False (default): Subscribers remain active after message reception
- Pros: More stable operation, avoids potential crashes
- Cons: May lead to memory/performance overhead from inactive subscribers

Context Lifecycle:
Ownership of the underlying ``rclpy.Context`` follows a simple rule:
*the party that created it is responsible for shutting it down.*

- ``context=None``: the global context is managed externally
(e.g. via ``ROS2Context``) — ``shutdown()`` does not touch it.
- ``context=<int>``: the connector created the context, so
``shutdown()`` calls ``rclpy.shutdown(context=...)``.
- ``context=<rclpy.Context>``: the caller created the context, so
``shutdown()`` leaves it alone.
"""

def __init__(
Expand All @@ -104,6 +130,7 @@ def __init__(
destroy_subscribers: bool = False,
executor_type: Literal["single_threaded", "multi_threaded"] = "multi_threaded",
use_sim_time: bool = False,
context: Optional[int | rclpy.Context] = None,
):
"""Initialize the ROS2BaseConnector.

Expand All @@ -115,6 +142,22 @@ def __init__(
Whether to destroy subscribers after receiving a message, by default False.
executor_type : Literal["single_threaded", "multi_threaded"], optional
Type of executor to use for processing ROS2 callbacks, by default "multi_threaded".
use_sim_time : bool, optional
Whether to use simulation time or system time, by default False.
context : int or rclpy.Context or None, optional
Controls which ROS2 context this connector uses.

- ``None`` (default): uses the process-global default context.
Auto-initializes via ``rclpy.init()`` with a warning if not
already initialized.
- ``int``: domain ID shorthand — the connector creates a new
``rclpy.Context``, calls ``rclpy.init(context=...,
domain_id=<value>)``, and **owns** the context (shuts it down
on ``shutdown()``). Safe to use alongside other connectors
on different domain IDs in the same process.
- ``rclpy.Context``: an already-initialized context. The
connector uses it directly and does **not** call
``rclpy.shutdown`` on it — the caller owns the lifecycle.

Raises
------
Expand All @@ -124,14 +167,26 @@ def __init__(
super().__init__()
if node_name is None:
node_name = f"rai_ros2_connector_{str(uuid.uuid4())[-12:]}"
if not rclpy.ok():
rclpy.init()
self.logger.warning(
"Auto-initializing ROS2, but manual initialization is recommended. "
"For better control and predictability, call rclpy.init() or ROS2Context before creating this connector."
)

if isinstance(context, int):
self._rclpy_context: Optional[rclpy.Context] = rclpy.Context()
rclpy.init(context=self._rclpy_context, domain_id=context)
self._owns_context = True
elif isinstance(context, rclpy.Context):
self._rclpy_context = context
self._owns_context = False
else:
self._rclpy_context = None
self._owns_context = False
if not rclpy.ok():
rclpy.init()
self.logger.warning(
"Auto-initializing ROS2, but manual initialization is recommended. "
"For better control and predictability, call rclpy.init() or ROS2Context before creating this connector."
)

self._executor_type = executor_type
self._node = Node(node_name)
self._node = Node(node_name, context=self._rclpy_context)
if use_sim_time:
self._node.set_parameters(
[Parameter("use_sim_time", Parameter.Type.BOOL, True)]
Expand All @@ -152,9 +207,9 @@ def __init__(
"SingleThreadedExecutor",
}
if self._executor_type == "multi_threaded":
self._executor = MultiThreadedExecutor()
self._executor = MultiThreadedExecutor(context=self._rclpy_context)
elif self._executor_type == "single_threaded":
self._executor = SingleThreadedExecutor()
self._executor = SingleThreadedExecutor(context=self._rclpy_context)
else:
raise ValueError(f"Invalid executor type: {self._executor_type}")

Expand Down Expand Up @@ -570,3 +625,5 @@ def shutdown(self):
self._topic_api.shutdown()
self._executor.shutdown()
self._thread.join()
if self._owns_context and self._rclpy_context is not None:
rclpy.shutdown(context=self._rclpy_context)
16 changes: 13 additions & 3 deletions src/rai_sim/rai_sim/launch_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import asyncio
import multiprocessing
import os
from multiprocessing.synchronize import Event
from typing import Optional

Expand All @@ -25,11 +26,15 @@ def __init__(self) -> None:
self._stop_event: Optional[Event] = None
self._process: Optional[multiprocessing.Process] = None

def start(self, launch_description: LaunchDescription) -> None:
def start(
self,
launch_description: LaunchDescription,
domain_id: Optional[int] = None,
) -> None:
self._stop_event = multiprocessing.Event()
self._process = multiprocessing.Process(
target=self._run_process,
args=(self._stop_event, launch_description),
args=(self._stop_event, launch_description, domain_id),
daemon=True,
)
self._process.start()
Expand All @@ -41,8 +46,13 @@ def shutdown(self) -> None:
self._process.join()

def _run_process(
self, stop_event: Event, launch_description: LaunchDescription
self,
stop_event: Event,
launch_description: LaunchDescription,
domain_id: Optional[int] = None,
) -> None:
if domain_id is not None:
os.environ["ROS_DOMAIN_ID"] = str(domain_id)
loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
launch_service = LaunchService()
Expand Down
9 changes: 8 additions & 1 deletion src/rai_sim/rai_sim/o3de/o3de_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import logging
import os
import signal
import subprocess
import time
Expand Down Expand Up @@ -51,6 +52,7 @@
class O3DExROS2SimulationConfig(SimulationConfig):
binary_path: Path
level: Optional[str] = None
domain_id: Optional[int] = None
required_simulation_ros2_interfaces: dict[str, List[str]]
required_robotic_ros2_interfaces: dict[str, List[str]]

Expand Down Expand Up @@ -393,8 +395,12 @@ def _launch_binary(
if simulation_config.level:
command.append(f"+LoadLevel {simulation_config.level}")
self.logger.info(f"Running command: {command}")
env = {**os.environ}
if simulation_config.domain_id is not None:
env["ROS_DOMAIN_ID"] = str(simulation_config.domain_id)
self.current_sim_process = subprocess.Popen(
command,
env=env,
)
if not self._has_process_started(process=self.current_sim_process):
raise RuntimeError("Process did not start in time.")
Expand All @@ -407,8 +413,9 @@ def launch_robotic_stack(
self,
required_robotic_ros2_interfaces: dict[str, List[str]],
launch_description: LaunchDescription,
domain_id: Optional[int] = None,
):
self.manager.start(launch_description=launch_description)
self.manager.start(launch_description=launch_description, domain_id=domain_id)

if not self._is_ros2_stack_ready(
required_ros2_stack=required_robotic_ros2_interfaces
Expand Down
Loading
Loading