Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
4f5a128
Contribute Beaubourg to OTEL
lquerel Feb 22, 2025
f0bf6a4
Merge branch 'main' into main
lquerel Feb 24, 2025
39d3b2f
Merge branch 'open-telemetry:main' into main
lquerel Mar 3, 2025
94c1c2f
Merge branch 'open-telemetry:main' into main
lquerel Apr 10, 2025
a6cc070
Merge branch 'open-telemetry:main' into main
lquerel Apr 16, 2025
2b1c910
Merge branch 'open-telemetry:main' into main
lquerel Apr 18, 2025
4646aff
Merge branch 'open-telemetry:main' into main
lquerel Jun 2, 2025
15910da
Merge branch 'open-telemetry:main' into main
lquerel Sep 3, 2025
c721952
Merge branch 'open-telemetry:main' into monitor-grpc-channel
lquerel Oct 27, 2025
dedbffb
Merge branch 'open-telemetry:main' into monitor-grpc-channel
lquerel Oct 27, 2025
006e6f4
Improve gRPC endpoint/channel configuration for better performance
lquerel Oct 28, 2025
b335a4c
OtlpProtoBytes Vec<u8> to bytes::Bytes to remove Vec::clone
lquerel Oct 29, 2025
a68df7a
Send multiple export requests in parallel
lquerel Oct 29, 2025
6c696e9
Remove LocalBoxFuture and replace it with our own implementation (no …
lquerel Oct 29, 2025
6857f0c
Tune otlp_receiver TCP connection configuration
lquerel Oct 29, 2025
5fd506d
Tune otlp_receiver server configuration
lquerel Oct 29, 2025
ae99402
Tune otlp_receiver server configuration
lquerel Oct 29, 2025
db9a464
Tune otlp_receiver server configuration
lquerel Oct 29, 2025
05ea108
Merge branch 'main' into monitor-grpc-channel
lquerel Oct 29, 2025
625d8f1
Fix fmt issues
lquerel Oct 29, 2025
01a9722
Make max_concurrent_requests adaptive
lquerel Oct 29, 2025
da6b6d5
Update the OTLP receiver config to support all the knob parameters (w…
lquerel Oct 30, 2025
49d1587
Merge branch 'main' into monitor-grpc-channel
lquerel Oct 30, 2025
42f0f0c
Change OTLP port
lquerel Oct 30, 2025
747e707
Merge remote-tracking branch 'origin/monitor-grpc-channel' into monit…
lquerel Oct 30, 2025
744abff
Fix documentation not aligned with the code
lquerel Oct 30, 2025
a1d77f4
Introduce max_decoding_message_size
lquerel Oct 30, 2025
133c2cb
Support multiple compression methods
lquerel Oct 30, 2025
1bce687
Refactor OTLP and OTAP receivers to share configuration
lquerel Oct 31, 2025
71f62b3
Refactor OTLP and OTAP receivers to share code
lquerel Oct 31, 2025
c07f2b6
Refactor OTLP and OTAP receivers to share code
lquerel Oct 31, 2025
7e506cf
Replace task and channel in the otap_receiver by an async stream appr…
lquerel Oct 31, 2025
528948a
Replace std mutex with a parking lot mutex.
lquerel Oct 31, 2025
f00650f
Reduce cost of the EffectHandler clone
lquerel Oct 31, 2025
395665c
Reduce cost of the EffectHandler clones
lquerel Oct 31, 2025
27fded6
Remove await points in the otap_exporter
lquerel Oct 31, 2025
c1ac20c
Improve metrics update by aggregating them locally when try_send fail
lquerel Oct 31, 2025
98ad7b2
Create specialized stream to remove Arc<Mutex<_>>
lquerel Oct 31, 2025
33f0337
Create specialized stream to remove Arc<Mutex<_>>
lquerel Oct 31, 2025
00bdb1c
Refactor the OTLP exporter to reuse gRPC clients more efficiently
lquerel Nov 3, 2025
9b4d55b
Document the otlp exporter and improve naming
lquerel Nov 3, 2025
b107d58
Refactor shared state
lquerel Nov 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ weaver_resolved_schema = { git = "https://github.com/open-telemetry/weaver.git",
weaver_resolver = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.17.0"}
weaver_semconv = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.17.0"}
zip = "=4.2.0"
byte-unit = "5.1.6"

[features]
default = []
Expand Down
2 changes: 1 addition & 1 deletion rust/otap-dataflow/configs/otlp-perf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ nodes:
- perf
dispatch_strategy: round_robin
config:
listening_addr: "127.0.0.1:4318"
listening_addr: "127.0.0.1:4317"
perf:
kind: exporter
plugin_urn: "urn:otel:otap:perf:exporter"
Expand Down
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ serde_yaml = { workspace = true }
miette = { workspace = true }
urn = { workspace = true }
schemars = { workspace = true }
byte-unit = { workspace = true }
126 changes: 126 additions & 0 deletions rust/otap-dataflow/crates/config/src/byte_units.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Support for byte units like "KB", "MB", "GB" in configuration.

use byte_unit::Byte;
use serde::de::Error as DeError;
use serde::{Deserialize, Deserializer};

/// Raw configuration value accepted for a byte size before normalization.
///
/// Deserialized as an untagged enum, so the input is matched against the
/// variants themselves rather than against an explicit tag.
#[derive(Deserialize)]
#[serde(untagged)]
enum Value {
/// A bare integer, used by `deserialize` as a byte count.
Number(u64),
/// A textual size that `deserialize` parses as a `byte_unit::Byte`.
String(String),
}

/// Deserialize an optional byte size that can be specified either as a number (in bytes)
/// or as a string with units (e.g., "1 KB", "2 MiB").
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<u32>, D::Error>
where
D: Deserializer<'de>,
{
let value = Option::<Value>::deserialize(deserializer)?;
let Some(value) = value else {
return Ok(None);
};

let (bytes, repr) = match value {
Value::Number(value) => (value as u128, value.to_string()),
Value::String(text) => {
let parsed: Byte = text.parse().map_err(DeError::custom)?;
(parsed.as_u64() as u128, text)
}
};

if bytes > u32::MAX as u128 {
return Err(DeError::custom(format!(
"byte size {} exceeds u32::MAX ({} bytes)",
repr,
u32::MAX
)));
}

Ok(Some(bytes as u32))
}

// Unit tests that drive `deserialize` through a small YAML-backed holder struct.
#[cfg(test)]
mod tests {
use super::deserialize;
use serde::Deserialize;

// Minimal struct whose single field is routed through `deserialize`;
// `default` lets the field be omitted from the input entirely.
#[derive(Debug, Deserialize)]
struct Holder {
#[serde(default, deserialize_with = "deserialize")]
value: Option<u32>,
}

// Parses a YAML document into a `Holder`.
fn de_yaml(input: &str) -> Result<Holder, serde_yaml::Error> {
serde_yaml::from_str::<Holder>(input)
}

// A bare integer is taken as a byte count.
#[test]
fn parses_number_as_bytes() {
let cfg = de_yaml("value: 1024").expect("should parse numeric bytes");
assert_eq!(cfg.value, Some(1024));
}

// Binary (IEC) unit suffixes, both unquoted and quoted in the YAML.
#[test]
fn parses_string_with_iec_units() {
// 1 KiB == 1024 bytes
let cfg = de_yaml("value: 1 KiB").expect("should parse 1 KiB");
assert_eq!(cfg.value, Some(1024));

// 2 MiB == 2 * 1024 * 1024 bytes
let cfg = de_yaml("value: '2 MiB'").expect("should parse 2 MiB");
assert_eq!(cfg.value, Some(2 * 1024 * 1024));
}

// A quoted number without a unit is accepted as a byte count.
#[test]
fn parses_plain_string_number() {
let cfg = de_yaml("value: '2048'").expect("should parse plain numeric string");
assert_eq!(cfg.value, Some(2048));
}

// An omitted field falls back to `None`.
#[test]
fn missing_value_is_none() {
let cfg = de_yaml("{}").expect("should parse with missing field as None");
assert_eq!(cfg.value, None);
}

// Sizes that do not fit in a u32 are rejected with a descriptive error.
#[test]
fn overflow_is_rejected() {
// 4 GiB == 4 * 1024^3 bytes = 4_294_967_296 > u32::MAX (4_294_967_295)
let err = de_yaml("value: 4 GiB").expect_err("should error for overflow");
let msg = err.to_string();
assert!(
msg.contains("exceeds u32::MAX"),
"unexpected error: {}",
msg
);
}

// Decimal (SI) units written without a space between number and unit.
#[test]
fn parses_no_space_decimal_units() {
let cfg = de_yaml("value: 1KB").expect("should parse 1KB without space");
assert_eq!(cfg.value, Some(1000));

let cfg = de_yaml("value: 10MB").expect("should parse 10MB without space");
assert_eq!(cfg.value, Some(10_000_000));

// Lowercase 'b' denotes bits, not bytes: 1 kb == 1000 bits == 125 bytes
let cfg = de_yaml("value: 1kb").expect("should parse 1kb as 1000 bits => 125 bytes");
assert_eq!(cfg.value, Some(125));
}

// Fractional quantities that resolve to a whole number of bytes.
#[test]
fn parses_fractional_values_and_rounding() {
// Decimal unit with fraction
let cfg = de_yaml("value: '1.5 MB'").expect("should parse 1.5 MB");
assert_eq!(cfg.value, Some(1_500_000));

// Binary unit with fraction (exact)
let cfg = de_yaml("value: '0.5 KiB'").expect("should parse 0.5 KiB to 512 bytes");
assert_eq!(cfg.value, Some(512));
}
}
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//! A pipeline is a collection of nodes interconnected in a directed acyclic graph (DAG).

use std::borrow::Cow;
pub mod byte_units;
pub mod engine;
pub mod error;
pub mod experimental;
Expand Down
17 changes: 11 additions & 6 deletions rust/otap-dataflow/crates/engine/src/local/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use otap_df_telemetry::error::Error as TelemetryError;
use otap_df_telemetry::metrics::{MetricSet, MetricSetHandler};
use otap_df_telemetry::reporter::MetricsReporter;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

/// A trait for processors in the pipeline (!Send definition).
Expand Down Expand Up @@ -91,7 +92,10 @@ pub trait Processor<PData> {
#[derive(Clone)]
pub struct EffectHandler<PData> {
pub(crate) core: EffectHandlerCore<PData>,
senders: Arc<EffectHandlerSenders<PData>>,
}

struct EffectHandlerSenders<PData> {
/// A sender used to forward messages from the processor.
/// Supports multiple named output ports.
msg_senders: HashMap<PortName, LocalSender<PData>>,
Expand Down Expand Up @@ -120,11 +124,12 @@ impl<PData> EffectHandler<PData> {
None
};

EffectHandler {
core,
let senders = Arc::new(EffectHandlerSenders {
msg_senders,
default_sender,
}
});

EffectHandler { core, senders }
}

/// Returns the id of the processor associated with this handler.
Expand All @@ -136,7 +141,7 @@ impl<PData> EffectHandler<PData> {
/// Returns the list of connected out ports for this processor.
#[must_use]
pub fn connected_ports(&self) -> Vec<PortName> {
self.msg_senders.keys().cloned().collect()
self.senders.msg_senders.keys().cloned().collect()
}

/// Sends a message to the next node(s) in the pipeline using the default port.
Expand All @@ -150,7 +155,7 @@ impl<PData> EffectHandler<PData> {
/// if the default port is not configured.
#[inline]
pub async fn send_message(&self, data: PData) -> Result<(), TypedError<PData>> {
match &self.default_sender {
match &self.senders.default_sender {
Some(sender) => sender
.send(data)
.await
Expand Down Expand Up @@ -178,7 +183,7 @@ impl<PData> EffectHandler<PData> {
P: Into<PortName>,
{
let port_name: PortName = port.into();
match self.msg_senders.get(&port_name) {
match self.senders.msg_senders.get(&port_name) {
Some(sender) => sender
.send(data)
.await
Expand Down
17 changes: 11 additions & 6 deletions rust/otap-dataflow/crates/engine/src/local/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use otap_df_telemetry::metrics::{MetricSet, MetricSetHandler};
use otap_df_telemetry::reporter::MetricsReporter;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::{TcpListener, UdpSocket};

Expand Down Expand Up @@ -124,7 +125,10 @@ impl<PData> ControlChannel<PData> {
#[derive(Clone)]
pub struct EffectHandler<PData> {
core: EffectHandlerCore<PData>,
senders: Arc<EffectHandlerSenders<PData>>,
}

struct EffectHandlerSenders<PData> {
/// A sender used to forward messages from the receiver.
/// Supports multiple named output ports.
msg_senders: HashMap<PortName, LocalSender<PData>>,
Expand Down Expand Up @@ -155,11 +159,12 @@ impl<PData> EffectHandler<PData> {
None
};

EffectHandler {
core,
let senders = Arc::new(EffectHandlerSenders {
msg_senders,
default_sender,
}
});

EffectHandler { core, senders }
}

/// Returns the id of the receiver associated with this handler.
Expand All @@ -171,7 +176,7 @@ impl<PData> EffectHandler<PData> {
/// Returns the list of connected out ports for this receiver.
#[must_use]
pub fn connected_ports(&self) -> Vec<PortName> {
self.msg_senders.keys().cloned().collect()
self.senders.msg_senders.keys().cloned().collect()
}

/// Sends a message to the next node(s) in the pipeline using the default port.
Expand All @@ -185,7 +190,7 @@ impl<PData> EffectHandler<PData> {
/// [`TypedError::Error::ReceiverError`] if the default port is not configured.
#[inline]
pub async fn send_message(&self, data: PData) -> Result<(), TypedError<PData>> {
match &self.default_sender {
match &self.senders.default_sender {
Some(sender) => sender
.send(data)
.await
Expand Down Expand Up @@ -213,7 +218,7 @@ impl<PData> EffectHandler<PData> {
P: Into<PortName>,
{
let port_name: PortName = port.into();
match self.msg_senders.get(&port_name) {
match self.senders.msg_senders.get(&port_name) {
Some(sender) => sender
.send(data)
.await
Expand Down
17 changes: 11 additions & 6 deletions rust/otap-dataflow/crates/engine/src/shared/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use otap_df_telemetry::error::Error as TelemetryError;
use otap_df_telemetry::metrics::{MetricSet, MetricSetHandler};
use otap_df_telemetry::reporter::MetricsReporter;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

/// A trait for processors in the pipeline (Send definition).
Expand Down Expand Up @@ -90,7 +91,10 @@ pub trait Processor<PData> {
#[derive(Clone)]
pub struct EffectHandler<PData> {
pub(crate) core: EffectHandlerCore<PData>,
senders: Arc<EffectHandlerSenders<PData>>,
}

struct EffectHandlerSenders<PData> {
/// A sender used to forward messages from the processor.
/// Supports multiple named output ports.
msg_senders: HashMap<PortName, SharedSender<PData>>,
Expand Down Expand Up @@ -119,11 +123,12 @@ impl<PData> EffectHandler<PData> {
None
};

EffectHandler {
core,
let senders = Arc::new(EffectHandlerSenders {
msg_senders,
default_sender,
}
});

EffectHandler { core, senders }
}

/// Returns the id of the processor associated with this handler.
Expand All @@ -135,7 +140,7 @@ impl<PData> EffectHandler<PData> {
/// Returns the list of connected out ports for this processor.
#[must_use]
pub fn connected_ports(&self) -> Vec<PortName> {
self.msg_senders.keys().cloned().collect()
self.senders.msg_senders.keys().cloned().collect()
}

/// Sends a message to the next node(s) in the pipeline.
Expand All @@ -145,7 +150,7 @@ impl<PData> EffectHandler<PData> {
/// Returns an [`Error::ProcessorError`] if the message could not be routed to a port.
#[inline]
pub async fn send_message(&self, data: PData) -> Result<(), TypedError<PData>> {
match &self.default_sender {
match &self.senders.default_sender {
Some(sender) => sender
.send(data)
.await
Expand All @@ -168,7 +173,7 @@ impl<PData> EffectHandler<PData> {
P: Into<PortName>,
{
let port_name: PortName = port.into();
match self.msg_senders.get(&port_name) {
match self.senders.msg_senders.get(&port_name) {
Some(sender) => sender
.send(data)
.await
Expand Down
Loading