Skip to content
This repository was archived by the owner on Feb 6, 2026. It is now read-only.

Commit fcd5af8

Browse files
committed
feat(naxum): add builder, fair scheduler, integration tests
1 parent 9bb82c9 commit fcd5af8

12 files changed

Lines changed: 2058 additions & 10 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/naxum/BUCK

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
load("@prelude-si//:macros.bzl", "rust_library")
1+
load("@prelude-si//:macros.bzl", "rust_library", "rust_test")
22

33
rust_library(
44
name = "naxum",
@@ -12,11 +12,35 @@ rust_library(
1212
"//third-party/rust:serde",
1313
"//third-party/rust:serde_json",
1414
"//third-party/rust:serde_path_to_error",
15+
"//third-party/rust:thiserror",
1516
"//third-party/rust:time",
1617
"//third-party/rust:tokio",
1718
"//third-party/rust:tokio-util",
1819
"//third-party/rust:tower",
1920
"//third-party/rust:tracing",
2021
],
2122
srcs = glob(["src/**/*.rs"]),
23+
extra_test_targets = [":test-integration"],
24+
)
25+
26+
rust_test(
27+
name = "test-integration",
28+
deps = [
29+
"//lib/si-data-nats:si-data-nats",
30+
"//third-party/rust:test-log",
31+
"//third-party/rust:tokio",
32+
"//third-party/rust:tokio-util",
33+
"//third-party/rust:tracing-subscriber",
34+
"//third-party/rust:uuid",
35+
":naxum",
36+
],
37+
srcs = glob([
38+
"tests/**/*.rs",
39+
]),
40+
crate_root = "tests/integration.rs",
41+
env = {
42+
"CARGO_PKG_NAME": "integration",
43+
"RUSTC_BOOTSTRAP": "1",
44+
"CI": "buildkite",
45+
},
2246
)

lib/naxum/Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,19 @@ async-nats = { workspace = true }
1313
async-trait = { workspace = true }
1414
bytes = { workspace = true }
1515
futures = { workspace = true } # NOTE: if extracted this can be `futures-util`
16-
pin-project-lite = { workspace = true }
1716
serde = { workspace = true } # NOTE: if extracted, this is used for a json feature
1817
serde_json = { workspace = true } # NOTE: if extracted, this is used for a json feature
1918
serde_path_to_error = { workspace = true } # NOTE: if extracted, this is used for a json feature
2019
telemetry-utils = { path = "../../lib/telemetry-utils-rs" }
20+
thiserror = { workspace = true }
2121
time = { workspace = true }
2222
tokio = { workspace = true }
2323
tokio-util = { workspace = true }
2424
tower = { workspace = true }
2525
tracing = { workspace = true } # NOTE: left with vanilla tracing for potential future extraction
2626

2727
[dev-dependencies]
28-
thiserror = { workspace = true }
28+
si-data-nats = { path = "../si-data-nats" }
29+
test-log = { workspace = true }
2930
tracing-subscriber = { workspace = true }
31+
uuid = { workspace = true }

lib/naxum/src/fair/config.rs

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
use std::{
2+
fmt,
3+
marker::PhantomData,
4+
time::Duration,
5+
};
6+
7+
use async_nats::jetstream::stream::Stream as JsStream;
8+
9+
pub type KeyExtractorFn<K> = fn(&str, Option<&str>, &str) -> Option<K>;
10+
pub type ConsumerNameFn<K> = fn(&K, &str) -> String;
11+
pub type ConsumerFilterSubjectFn<K> = fn(Option<&str>, &K, &str) -> String;
12+
13+
pub struct FairSchedulingConfig<K> {
14+
pub(crate) service_name: String,
15+
pub(crate) tasks_stream: JsStream,
16+
pub(crate) requests_stream: JsStream,
17+
pub(crate) subject_prefix: Option<String>,
18+
pub(crate) key_extractor: KeyExtractorFn<K>,
19+
pub(crate) consumer_name_fn: ConsumerNameFn<K>,
20+
pub(crate) consumer_filter_subject_fn: ConsumerFilterSubjectFn<K>,
21+
pub(crate) inactive_threshold: Duration,
22+
pub(crate) task_listener_name: String,
23+
pub(crate) tasks_filter_subject: String,
24+
_key_marker: PhantomData<K>,
25+
}
26+
27+
impl<K> fmt::Debug for FairSchedulingConfig<K> {
28+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29+
f.debug_struct("FairSchedulingConfig")
30+
.field("service_name", &self.service_name)
31+
.field("subject_prefix", &self.subject_prefix)
32+
.field("inactive_threshold", &self.inactive_threshold)
33+
.field("task_listener_name", &self.task_listener_name)
34+
.field("tasks_filter_subject", &self.tasks_filter_subject)
35+
.finish_non_exhaustive()
36+
}
37+
}
38+
39+
impl FairSchedulingConfig<String> {
40+
/// Creates a config for workspace-partitioned fair scheduling.
41+
///
42+
/// Uses standard subject conventions where workspace_id is extracted from:
43+
/// - Tasks: `{prefix?}.{service}.tasks.{workspace_id}[.{rest}]`
44+
/// - Requests: `{prefix?}.{service}.requests.{workspace_id}.>`
45+
///
46+
/// # Example
47+
///
48+
/// ```ignore
49+
/// let config = FairSchedulingConfig::for_workspace_partitioning(
50+
/// "veritech",
51+
/// tasks_stream,
52+
/// requests_stream,
53+
/// prefix,
54+
/// );
55+
/// ```
56+
pub fn for_workspace_partitioning(
57+
service_name: impl Into<String>,
58+
tasks_stream: JsStream,
59+
requests_stream: JsStream,
60+
subject_prefix: Option<String>,
61+
) -> Self {
62+
let service_name = service_name.into();
63+
64+
let tasks_filter_subject =
65+
with_optional_prefix(subject_prefix.as_deref(), format!("{service_name}.tasks.*"));
66+
67+
Self {
68+
service_name: service_name.clone(),
69+
tasks_stream,
70+
requests_stream,
71+
subject_prefix,
72+
key_extractor: default_workspace_key_extractor,
73+
consumer_name_fn: default_workspace_consumer_name,
74+
consumer_filter_subject_fn: default_workspace_filter_subject,
75+
inactive_threshold: Duration::from_secs(300),
76+
task_listener_name: format!("{service_name}-task-listener"),
77+
tasks_filter_subject,
78+
_key_marker: PhantomData,
79+
}
80+
}
81+
}
82+
83+
fn with_optional_prefix(prefix: Option<&str>, base: impl AsRef<str>) -> String {
84+
match prefix {
85+
Some(p) => format!("{}.{}", p, base.as_ref()),
86+
None => base.as_ref().to_string(),
87+
}
88+
}
89+
90+
/// Extracts workspace_id from position 2 (no prefix) or 3 (with prefix).
91+
///
92+
/// Works for subjects like: `{prefix?}.{service}.tasks.{workspace_id}[.{rest}]`
93+
fn default_workspace_key_extractor(
94+
subject: &str,
95+
prefix: Option<&str>,
96+
_service_name: &str,
97+
) -> Option<String> {
98+
let parts: Vec<&str> = subject.split('.').collect();
99+
let idx = if prefix.is_some() { 3 } else { 2 };
100+
parts.get(idx).map(|s| s.to_string())
101+
}
102+
103+
/// Generates consumer name: `{service}-ws-{workspace_id}`
104+
fn default_workspace_consumer_name(workspace_id: &String, service_name: &str) -> String {
105+
format!("{service_name}-ws-{workspace_id}")
106+
}
107+
108+
/// Generates filter subject: `{prefix?}.{service}.requests.{workspace_id}.>`
109+
fn default_workspace_filter_subject(
110+
prefix: Option<&str>,
111+
workspace_id: &String,
112+
service_name: &str,
113+
) -> String {
114+
with_optional_prefix(prefix, format!("{service_name}.requests.{workspace_id}.>"))
115+
}
116+
117+
#[cfg(test)]
118+
mod tests {
119+
use super::{
120+
default_workspace_consumer_name,
121+
default_workspace_filter_subject,
122+
default_workspace_key_extractor,
123+
};
124+
125+
#[test]
126+
fn test_workspace_key_extractor_no_prefix() {
127+
// Subject format: {service}.tasks.{workspace_id}
128+
let subject = "veritech.tasks.workspace-123";
129+
let result = default_workspace_key_extractor(subject, None, "veritech");
130+
131+
assert_eq!(result, Some("workspace-123".to_string()));
132+
}
133+
134+
#[test]
135+
fn test_workspace_key_extractor_with_prefix() {
136+
// Subject format: {prefix}.{service}.tasks.{workspace_id}
137+
let subject = "si.veritech.tasks.workspace-456";
138+
let result = default_workspace_key_extractor(subject, Some("si"), "veritech");
139+
140+
assert_eq!(result, Some("workspace-456".to_string()));
141+
}
142+
143+
#[test]
144+
fn test_workspace_key_extractor_with_trailing_parts() {
145+
// Subject format can have additional parts after workspace_id
146+
let subject = "veritech.tasks.workspace-789.extra.parts";
147+
let result = default_workspace_key_extractor(subject, None, "veritech");
148+
149+
assert_eq!(result, Some("workspace-789".to_string()));
150+
}
151+
152+
#[test]
153+
fn test_workspace_key_extractor_invalid_subject() {
154+
// Too few parts
155+
let subject = "veritech.tasks";
156+
let result = default_workspace_key_extractor(subject, None, "veritech");
157+
158+
assert_eq!(result, None);
159+
}
160+
161+
#[test]
162+
fn test_workspace_key_extractor_empty_workspace_id() {
163+
// Empty workspace ID should still be extracted (validation is elsewhere)
164+
let subject = "veritech.tasks..something";
165+
let result = default_workspace_key_extractor(subject, None, "veritech");
166+
167+
assert_eq!(result, Some("".to_string()));
168+
}
169+
170+
#[test]
171+
fn test_consumer_name_generation() {
172+
let workspace_id = "workspace-123";
173+
let service_name = "veritech";
174+
175+
let result = default_workspace_consumer_name(&workspace_id.to_string(), service_name);
176+
177+
assert_eq!(result, "veritech-ws-workspace-123");
178+
}
179+
180+
#[test]
181+
fn test_consumer_name_with_special_characters() {
182+
let workspace_id = "workspace-test_123";
183+
let service_name = "my-service";
184+
185+
let result = default_workspace_consumer_name(&workspace_id.to_string(), service_name);
186+
187+
assert_eq!(result, "my-service-ws-workspace-test_123");
188+
}
189+
190+
#[test]
191+
fn test_filter_subject_no_prefix() {
192+
let workspace_id = "workspace-123";
193+
let service_name = "veritech";
194+
195+
let result =
196+
default_workspace_filter_subject(None, &workspace_id.to_string(), service_name);
197+
198+
assert_eq!(result, "veritech.requests.workspace-123.>");
199+
}
200+
201+
#[test]
202+
fn test_filter_subject_with_prefix() {
203+
let workspace_id = "workspace-456";
204+
let service_name = "veritech";
205+
206+
let result =
207+
default_workspace_filter_subject(Some("si"), &workspace_id.to_string(), service_name);
208+
209+
assert_eq!(result, "si.veritech.requests.workspace-456.>");
210+
}
211+
212+
#[test]
213+
fn test_filter_subject_wildcard_suffix() {
214+
// Verify the wildcard suffix is correctly added
215+
let result = default_workspace_filter_subject(None, &"ws1".to_string(), "svc");
216+
217+
assert!(
218+
result.ends_with(".>"),
219+
"Filter subject must end with wildcard"
220+
);
221+
}
222+
223+
#[test]
224+
fn test_key_extractor_consistency_with_subject_helpers() {
225+
// Verify that the extractor can parse subjects generated by the helpers
226+
let workspace_id = "test-workspace";
227+
228+
// Simulate what task_subject_for_workspace generates
229+
let subject = format!("veritech.tasks.{workspace_id}");
230+
231+
// Should be extractable
232+
let extracted = default_workspace_key_extractor(&subject, None, "veritech");
233+
assert_eq!(extracted, Some(workspace_id.to_string()));
234+
}
235+
}

0 commit comments

Comments
 (0)