Skip to content

Commit 9166310

Browse files
authored
Fixed failing ros2 param ... on r2r nodes for Jazzy (#120)
Add `/set_parameters_atomically` service to `make_parameter_handler_internal` in `nodes.rs` to fix failing `ros2 param ...` on r2r nodes for Jazzy.
1 parent 56acba5 commit 9166310

File tree

1 file changed

+83
-4
lines changed

1 file changed

+83
-4
lines changed

r2r/src/nodes.rs

Lines changed: 83 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -283,9 +283,13 @@ impl Node {
283283
.register_parameters("", None, &mut self.params.lock().unwrap())?;
284284
}
285285
let mut handlers: Vec<std::pin::Pin<Box<dyn Future<Output = ()> + Send>>> = Vec::new();
286-
let (mut event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10);
286+
287+
let (mut set_event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10);
288+
let mut set_atomically_event_tx = set_event_tx.clone();
287289

288290
let node_name = self.name()?;
291+
292+
// rcl_interfaces/srv/SetParameters
289293
let set_params_request_stream = self
290294
.create_service::<rcl_interfaces::srv::SetParameters::Service>(
291295
&format!("{}/set_parameters", node_name),
@@ -335,7 +339,7 @@ impl Node {
335339
};
336340
// if the value changed, send out new value on parameter event stream
337341
if changed && r.successful {
338-
if let Err(e) = event_tx.try_send((p.name.clone(), val)) {
342+
if let Err(e) = set_event_tx.try_send((p.name.clone(), val)) {
339343
log::debug!("Warning: could not send parameter event ({}).", e);
340344
}
341345
}
@@ -449,6 +453,77 @@ impl Node {
449453

450454
handlers.push(Box::pin(get_param_types_future));
451455

456+
// rcl_interfaces/srv/SetParametersAtomically
457+
458+
// NOTE: This is not a proper implementation of the specs, but rather a copy of set_parameters.
459+
// On error, some of the parameters might already be set.
460+
let set_params_atomically_request_stream =
461+
self.create_service::<rcl_interfaces::srv::SetParametersAtomically::Service>(
462+
&format!("{}/set_parameters_atomically", node_name),
463+
QosProfile::default(),
464+
)?;
465+
466+
let params = self.params.clone();
467+
let params_struct_clone = params_struct.clone();
468+
let set_params_atomically_future = set_params_atomically_request_stream.for_each(
469+
move |req: ServiceRequest<rcl_interfaces::srv::SetParametersAtomically::Service>| {
470+
let mut result = rcl_interfaces::srv::SetParametersAtomically::Response::default();
471+
result.result.successful = true;
472+
for p in &req.message.parameters {
473+
let val = ParameterValue::from_parameter_value_msg(p.value.clone());
474+
let changed = params
475+
.lock()
476+
.unwrap()
477+
.get(&p.name)
478+
.map(|v| v.value != val)
479+
.unwrap_or(true); // changed=true if new
480+
let r = if let Some(ps) = &params_struct_clone {
481+
// Update parameter structure
482+
let result = ps.lock().unwrap().set_parameter(&p.name, &val);
483+
if result.is_ok() {
484+
// Also update Node::params
485+
params
486+
.lock()
487+
.unwrap()
488+
.entry(p.name.clone())
489+
.and_modify(|p| p.value = val.clone());
490+
}
491+
rcl_interfaces::msg::SetParametersResult {
492+
successful: result.is_ok(),
493+
reason: result.err().map_or("".into(), |e| e.to_string()),
494+
}
495+
} else {
496+
// No parameter structure - update only Node::params
497+
params
498+
.lock()
499+
.unwrap()
500+
.entry(p.name.clone())
501+
.and_modify(|p| p.value = val.clone())
502+
.or_insert(Parameter::new(val.clone()));
503+
rcl_interfaces::msg::SetParametersResult {
504+
successful: true,
505+
reason: "".into(),
506+
}
507+
};
508+
// if the value changed, send out new value on parameter event stream
509+
if changed && r.successful {
510+
if let Err(e) = set_atomically_event_tx.try_send((p.name.clone(), val)) {
511+
log::debug!("Warning: could not send parameter event ({}).", e);
512+
}
513+
}
514+
// if this parameter failed, set the result and break
515+
if !r.successful {
516+
result.result = r;
517+
break;
518+
}
519+
}
520+
req.respond(result)
521+
.expect("could not send reply to set parameter request");
522+
future::ready(())
523+
},
524+
);
525+
handlers.push(Box::pin(set_params_atomically_future));
526+
452527
#[cfg(r2r__rosgraph_msgs__msg__Clock)]
453528
{
454529
// create TimeSource based on value of use_sim_time parameter
@@ -909,7 +984,9 @@ impl Node {
909984
pub fn destroy_publisher<T: WrappedTypesupport>(&mut self, p: Publisher<T>) {
910985
if let Some(handle) = p.handle.upgrade() {
911986
// Remove handle from list of publishers.
912-
self.pubs.iter().position(|p| Arc::ptr_eq(p, &handle))
987+
self.pubs
988+
.iter()
989+
.position(|p| Arc::ptr_eq(p, &handle))
913990
.map(|i| self.pubs.swap_remove(i));
914991

915992
let handle = wait_until_unwrapped(handle);
@@ -921,7 +998,9 @@ impl Node {
921998
pub fn destroy_publisher_untyped(&mut self, p: PublisherUntyped) {
922999
if let Some(handle) = p.handle.upgrade() {
9231000
// Remove handle from list of publishers.
924-
self.pubs.iter().position(|p| Arc::ptr_eq(p, &handle))
1001+
self.pubs
1002+
.iter()
1003+
.position(|p| Arc::ptr_eq(p, &handle))
9251004
.map(|i| self.pubs.swap_remove(i));
9261005

9271006
let handle = wait_until_unwrapped(handle);

0 commit comments

Comments
 (0)