Python SDK for ROS 2 communication via Zenoh - Use ROS 2 without ROS 2 environment
Enable ROS 2 topic publishing and subscribing in pure Python applications. Publishers and subscribers automatically appear in ros2 topic list and work seamlessly with existing ROS 2 nodes using rmw_zenoh.
- Docs (GitHub Pages): robotis-git.github.io/zenoh_ros2_sdk
- API Reference (in this repo):
docs/api/
- ✅ No ROS 2 installation required - Works with just Python and Zenoh
- ✅ Appears in
ros2 topic list- Uses liveliness tokens for ROS 2 discovery - ✅ Automatic resource management - GIDs, node IDs, entity IDs handled automatically
- ✅ Session pooling - Multiple publishers/subscribers share the same Zenoh session
- ✅ Automatic message/service loading - Automatically downloads message and service definitions from Git repositories
- ✅ Type hash computation - Computes ROS2-compatible type hashes from message/service definitions
- ✅ Type registration - Automatic message and service type registration
- ✅ Service support - Create service clients and servers with automatic type loading
- ✅ Clean API - Simple, intuitive interface
pip install zenoh-ros2-sdkAfter install, the zenoh-ros2 command is available. Topic and service commands:
# Topic discovery (ros2 topic list / info equivalents)
zenoh-ros2 topic list -t # list topics with types
zenoh-ros2 topic info -v /chatter # verbose info for a topic
zenoh-ros2 --router 127.0.0.1:7447 topic list
# Service discovery (ros2 service list / type equivalents)
zenoh-ros2 service list -t # list services with types
zenoh-ros2 service type /add_two_ints # print type(s) for a serviceFor fast discovery (no ~10 s exit delay), start the background daemon: zenoh-ros2 daemon start. Then zenoh-ros2 topic ... and zenoh-ros2 service ... use it and return quickly. Use --no-daemon to run discovery in-process.
from zenoh_ros2_sdk import ROS2Publisher
# Message type loading is automatic (no need to call load_message_type)
# Create publisher - msg_definition is optional, auto-loads from registry
pub = ROS2Publisher(
topic="/chatter",
msg_type="std_msgs/msg/String"
)
# Publish messages
pub.publish(data="Hello World!")
pub.publish(data="Another message")
pub.close()from zenoh_ros2_sdk import ROS2Subscriber
# Message type loading is automatic (no need to call load_message_type)
def on_message(msg):
print(f"Received: {msg.data}")
# Create subscriber - msg_definition is optional, auto-loads from registry
sub = ROS2Subscriber(
topic="/chatter",
msg_type="std_msgs/msg/String",
callback=on_message
)
# Keep running
import time
time.sleep(10)
sub.close()from zenoh_ros2_sdk import ROS2ServiceServer, get_message_class
# Service type loading is automatic (no need to call load_service_type)
def service_handler(request):
# Get response message class
Response = get_message_class("example_interfaces/srv/AddTwoInts_Response")
# Process request and return response
return Response(sum=request.a + request.b)
# Create service server
server = ROS2ServiceServer(
service_name="/add_two_ints",
srv_type="example_interfaces/srv/AddTwoInts",
callback=service_handler
)
# Keep running
import time
time.sleep(10)
server.close()from zenoh_ros2_sdk import ROS2ServiceClient
# Service type loading is automatic (no need to call load_service_type)
# Create service client
client = ROS2ServiceClient(
service_name="/add_two_ints",
srv_type="example_interfaces/srv/AddTwoInts"
)
# Make synchronous service call
response = client.call(a=5, b=3)
if response:
print(f"Sum: {response.sum}")
# Make asynchronous service call
def callback(response):
if response:
print(f"Sum: {response.sum}")
client.call_async(callback, a=10, b=20)
client.close()-
ZenohSession (Singleton)
- Manages shared Zenoh session
- Handles type registration
- Generates unique GIDs
- Manages node/entity ID counters
-
ROS2Publisher
- Creates publisher with liveliness tokens
- Handles attachments (sequence, timestamp, GID)
- Appears in
ros2 topic list
-
ROS2Subscriber
- Subscribes to topics
- Deserializes CDR messages
- Calls user callback
-
ROS2ServiceClient
- Creates service client with liveliness tokens
- Sends requests using Zenoh queries
- Receives responses asynchronously
- Supports both synchronous and asynchronous calls
-
ROS2ServiceServer
- Creates service server with liveliness tokens
- Receives requests via Zenoh queryable
- Calls user callback with request
- Sends response back to client
- GID Generation: Uses UUID4 to generate unique 16-byte GIDs
- Node IDs: Auto-incremented per node
- Entity IDs: Auto-incremented per publisher/subscriber
- Session Reuse: All publishers/subscribers share the same Zenoh session
See examples/README.md for self-contained example scripts.
The SDK automatically downloads message definitions from Git repositories. Message types are loaded automatically when creating publishers/subscribers:
from zenoh_ros2_sdk import ROS2Publisher, get_message_class
# Message type loading is automatic - no need to call load_message_type
# Get message classes for easy object creation
Vector3 = get_message_class("geometry_msgs/msg/Vector3")
Twist = get_message_class("geometry_msgs/msg/Twist")
# Create publisher - message type is automatically loaded
pub = ROS2Publisher(
topic="/cmd_vel",
msg_type="geometry_msgs/msg/Twist"
)
# Create message objects
linear = Vector3(x=0.5, y=0.0, z=0.0)
angular = Vector3(x=0.0, y=0.0, z=0.2)
pub.publish(linear=linear, angular=angular)
pub.close()You can still provide message definitions manually if needed:
from zenoh_ros2_sdk import ROS2Publisher
pub = ROS2Publisher(
topic="/counter",
msg_type="std_msgs/msg/Int32",
msg_definition="int32 data\n"
)
pub.publish(data=42)
pub.close()domain_id: ROS domain ID (defaults toROS_DOMAIN_IDor 0)router_ip: Zenoh router IP addressrouter_port: Zenoh router portnode_name: Custom node name (auto-generated if not provided)namespace: Node namespace (default: "/")
You can override the Zenoh session configuration using the ZENOH_CONFIG_OVERRIDE
environment variable (a semicolon-separated list of path=value entries). This is
useful for enabling features like shared memory transport or forcing client mode:
export ZENOH_CONFIG_OVERRIDE='transport/shared_memory/enabled=true;mode="client";connect/endpoints=["tcp/192.168.6.2:7447"]'Notes:
- Values are parsed as JSON5. If the value is a string, it must be quoted (e.g.,
mode="client").
If you do not pass domain_id to the constructor, the SDK uses ROS_DOMAIN_ID from
the environment (falling back to 0 when it is not set). An explicit domain_id
argument always overrides the environment value.
export ROS_DOMAIN_ID=30- Python 3.8+
eclipse-zenohPython package (>=0.10.0)rosbagsPython package (>=0.11.0, for message serialization)GitPythonPython package (>=3.1.18, for automatic message downloading from git repositories)tqdmPython package (>=4.64.0, for download progress indicators)
For development and testing:
# From PyPI:
pip install "zenoh-ros2-sdk[dev]"
# From source:
pip install -e ".[dev]"pip install zenoh-ros2-sdkgit clone https://github.com/robotis-git/zenoh_ros2_sdk.git
cd zenoh_ros2_sdk
pip install -e .- Singleton Session: All publishers/subscribers share one Zenoh session for efficiency
- Auto GID Generation: Uses UUID4 for unique GIDs per publisher
- Liveliness Tokens: Automatically declared so publishers appear in
ros2 topic list - Type Hash Computation: Automatically computes ROS2-compatible type hashes from message definitions using the same algorithm as ROS2
- Message Registry: Automatically downloads message definitions from Git repositories and caches them locally
- Clean API: Abstracts away Zenoh/rmw_zenoh complexity
- Support for more message types out of the box
- Action support
- Better error handling and retry logic
- Connection pooling and reconnection
- QoS configuration options