Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 22 additions & 17 deletions rust/otap-dataflow/crates/otap/src/transform_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use otap_df_engine::{
};
use otap_df_pdata::{OtapArrowRecords, OtapPayload};
use otap_df_query_engine::pipeline::Pipeline;
use otap_df_telemetry::metrics::MetricSet;
use otap_df_telemetry::{metrics::MetricSet, otel_debug};
use serde_json::Value;

use crate::{OTAP_PROCESSOR_FACTORIES, pdata::OtapPdata};
Expand Down Expand Up @@ -173,21 +173,25 @@ impl Processor<OtapPdata> for TransformProcessor {
}
},
Message::PData(pdata) => {
self.metrics.msgs_consumed.inc();
let (context, payload) = pdata.into_parts();
self.metrics.items_received.add(payload.num_items() as u64);
let payload = if !self.should_process(&payload) {
// skip handling this pdata
otel_debug!("transform.skipped");
payload
} else {
let mut otap_batch: OtapArrowRecords = payload.try_into()?;
let num_items = otap_batch.num_items() as u64;
otap_batch.decode_transport_optimized_ids()?;
match self.pipeline.execute(otap_batch).await {
Ok(otap_batch) => {
self.metrics.msgs_transformed.inc();
self.metrics
.items_transformed
.add(otap_batch.num_items() as u64);
otap_batch.into()
}
Err(e) => {
self.metrics.msgs_transform_failed.inc();
self.metrics.items_failed.add(num_items);
return Err(EngineError::ProcessorError {
processor: effect_handler.processor_id(),
kind: ProcessorErrorKind::Other,
Expand All @@ -198,10 +202,11 @@ impl Processor<OtapPdata> for TransformProcessor {
}
};

let num_forwarded = payload.num_items() as u64;
effect_handler
.send_message(OtapPdata::new(context, payload))
.await
.inspect(|_| self.metrics.msgs_forwarded.inc())?;
.inspect(|_| self.metrics.items_sent.add(num_forwarded))?;
}
};

Expand Down Expand Up @@ -347,29 +352,29 @@ mod test {
// Allow the collector to pull from the channel
tokio::time::sleep(std::time::Duration::from_millis(50)).await;

let mut msgs_consumed = 0;
let mut msgs_forwarded = 0;
let mut msgs_transformed = 0;
let mut msgs_transform_failed = 0;
let mut items_received = 0;
let mut items_sent = 0;
let mut items_transformed = 0;
let mut items_failed = 0;
registry.visit_current_metrics(|desc, _attrs, iter| {
if desc.name == "transform.processor.metrics" {
for (field, v) in iter {
let val = v.to_u64_lossy();
match field.name {
"msgs.consumed" => msgs_consumed += val,
"msgs.forwarded" => msgs_forwarded += val,
"msgs.transformed" => msgs_transformed += val,
"msgs.transform.failed" => msgs_transform_failed += val,
"items.received" => items_received += val,
"items.sent" => items_sent += val,
"items.transformed" => items_transformed += val,
"items.failed" => items_failed += val,
_ => {}
}
}
}
});

assert_eq!(msgs_consumed, 1);
assert_eq!(msgs_forwarded, 1);
assert_eq!(msgs_transformed, 1);
assert_eq!(msgs_transform_failed, 0)
assert_eq!(items_received, 2);
assert_eq!(items_sent, 1);
assert_eq!(items_transformed, 1);
assert_eq!(items_failed, 0)
});
}

Expand Down
24 changes: 12 additions & 12 deletions rust/otap-dataflow/crates/otap/src/transform_processor/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ use otap_df_telemetry_macros::metric_set;
#[metric_set(name = "transform.processor.metrics")]
#[derive(Debug, Default, Clone)]
pub struct Metrics {
/// PData messages consumed by this processor.
#[metric(unit = "{msg}")]
pub msgs_consumed: Counter<u64>,
/// Telemetry items (log records, spans, or metrics) received by this processor.
#[metric(unit = "{item}")]
pub items_received: Counter<u64>,

/// PData messages forwarded by this processor.
#[metric(unit = "{msg}")]
pub msgs_forwarded: Counter<u64>,
/// Telemetry items sent to the next processor in the pipeline.
#[metric(unit = "{item}")]
pub items_sent: Counter<u64>,

/// Number of messages successfully transformed.
#[metric(unit = "{msg}")]
pub msgs_transformed: Counter<u64>,
/// Telemetry items successfully transformed.
#[metric(unit = "{item}")]
pub items_transformed: Counter<u64>,

/// Number of failed transform attempts.
#[metric(unit = "{msg}")]
pub msgs_transform_failed: Counter<u64>,
/// Telemetry items that failed during transformation.
#[metric(unit = "{item}")]
pub items_failed: Counter<u64>,
}
Loading