Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions apps/skit-cli/src/load_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::load_test::scenarios::{run_dynamic_scenario, run_mixed_scenario, run_
pub async fn run_load_test(
config_path: &str,
server_override: Option<String>,
sessions_override: Option<usize>,
duration_override: Option<u64>,
cleanup: bool,
) -> Result<()> {
Expand All @@ -43,13 +44,29 @@ pub async fn run_load_test(
if let Some(duration) = duration_override {
config.test.duration_secs = duration;
}
if let Some(sessions) = sessions_override {
if sessions == 0 {
anyhow::bail!("--sessions must be > 0");
}
match config.test.scenario {
Scenario::Dynamic | Scenario::Mixed => {
config.dynamic.session_count = sessions;
},
Scenario::OneShot => {
warn!("Ignoring --sessions because test.scenario=oneshot");
},
}
}

config.validate()?;

info!("Load test configuration loaded from: {}", config_path);
info!("Server: {}", config.server.url);
info!("Scenario: {:?}", config.test.scenario);
info!("Duration: {}s", config.test.duration_secs);
if matches!(config.test.scenario, Scenario::Dynamic | Scenario::Mixed) {
info!("Dynamic sessions: {}", config.dynamic.session_count);
}

// Set up graceful shutdown handler
let shutdown_token = tokio_util::sync::CancellationToken::new();
Expand Down
9 changes: 7 additions & 2 deletions apps/skit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ enum Commands {
/// Override server URL from config
#[arg(long)]
server: Option<String>,
/// Override dynamic.session_count from config
///
/// Useful for quickly scaling down presets like `stress-dynamic` on laptops.
#[arg(long)]
sessions: Option<usize>,
/// Override test duration (seconds)
#[arg(short, long)]
duration: Option<u64>,
Expand Down Expand Up @@ -382,7 +387,7 @@ async fn main() {
std::process::exit(1);
}
},
Commands::LoadTest { config_path, config, server, duration, cleanup } => {
Commands::LoadTest { config_path, config, server, sessions, duration, cleanup } => {
info!("Starting StreamKit load test");

let config = match (config_path, config) {
Expand All @@ -397,7 +402,7 @@ async fn main() {
};

if let Err(e) =
streamkit_client::run_load_test(&config, server, duration, cleanup).await
streamkit_client::run_load_test(&config, server, sessions, duration, cleanup).await
{
// Error already logged via tracing above
error!(error = %e, "Load test failed");
Expand Down
18 changes: 17 additions & 1 deletion apps/skit-cli/src/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ impl Shell {
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if args.is_empty() {
eprintln!(
"Usage: loadtest <config.toml> [--server <url>] [--duration <seconds>] [--cleanup]"
"Usage: loadtest <config.toml> [--server <url>] [--sessions <n>] [--duration <seconds>] [--cleanup]"
);
eprintln!("Example: loadtest samples/loadtest/stress-moq-peer.toml --duration 30");
return Ok(());
Expand All @@ -654,6 +654,7 @@ impl Shell {
}

let mut server_override = None;
let mut sessions_override = None;
let mut duration_override = None;
let mut cleanup = false;

Expand Down Expand Up @@ -684,6 +685,20 @@ impl Shell {
return Ok(());
}
},
"--sessions" => {
if i + 1 < args.len() {
if let Ok(sessions) = args[i + 1].parse::<usize>() {
sessions_override = Some(sessions);
i += 2;
} else {
eprintln!("--sessions requires a numeric value");
return Ok(());
}
} else {
eprintln!("--sessions requires a value");
return Ok(());
}
},
"--cleanup" => {
cleanup = true;
i += 1;
Expand All @@ -701,6 +716,7 @@ impl Shell {
match crate::load_test::run_load_test(
config_path,
server_override,
sessions_override,
duration_override,
cleanup,
)
Expand Down
37 changes: 29 additions & 8 deletions crates/engine/src/dynamic_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,22 @@ impl DynamicEngine {

// Broadcast to all subscribers
self.state_subscribers.retain(|subscriber| {
// If send fails, the subscriber has disconnected, so we remove it
subscriber.try_send(update.clone()).is_ok()
// Keep subscribers on transient backpressure (Full); remove only when Closed.
//
// For state updates we also try to deliver eventually: dropping a state transition
// (e.g. Running -> Recovering) can leave clients showing a stale "healthy" status.
match subscriber.try_send(update.clone()) {
Ok(()) => true,
Err(mpsc::error::TrySendError::Full(_)) => {
let subscriber = subscriber.clone();
let update = update.clone();
tokio::spawn(async move {
let _ = subscriber.send(update).await;
});
true
},
Err(mpsc::error::TrySendError::Closed(_)) => false,
}
});
}

Expand Down Expand Up @@ -362,8 +376,13 @@ impl DynamicEngine {

// Broadcast to all subscribers
self.stats_subscribers.retain(|subscriber| {
// If send fails, the subscriber has disconnected, so we remove it
subscriber.try_send(update.clone()).is_ok()
// Keep subscribers on transient backpressure (Full); remove only when Closed.
//
// Stats are high-frequency, best-effort updates; dropping an update is acceptable.
match subscriber.try_send(update.clone()) {
Ok(()) | Err(mpsc::error::TrySendError::Full(_)) => true,
Err(mpsc::error::TrySendError::Closed(_)) => false,
}
});
}

Expand Down Expand Up @@ -463,10 +482,12 @@ impl DynamicEngine {
};

// 5. Spawn Node
let task_handle =
tokio::spawn(node.run(context).instrument(
tracing::info_span!("node_run", node.name = %node_id, node.kind = %kind),
));
let task_handle = tokio::spawn(node.run(context).instrument(tracing::info_span!(
"node_run",
session.id = %self.session_id.as_deref().unwrap_or("<unknown>"),
node.name = %node_id,
node.kind = %kind
)));
self.live_nodes
.insert(node_id.to_string(), graph_builder::LiveNode { control_tx, task_handle });
self.nodes_active_gauge.record(self.live_nodes.len() as u64, &[]);
Expand Down
26 changes: 24 additions & 2 deletions crates/nodes/src/transport/moq/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,8 @@ impl MoqPullNode {
let mut current_group: Option<moq_lite::GroupConsumer> = None;

let mut session_packet_count: u32 = 0;
let mut consecutive_cancels: u32 = 0;
let mut last_payload_at = tokio::time::Instant::now();

// Stats tracking
let node_name = context.output_sender.node_name().to_string();
Expand Down Expand Up @@ -601,6 +603,8 @@ impl MoqPullNode {

match read_result {
Ok(Some(first_payload)) => {
consecutive_cancels = 0;
last_payload_at = tokio::time::Instant::now();
// Batching is disabled by default (batch_ms=0).
if self.config.batch_ms > 0 {
let mut batch = Vec::with_capacity(context.batch_size);
Expand Down Expand Up @@ -719,12 +723,30 @@ impl MoqPullNode {
return Ok(StreamEndReason::Natural);
},
Err(moq_lite::Error::Cancel) => {
// moq_lite cancels groups when the producer advances and drops old groups.
// This is expected with our "latest group" semantics under load: skip to the
// next group rather than tearing down the entire WebTransport connection.
consecutive_cancels = consecutive_cancels.saturating_add(1);
tracing::debug!(
session_packet_count,
total_packet_count = *total_packet_count,
"Track read cancelled"
consecutive_cancels,
"Track read cancelled (skipping to next group)"
);
return Ok(StreamEndReason::Reconnect);

// Safety valve: reconnect only when no payload has arrived for more than 5s AND we have seen at least 50 consecutive cancels.
if last_payload_at.elapsed() > Duration::from_secs(5)
&& consecutive_cancels >= 50
{
tracing::warn!(
session_packet_count,
total_packet_count = *total_packet_count,
consecutive_cancels,
elapsed_ms = last_payload_at.elapsed().as_millis(),
"Excessive track cancels without payloads; reconnecting"
);
return Ok(StreamEndReason::Reconnect);
}
},
Err(e) => {
tracing::error!(error = %e, session_packet_count, "Error reading from track");
Expand Down
1 change: 1 addition & 0 deletions docs/src/content/docs/guides/load-testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Examples:

- `just lt stress-oneshot`
- `just lt oneshot-opus-transcode-fast`
- `just lt stress-dynamic sessions=10` (or `just lt stress-dynamic --sessions 10`)
- `just lt dynamic-tune-heavy --cleanup`

### Oneshot (HTTP batch pipelines)
Expand Down
18 changes: 17 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ skit-lt config='loadtest.toml' *args='':
# Examples:
# - `just lt` # runs `samples/loadtest/stress-oneshot.toml` by default
# - `just lt stress-dynamic` # runs `samples/loadtest/stress-dynamic.toml`
# - `just lt stress-dynamic sessions=10` # shorthand for `--sessions 10`
# - `just lt stress-dynamic --sessions 10`
# - `just lt dynamic-tune-heavy --cleanup`
# - `just lt samples/loadtest/ui-demo.toml`
lt preset_or_path='stress-oneshot' *args='':
Expand All @@ -118,7 +120,21 @@ lt preset_or_path='stress-oneshot' *args='':
echo " - If passing a path, ensure the file exists"; \
exit 1; \
fi; \
just skit-lt "$cfg" {{args}}
sessions=""; \
set -- {{args}}; \
if [ $# -ge 1 ]; then \
case "$1" in \
sessions=*) sessions="${1#sessions=}"; shift;; \
[0-9]*) sessions="$1"; shift;; \
esac; \
fi; \
if [ -n "$sessions" ]; then \
case "$sessions" in \
''|*[!0-9]*) echo "❌ sessions must be an integer (got: '$sessions')"; exit 1;; \
esac; \
set -- --sessions "$sessions" "$@"; \
fi; \
just skit-lt "$cfg" "$@"

# --- Load test presets ---
# Run the standard oneshot stress test config
Expand Down
17 changes: 12 additions & 5 deletions ui/src/components/NodeStateIndicator.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -413,15 +413,22 @@ export const NodeStateIndicator: React.FC<NodeStateIndicatorProps> = ({
nodeId,
sessionId,
}) => {
// Get live stats for error badge display
const [isTooltipOpen, setIsTooltipOpen] = React.useState(false);

// IMPORTANT: avoid subscribing to node stats while the tooltip is closed.
// Stats are high-frequency and would otherwise cause constant re-renders of all node indicators.
const liveStats = useSessionStore(
React.useCallback(
(s) => (nodeId && sessionId ? s.sessions.get(sessionId)?.nodeStats[nodeId] : undefined),
[nodeId, sessionId]
(s) =>
isTooltipOpen && nodeId && sessionId
? s.sessions.get(sessionId)?.nodeStats[nodeId]
: undefined,
[isTooltipOpen, nodeId, sessionId]
)
);

const stats = liveStats ?? propStats;
const hasErrors = stats && stats.errored > 0;
const hasErrors = isTooltipOpen && stats && stats.errored > 0;

const color = getStateColor(state);
const label = getStateLabel(state);
Expand All @@ -439,7 +446,7 @@ export const NodeStateIndicator: React.FC<NodeStateIndicatorProps> = ({
);

return (
<SKTooltip content={content} side="top">
<SKTooltip content={content} side="top" onOpenChange={setIsTooltipOpen}>
<div
className="nodrag"
style={{ display: 'flex', alignItems: 'center', gap: 6, cursor: 'help' }}
Expand Down
4 changes: 2 additions & 2 deletions ui/src/services/websocket.reconnection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,12 @@ describe('WebSocketService reconnection', () => {
expect(session?.isConnected).toBe(true);
});

it('should unsubscribe from session and clear it', () => {
it('should unsubscribe from session and keep cached session', () => {
service.subscribeToSession('session-1');
service.unsubscribeFromSession('session-1');

const session = useSessionStore.getState().getSession('session-1');
expect(session).toBeUndefined();
expect(session?.isConnected).toBe(false);
});

it('should update all subscribed sessions on connection status change', () => {
Expand Down
7 changes: 6 additions & 1 deletion ui/src/services/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ export class WebSocketService {
}

private handleSessionDestroyed(payload: SessionDestroyedPayload): void {
this.subscribedSessions.delete(payload.session_id);
useSessionStore.getState().clearSession(payload.session_id);
useNodeParamsStore.getState().resetSession(payload.session_id);
useTelemetryStore.getState().clearSession(payload.session_id);
}

Expand Down Expand Up @@ -350,7 +353,9 @@ export class WebSocketService {

unsubscribeFromSession(sessionId: string): void {
this.subscribedSessions.delete(sessionId);
useSessionStore.getState().clearSession(sessionId);
// Keep the session entry so the Monitor session list can display the latest known status
// even when a session is not actively selected/subscribed.
useSessionStore.getState().setConnected(sessionId, false);
useNodeParamsStore.getState().resetSession(sessionId);
}

Expand Down
Loading
Loading