diff --git a/Cargo.lock b/Cargo.lock index 516efac4..1c7f0093 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -64,6 +73,15 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "2.11.1" @@ -283,6 +301,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.61.2", +] + [[package]] name = "etcetera" version = "0.8.0" @@ -695,6 +723,14 @@ dependencies = [ "serde_core", ] +[[package]] +name = "integration-test-tasks" +version = "0.1.0" +dependencies = [ + "serde", + "spider-tdl", +] + [[package]] name = "itoa" version = "1.0.18" @@ -808,6 +844,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" 
+dependencies = [ + "regex-automata", +] + [[package]] name = "md-5" version = "0.10.6" @@ -1148,6 +1193,23 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" + [[package]] name = "rmp" version = "0.8.15" @@ -1340,6 +1402,25 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -1431,10 +1512,19 @@ name = "spider-task-executor" version = "0.1.0" dependencies = [ "anyhow", + "bincode", + "bytes", + "futures-util", "libloading", "rmp-serde", + "serde", + "spider-core", "spider-tdl", "thiserror", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", ] [[package]] @@ -1771,6 +1861,24 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "task-executor-tests" +version = "0.1.0" +dependencies = [ + "bincode", + "bytes", + "futures-util", + "integration-test-tasks", + "rmp-serde", + "serde", + "spider-core", + "spider-task-executor", + "spider-tdl", + "tabled", + "tokio", + "tokio-util", +] + [[package]] name = "tdl-integration" version = "0.1.0" @@ -1812,6 +1920,15 @@ dependencies = [ "syn 
2.0.117", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "tinystr" version = "0.8.3" @@ -1847,6 +1964,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", @@ -1918,6 +2036,35 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "matchers", + "once_cell", + "regex-automata", + "serde", + "serde_json", + "sharded-slab", + "thread_local", + "tracing", + "tracing-core", + "tracing-serde", ] [[package]] @@ -1995,6 +2142,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 30796143..67362f87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,5 +9,7 @@ members = [ "components/spider-tdl-derive", "examples/huntsman/complex/tasks", "examples/huntsman/complex/types", + "tests/huntsman/integration-test-tasks", + "tests/huntsman/task-executor", "tests/huntsman/tdl-integration", ] diff --git 
a/components/spider-task-executor/Cargo.toml b/components/spider-task-executor/Cargo.toml index c51c09b2..789308ca 100644 --- a/components/spider-task-executor/Cargo.toml +++ b/components/spider-task-executor/Cargo.toml @@ -7,11 +7,36 @@ edition = "2024" name = "spider_task_executor" path = "src/lib.rs" +[[bin]] +name = "spider-task-executor" +path = "src/bin/spider_task_executor.rs" + [dependencies] +anyhow = "1.0.98" +bincode = "1.3.3" +bytes = "1.10" +futures-util = { version = "0.3.31", default-features = false, features = [ + "sink", + "std" +] } libloading = "0.8.5" rmp-serde = "1.3.1" +serde = { version = "1.0.228", features = ["derive"] } +spider-core = { path = "../spider-core" } spider-tdl = { path = "../spider-tdl" } thiserror = "2.0.18" - -[dev-dependencies] -anyhow = "1.0.98" +tokio = { version = "1.50.0", features = [ + "io-std", + "io-util", + "macros", + "rt", + "sync", + "time" +] } +tokio-util = { version = "0.7", features = ["codec"] } +tracing = { version = "0.1.41", default-features = false, features = ["std"] } +tracing-subscriber = { version = "0.3.19", default-features = false, features = [ + "env-filter", + "fmt", + "json" +] } diff --git a/components/spider-task-executor/src/bin/spider_task_executor.rs b/components/spider-task-executor/src/bin/spider_task_executor.rs new file mode 100644 index 00000000..5be95bf0 --- /dev/null +++ b/components/spider-task-executor/src/bin/spider_task_executor.rs @@ -0,0 +1,146 @@ +//! Spider task-executor binary. +//! +//! Reads bincode-framed [`Request`](spider_task_executor::protocol::Request)s from `stdin`, +//! dispatches them through a [`TdlPackageManager`], and writes +//! [`Response`](spider_task_executor::protocol::Response)s to `stdout`. The execution manager +//! spawns this process per slot and supervises it. +//! +//! Package resolution: each `Execute` request names a TDL package; the executor looks for +//! 
`${SPIDER_TDL_PACKAGE_DIR}/${package}/lib${package}.so` and caches the loaded library by name. +//! +//! Execution model: requests are processed strictly sequentially on a single-threaded tokio +//! runtime. Tokio is used only to match the async I/O surface on the execution manager side; +//! the executor itself has no concurrency requirements, and exactly one task runs for the +//! lifetime of the process. + +use std::{ + path::{Path, PathBuf}, + time::Instant, +}; + +use anyhow::{Result, anyhow}; +use bytes::Bytes; +use futures_util::{SinkExt, StreamExt}; +use spider_task_executor::{ + ExecutorError, + TdlPackageManager, + protocol::{ExecutorOutcome, Request, Response}, +}; +use tokio::io::{stdin, stdout}; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; + +/// Env var that points to the directory where compiled TDL packages live. +const SPIDER_TDL_PACKAGE_DIR: &str = "SPIDER_TDL_PACKAGE_DIR"; + +/// Initializes tracing logging. +fn init_tracing() { + // Send tracing output to stderr so it doesn't pollute the framed-stdout protocol channel. + tracing_subscriber::fmt() + .event_format( + tracing_subscriber::fmt::format() + .with_level(true) + .with_target(false) + .with_file(true) + .with_line_number(true) + .json(), + ) + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_ansi(false) + .with_writer(std::io::stderr) + .init(); +} + +/// Runs a task from the given TDL context and inputs. +/// +/// # Returns +/// +/// Forwards [`spider_task_executor::TdlPackage::execute_task`]'s return values on success. +/// +/// # Errors +/// +/// Returns an error if: +/// +/// * Forwards [`TdlPackageManager::load`]'s return values on failure. +/// * Forwards [`spider_task_executor::TdlPackage::execute_task`]'s return values on failure. 
+fn run_task( + manager: &mut TdlPackageManager, + pkg_dir: &Path, + package: &str, + task_func: &str, + raw_ctx: &[u8], + raw_inputs: &[u8], +) -> Result, ExecutorError> { + let pkg = if let Some(pkg) = manager.get(package) { + pkg + } else { + let path = pkg_dir.join(package).join(format!("lib{package}.so")); + manager.load(&path)? + }; + pkg.execute_task(task_func, raw_ctx, raw_inputs) +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<()> { + init_tracing(); + + let pkg_dir: PathBuf = std::env::var_os(SPIDER_TDL_PACKAGE_DIR) + .map(PathBuf::from) + .ok_or_else(|| anyhow!("{SPIDER_TDL_PACKAGE_DIR} env var not set"))?; + + let mut requests = FramedRead::new(stdin(), LengthDelimitedCodec::new()); + let mut responses = FramedWrite::new(stdout(), LengthDelimitedCodec::new()); + + let mut manager = TdlPackageManager::new(); + + tracing::info!("Executor starts."); + + while let Some(frame) = requests.next().await { + let frame = frame + .inspect_err(|e| tracing::error!(err = ? e, "Failed to receive request frame."))?; + let req: Request = bincode::deserialize(&frame) + .inspect_err(|e| tracing::error!(err = ? e, "Failed to deserialize request."))?; + match req { + Request::Execute { + tdl_context, + raw_ctx, + raw_inputs, + } => { + let started = Instant::now(); + let outcome = match run_task( + &mut manager, + &pkg_dir, + &tdl_context.package, + &tdl_context.task_func, + &raw_ctx, + &raw_inputs, + ) { + Ok(outputs) => ExecutorOutcome::Success { outputs }, + Err(e) => ExecutorOutcome::Failure { + error: rmp_serde::to_vec(&e).inspect_err( + |e| tracing::error!(err = ? e, "Failed to serialize execution result."), + )?, + }, + }; + let elapsed_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX); + + let resp = Response::Result { + outcome, + elapsed_us, + }; + let bytes = bincode::serialize(&resp) + .inspect_err(|e| tracing::error!(err = ? 
e, "Failed to serialize response."))?; + responses + .send(Bytes::from(bytes)) + .await + .inspect_err(|e| tracing::error!(err = ? e, "Failed to send response."))?; + } + Request::Shutdown => { + tracing::info!("Received shutdown request."); + break; + } + } + } + + tracing::info!("Executor exits."); + Ok(()) +} diff --git a/components/spider-task-executor/src/error.rs b/components/spider-task-executor/src/error.rs index da582342..c8da04ef 100644 --- a/components/spider-task-executor/src/error.rs +++ b/components/spider-task-executor/src/error.rs @@ -6,11 +6,11 @@ use spider_tdl::{TdlError, Version}; /// /// [`TdlError`] (failure inside a user task) is wrapped via [`Self::TaskError`] so callers can /// distinguish executor-internal failures from in-task failures. -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)] pub enum ExecutorError { /// `dlopen` failed or a required FFI symbol was missing. #[error("failed to load TDL package library: {0}")] - InvalidLibrary(#[from] libloading::Error), + InvalidLibrary(String), /// The package's declared `spider-tdl` ABI version is not compatible with the executor's. #[error( @@ -33,7 +33,7 @@ pub enum ExecutorError { /// The byte buffer contains invalid UTF-8 patterns. #[error("invalid UTF-8: {0}")] - InvalidUtf8(#[from] std::str::Utf8Error), + InvalidUtf8(String), /// A user task returned a [`TdlError`] across the FFI boundary. #[error("task execution failed: {0}")] @@ -42,7 +42,25 @@ pub enum ExecutorError { /// The msgpack-encoded error payload returned by a failing task could not be decoded back into /// a [`TdlError`]. 
#[error("failed to deserialize error payload: {0}")] - ErrorPayloadDeserializationFailure(#[from] rmp_serde::decode::Error), + ErrorPayloadDeserializationFailure(String), +} + +impl From for ExecutorError { + fn from(value: libloading::Error) -> Self { + Self::InvalidLibrary(value.to_string()) + } +} + +impl From for ExecutorError { + fn from(value: std::str::Utf8Error) -> Self { + Self::InvalidUtf8(value.to_string()) + } +} + +impl From for ExecutorError { + fn from(value: rmp_serde::decode::Error) -> Self { + Self::ErrorPayloadDeserializationFailure(value.to_string()) + } } impl ExecutorError { diff --git a/components/spider-task-executor/src/lib.rs b/components/spider-task-executor/src/lib.rs index b5b05076..3afb0484 100644 --- a/components/spider-task-executor/src/lib.rs +++ b/components/spider-task-executor/src/lib.rs @@ -2,6 +2,7 @@ pub mod error; pub mod manager; +pub mod protocol; pub use error::ExecutorError; pub use manager::{TdlPackage, TdlPackageManager}; diff --git a/components/spider-task-executor/src/manager.rs b/components/spider-task-executor/src/manager.rs index 49fca52b..61060055 100644 --- a/components/spider-task-executor/src/manager.rs +++ b/components/spider-task-executor/src/manager.rs @@ -21,6 +21,7 @@ use crate::error::ExecutorError; /// avoid repeating the FFI round trip on every call. The execute fn pointer is also resolved once /// at load time and cached so each [`Self::execute_task`] call doesn't require `dlsym` per /// dispatch. +#[derive(Debug)] pub struct TdlPackage { /// The name of the package. name: String, @@ -190,7 +191,7 @@ impl TdlPackageManager { /// /// # Returns /// - /// The newly loaded package's name on success. + /// The newly loaded package on success. /// /// # Errors /// @@ -199,14 +200,14 @@ impl TdlPackageManager { /// * [`ExecutorError::DuplicatePackage`] if a package with the same name is already loaded. The /// freshly loaded library will be dropped (unloaded). 
/// * Forwards [`TdlPackage::load`]'s return values on failure. - pub fn load(&mut self, path: &Path) -> Result { + pub fn load(&mut self, path: &Path) -> Result<&TdlPackage, ExecutorError> { let package = TdlPackage::load(path)?; if self.packages.contains_key(package.name()) { return Err(ExecutorError::DuplicatePackage(package.name().to_owned())); } let name_key = package.name().to_owned(); let inserted = self.packages.entry(name_key).or_insert(package); - Ok(inserted.name().to_owned()) + Ok(inserted) } /// # Returns diff --git a/components/spider-task-executor/src/protocol.rs b/components/spider-task-executor/src/protocol.rs new file mode 100644 index 00000000..935d60d7 --- /dev/null +++ b/components/spider-task-executor/src/protocol.rs @@ -0,0 +1,49 @@ +//! Wire protocol between the execution manager and a `spider-task-executor` subprocess. +//! +//! The parent encodes each [`Request`] with `bincode` and writes it as one length-delimited frame +//! over the executor's `stdin`; the executor reads frames, dispatches to the TDL package manager, +//! and writes one [`Response`] frame back over `stdout`. +//! +//! `stderr` is **not** carried over this protocol. The executor writes diagnostics to its own +//! stderr; how those bytes are disposed of (inherited, piped, redirected to a log file) is a choice +//! made by whoever spawned the process. + +use serde::{Deserialize, Serialize}; +use spider_core::task::TdlContext; + +/// Request from the parent process (execution manager) to the executor. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Request { + Execute { + /// TDL information for identifying which task to execute. + tdl_context: TdlContext, + + /// Serialized task context. + raw_ctx: Vec, + + /// Serialized task inputs. + raw_inputs: Vec, + }, + + Shutdown, +} + +/// Reply from the executor to the parent process. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Response { + Result { + outcome: ExecutorOutcome, + /// Wall-clock duration of the task dispatch, measured by the executor. Besides the FFI call it covers the first-use package load and, on failure, encoding of the error. + elapsed_us: u64, + }, +} + +/// Outcome of a task execution. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ExecutorOutcome { + /// Task outputs serialized in wire-format. + Success { outputs: Vec }, + + /// [`crate::ExecutorError`] serialized in msgpack. + Failure { error: Vec }, +} diff --git a/taskfiles/test.yaml b/taskfiles/test.yaml index 83807015..7d79bfdb 100644 --- a/taskfiles/test.yaml +++ b/taskfiles/test.yaml @@ -209,12 +209,20 @@ tasks: # @param {string} SPIDER_STORAGE_URL An URL pointing to the MariaDB instance. spider-huntsman-unit-tests-executor: internal: true + vars: + # TDL packages are staged under `${G_TDL_PACKAGES_DIR}/${package}/lib${package}.so` + # so that the `spider-task-executor` binary can resolve them via the on-disk layout it + # documents. + G_TDL_PACKAGES_DIR: "{{.G_BUILD_DIR}}/tdl_packages" + G_RUST_RELEASE_DIR: "{{.G_RUST_BUILD_DIR}}/release" env: MARIADB_PORT: "{{.MARIADB_PORT}}" MARIADB_DATABASE: "{{.MARIADB_DATABASE}}" MARIADB_USERNAME: "{{.MARIADB_USERNAME}}" MARIADB_PASSWORD: "{{.MARIADB_PASSWORD}}" - SPIDER_TDL_PACKAGE_COMPLEX: "{{.G_RUST_BUILD_DIR}}/release/libhuntsman_complex.so" + SPIDER_TDL_PACKAGE_COMPLEX: "{{.G_TDL_PACKAGES_DIR}}/complex/libcomplex.so" + SPIDER_TDL_PACKAGE_DIR: "{{.G_TDL_PACKAGES_DIR}}" + SPIDER_TASK_EXECUTOR_BIN: "{{.G_RUST_RELEASE_DIR}}/spider-task-executor" SPIDER_TEST_INSTRUMENT_OUTPUT_DIR: sh: "echo {{.G_BUILD_DIR}}/spider-instrument-$(uuidgen)" requires: @@ -222,11 +230,22 @@ tasks: dir: "{{.ROOT_DIR}}" deps: ["toolchains:rust"] cmds: - - "mkdir ${SPIDER_TEST_INSTRUMENT_OUTPUT_DIR}" + - "mkdir -p ${SPIDER_TEST_INSTRUMENT_OUTPUT_DIR}" - defer: "rm -rf ${SPIDER_TEST_INSTRUMENT_OUTPUT_DIR}" - |- . 
"{{.G_RUST_TOOLCHAIN_ENV_FILE}}" - cargo build --package huntsman-complex --release + # `--bin` is a workspace-wide target filter; combining it with cdylib packages in the + # same `cargo build` would silently exclude the `.so` artifacts. Use one invocation per + # artifact to keep the target selection unambiguous. + cargo build --release --package huntsman-complex + cargo build --release --package integration-test-tasks + cargo build --release --package spider-task-executor --bin spider-task-executor + mkdir -p "{{.G_TDL_PACKAGES_DIR}}/complex" \ + "{{.G_TDL_PACKAGES_DIR}}/integration_test_tasks" + cp "{{.G_RUST_RELEASE_DIR}}/libhuntsman_complex.so" \ + "{{.G_TDL_PACKAGES_DIR}}/complex/libcomplex.so" + cp "{{.G_RUST_RELEASE_DIR}}/libintegration_test_tasks.so" \ + "{{.G_TDL_PACKAGES_DIR}}/integration_test_tasks/libintegration_test_tasks.so" cargo nextest run --all --all-features --run-ignored all --release - |- for f in ${SPIDER_TEST_INSTRUMENT_OUTPUT_DIR}/*; do diff --git a/tests/huntsman/integration-test-tasks/Cargo.toml b/tests/huntsman/integration-test-tasks/Cargo.toml new file mode 100644 index 00000000..0c77122e --- /dev/null +++ b/tests/huntsman/integration-test-tasks/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "integration-test-tasks" +version = "0.1.0" +edition = "2024" +publish = false + +[lib] +# `cdylib` is what the task-executor dlopens; `rlib` lets other Rust crates (the integration +# tests) `use` constants like `INSTRUMENT_SLEEP_US`. +crate-type = ["cdylib", "rlib"] +name = "integration_test_tasks" +path = "src/lib.rs" + +[dependencies] +serde = { version = "1.0.228", features = ["derive"] } +spider-tdl = { path = "../../../components/spider-tdl", features = ["derive"] } diff --git a/tests/huntsman/integration-test-tasks/src/lib.rs b/tests/huntsman/integration-test-tasks/src/lib.rs new file mode 100644 index 00000000..ca6558a6 --- /dev/null +++ b/tests/huntsman/integration-test-tasks/src/lib.rs @@ -0,0 +1,75 @@ +//! 
Test TDL package used by the `task-executor` integration tests. +//! +//! Exposes four tasks that exercise distinct executor code paths: +//! +//! * [`task_decl::fibonacci`] — basic compute + correctness. +//! * [`task_decl::always_fail`] — in-task error reporting. +//! * [`task_decl::always_panic`] — process-level crash handling. +//! * [`task_decl::instrument`] — fixed-cost task: sleeps for a known [`INSTRUMENT_SLEEP_US`] duration +//! then echoes its `Vec` payload back. Used by the overhead bench so the non-sleep +//! portion of the executor's reported FFI time isolates the in-executor input/output serde cost, +//! while the parent-side delta isolates IPC framing cost. + +/// The constant sleep duration, in microseconds, used by [`task_decl::instrument`]. +/// +/// Exposed at crate scope so the overhead bench (linked dynamically, so it can't read the value +/// through the cdylib) can reference the same number to keep them in sync if changed. +pub const INSTRUMENT_SLEEP_US: u64 = 50; + +mod task_decl { + use std::{thread::sleep, time::Duration}; + + use spider_tdl::{TaskContext, TdlError, task}; + + use crate::INSTRUMENT_SLEEP_US; + + /// Computes the `index`-th Fibonacci number with a deliberately naive recursive + /// implementation so the call has measurable CPU cost for the overhead benchmark. + #[task(name = "fibonacci")] + pub fn fibonacci(_ctx: TaskContext, index: u64) -> Result { + Ok(fib(index)) + } + + fn fib(index: u64) -> u64 { + if index < 2 { + index + } else { + fib(index - 1) + fib(index - 2) + } + } + + /// Always returns a [`TdlError::ExecutionError`]. + #[task(name = "always_fail")] + pub fn always_fail(_ctx: TaskContext) -> Result { + Err(TdlError::ExecutionError( + "always_fail: intentional failure".to_owned(), + )) + } + + /// Always panics. The panic crosses the `extern "C"` FFI boundary, which aborts the executor + /// process — the test asserts the parent observes that crash. 
+ #[task(name = "always_panic")] + pub fn always_panic(_ctx: TaskContext) -> Result { + panic!("always_panic: intentional panic") + } + + /// Sleeps for a fixed [`INSTRUMENT_SLEEP_US`] microseconds, then echoes the input back. + /// + /// The fixed-cost body lets the overhead bench subtract the known sleep from the executor's + /// reported FFI duration, isolating the in-executor input/output serde overhead. + #[task(name = "instrument")] + pub fn instrument(_ctx: TaskContext, items: Vec) -> Result, TdlError> { + sleep(Duration::from_micros(INSTRUMENT_SLEEP_US)); + Ok(items) + } +} + +spider_tdl::register_tdl_package! { + package_name: "integration_test_tasks", + tasks: [ + task_decl::fibonacci, + task_decl::always_fail, + task_decl::always_panic, + task_decl::instrument, + ], +} diff --git a/tests/huntsman/task-executor/Cargo.toml b/tests/huntsman/task-executor/Cargo.toml new file mode 100644 index 00000000..0d237bef --- /dev/null +++ b/tests/huntsman/task-executor/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "task-executor-tests" +version = "0.1.0" +edition = "2024" +publish = false + +[lib] +name = "task_executor_tests" +path = "src/lib.rs" + +[[test]] +name = "executor" +path = "tests/test_executor.rs" + +[[test]] +name = "overhead_instrument" +path = "tests/overhead_instrument.rs" + +[dependencies] +bincode = "1.3.3" +bytes = "1.10" +futures-util = { version = "0.3.31", default-features = false, features = [ + "sink", + "std" +] } +rmp-serde = "1.3.1" +serde = { version = "1.0.228", features = ["derive"] } +spider-core = { path = "../../../components/spider-core" } +spider-task-executor = { path = "../../../components/spider-task-executor" } +spider-tdl = { path = "../../../components/spider-tdl" } +tokio = { version = "1.50.0", features = [ + "io-util", + "macros", + "process", + "rt", + "time" +] } +tokio-util = { version = "0.7", features = ["codec"] } + +[dev-dependencies] +integration-test-tasks = { path = "../integration-test-tasks" } +tabled = 
"0.20.0" diff --git a/tests/huntsman/task-executor/src/lib.rs b/tests/huntsman/task-executor/src/lib.rs new file mode 100644 index 00000000..c42a20f4 --- /dev/null +++ b/tests/huntsman/task-executor/src/lib.rs @@ -0,0 +1,275 @@ +//! Test harness shared by the `task-executor-tests` integration tests. +//! +//! Spawns the `spider-task-executor` binary as a child process, frames bincode requests on its +//! stdin and reads bincode responses from its stdout — the exact wire protocol of +//! [`spider_task_executor::protocol`]. +//! +//! Every fallible operation in this harness panics with `.expect(...)` on failure; the tests are +//! infrastructure, not production code, and the panic message + backtrace is more useful at the +//! failure site than threading an error type through every helper. +//! +//! Environment: +//! +//! * `SPIDER_TASK_EXECUTOR_BIN` — absolute path to the executor binary. +//! * `SPIDER_TDL_PACKAGE_DIR` — directory the binary searches for TDL packages; gets forwarded to +//! the child verbatim. + +use std::{path::PathBuf, process::Stdio}; + +use bytes::Bytes; +use futures_util::{SinkExt, StreamExt}; +use spider_core::{ + task::TdlContext, + types::{ + id::{JobId, ResourceGroupId, TaskId}, + io::TaskInput, + }, +}; +use spider_task_executor::protocol::{Request, Response}; +use spider_tdl::{ + TaskContext, + wire::{TaskInputsSerializer, TaskOutputsSerializer}, +}; +use tokio::process::{Child, ChildStdin, ChildStdout, Command}; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; + +/// The TDL package name registered by `integration-test-tasks`. +pub const PACKAGE_NAME: &str = "integration_test_tasks"; + +/// One running executor subprocess plus framed handles to its stdin / stdout. +/// +/// The subprocess will be killed when the handle is dropped. 
+pub struct ExecutorHandle { + child: Child, + requests: FramedWrite, + responses: FramedRead, +} + +impl ExecutorHandle { + /// Spawns the executor binary with `SPIDER_TDL_PACKAGE_DIR` set; the child inherits the + /// parent's stderr so panic / abort messages surface in the test log. + /// + /// # Returns + /// + /// A handle owning the running subprocess and framed I/O. + /// + /// # Panics + /// + /// Panics if the binary cannot be spawned or its stdio handles cannot be claimed. + #[must_use] + pub fn spawn() -> Self { + let mut child = Command::new(task_executor_bin()) + .env("SPIDER_TDL_PACKAGE_DIR", tdl_package_dir()) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .kill_on_drop(true) + .spawn() + .expect("spawn executor binary"); + let stdin = child.stdin.take().expect("stdin must be piped"); + let stdout = child.stdout.take().expect("stdout must be piped"); + Self { + child, + requests: FramedWrite::new(stdin, LengthDelimitedCodec::new()), + responses: FramedRead::new(stdout, LengthDelimitedCodec::new()), + } + } + + /// Bincode-serializes `req` and writes one length-delimited frame to the executor's stdin. + /// + /// # Panics + /// + /// Panics if encoding fails or the stdin pipe cannot be written. + pub async fn send(&mut self, req: &Request) { + let bytes = bincode::serialize(req).expect("bincode encode Request"); + self.requests + .send(Bytes::from(bytes)) + .await + .expect("write request frame"); + } + + /// Reads exactly one length-delimited frame from the executor's stdout and bincode-decodes it. + /// + /// # Returns + /// + /// The next [`Response`] from the executor. + /// + /// # Panics + /// + /// Panics if stdout closes before a frame arrives, the frame I/O fails, or decoding fails. 
+ pub async fn recv(&mut self) -> Response { + let frame = self + .responses + .next() + .await + .expect("executor closed stdout before reply") + .expect("read response frame"); + bincode::deserialize(&frame).expect("bincode decode Response") + } + + /// Reads at most one length-delimited frame, tolerating a clean EOF (which crash-path tests + /// rely on to detect that the executor died). + /// + /// # Returns + /// + /// `Some(response)` if a frame was received, `None` if stdout closed cleanly first. + /// + /// # Panics + /// + /// Panics if the frame I/O fails for a reason other than EOF or if decoding fails. + pub async fn try_recv(&mut self) -> Option { + let frame = self.responses.next().await?; + let bytes = frame.expect("read response frame"); + Some(bincode::deserialize(&bytes).expect("bincode decode Response")) + } + + /// Sends [`Request::Shutdown`], closes stdin, and waits for the child to exit cleanly. + /// + /// # Panics + /// + /// Panics if waiting on the child fails or the child exits non-zero. + pub async fn shutdown_clean(mut self) { + self.send(&Request::Shutdown).await; + // Close the stdin pipe so the child sees EOF after `Shutdown` is drained. + drop(self.requests); + let status = self.child.wait().await.expect("wait for executor"); + assert!(status.success(), "executor exited with status {status:?}"); + } + + /// Closes stdin and waits for the child to exit. Used by crash-path tests that don't expect + /// a clean shutdown. + /// + /// # Returns + /// + /// The child's [`ExitStatus`](std::process::ExitStatus). + /// + /// # Panics + /// + /// Panics if waiting on the child fails. + pub async fn wait_for_exit(mut self) -> std::process::ExitStatus { + drop(self.requests); + self.child.wait().await.expect("wait for executor") + } +} + +/// # Returns +/// +/// The absolute path of the `spider-task-executor` binary, read from `SPIDER_TASK_EXECUTOR_BIN`. +/// +/// # Panics +/// +/// Panics if `SPIDER_TASK_EXECUTOR_BIN` is unset. 
+#[must_use] +pub fn task_executor_bin() -> PathBuf { + std::env::var_os("SPIDER_TASK_EXECUTOR_BIN") + .map(PathBuf::from) + .expect("SPIDER_TASK_EXECUTOR_BIN env var not set") +} + +/// # Returns +/// +/// The TDL package staging directory, read from `SPIDER_TDL_PACKAGE_DIR`. Forwarded verbatim +/// into the executor child's environment so it resolves +/// `${SPIDER_TDL_PACKAGE_DIR}/${package}/lib${package}.so`. +/// +/// # Panics +/// +/// Panics if `SPIDER_TDL_PACKAGE_DIR` is unset. +#[must_use] +pub fn tdl_package_dir() -> PathBuf { + std::env::var_os("SPIDER_TDL_PACKAGE_DIR") + .map(PathBuf::from) + .expect("SPIDER_TDL_PACKAGE_DIR env var not set") +} + +/// # Returns +/// +/// A placeholder msgpack-encoded [`TaskContext`] suitable for a one-shot test invocation. The id +/// fields are fresh per call but the executor doesn't inspect them. +/// +/// # Panics +/// +/// Panics if msgpack encoding fails (the test ids serialize trivially). +#[must_use] +pub fn build_ctx() -> Vec { + let ctx = TaskContext { + job_id: JobId::new(), + task_id: TaskId::new(), + task_instance_id: 1, + resource_group_id: ResourceGroupId::new(), + }; + rmp_serde::to_vec(&ctx).expect("serialize TaskContext") +} + +/// # Type Parameters +/// +/// * `T` - The Serde-serializable value type passed as the task's single input. +/// +/// # Returns +/// +/// A wire-format buffer carrying one [`TaskInput::ValuePayload`] holding the msgpack-encoded +/// `value` — i.e. the same shape the parent ships for a single-argument task. +/// +/// # Panics +/// +/// Panics if msgpack encoding or wire-format append fails. +#[must_use] +pub fn encode_single_input(value: &T) -> Vec { + let mut inputs = TaskInputsSerializer::new(); + inputs + .append(TaskInput::ValuePayload( + rmp_serde::to_vec(value).expect("msgpack encode input"), + )) + .expect("append wire-format input"); + inputs.release() +} + +/// # Returns +/// +/// A wire-format buffer carrying zero inputs — for nullary tasks like `always_fail` and +/// `always_panic`. 
+#[must_use] +pub fn encode_no_inputs() -> Vec { + TaskInputsSerializer::new().release() +} + +/// # Type Parameters +/// +/// * `T` - The Serde-deserializable type the output payload should decode into. +/// +/// # Returns +/// +/// The single msgpack-encoded value carried in `output_bytes`, deserialized as `T`. +/// +/// # Panics +/// +/// Panics if the outputs buffer doesn't contain exactly one value, or if the msgpack decode +/// fails. +#[must_use] +pub fn decode_single_output(output_bytes: &[u8]) -> T { + let outputs = + TaskOutputsSerializer::deserialize(output_bytes).expect("decode wire-format outputs"); + assert_eq!( + outputs.len(), + 1, + "expected exactly one output payload, got {}", + outputs.len(), + ); + rmp_serde::from_slice(&outputs[0]).expect("msgpack decode output") +} + +/// # Returns +/// +/// A [`Request::Execute`] targeting `task_func` in the integration package, with a fresh test +/// `TaskContext` and the caller-supplied wire-format `raw_inputs`. +#[must_use] +pub fn execute_request(task_func: &str, raw_inputs: Vec) -> Request { + Request::Execute { + tdl_context: TdlContext { + package: PACKAGE_NAME.to_owned(), + task_func: task_func.to_owned(), + }, + raw_ctx: build_ctx(), + raw_inputs, + } +} diff --git a/tests/huntsman/task-executor/tests/overhead_instrument.rs b/tests/huntsman/task-executor/tests/overhead_instrument.rs new file mode 100644 index 00000000..bae3fa98 --- /dev/null +++ b/tests/huntsman/task-executor/tests/overhead_instrument.rs @@ -0,0 +1,228 @@ +//! Measures the round-trip overhead of one task execution through the `spider-task-executor` +//! binary. +//! +//! Drives the `instrument` task — which sleeps for a known constant +//! [`INSTRUMENT_SLEEP_US`](integration_test_tasks::INSTRUMENT_SLEEP_US) and then echoes its +//! `Vec` payload — against a *long-lived* executor subprocess (the FFI library is +//! cached after the first call, so subsequent dispatches measure steady-state overhead, not +//! one-time dlopen cost). 
With the work portion held constant we can split the cost into: +//! +//! * `e2e`: parent's wall-clock around `send(Execute)` → `recv(Response::Result)`. +//! * `executor`: the in-executor FFI duration, taken straight from +//! [`Response::Result::elapsed_us`]. This is `INSTRUMENT_SLEEP_US` + the executor's in-FFI +//! input/output serde. +//! * `executor_internal`: `executor - INSTRUMENT_SLEEP_US`. Approximates the in-executor +//! input/output serde cost alone. +//! * `ipc_overhead`: `e2e - executor`. The parent-side framing + bincode + pipe traversal. +//! +//! Aggregates (avg, p50, p95, p99) for each metric land in a markdown table at +//! `${SPIDER_TEST_INSTRUMENT_OUTPUT_DIR}/task_executor_overhead.md`. + +use std::{ + fs::File, + io::Write, + path::PathBuf, + time::{Duration, Instant}, +}; + +use integration_test_tasks::INSTRUMENT_SLEEP_US; +use spider_task_executor::protocol::{ExecutorOutcome, Response}; +use tabled::{Table, Tabled}; +use task_executor_tests::{ + ExecutorHandle, + decode_single_output, + encode_single_input, + execute_request, +}; + +const PAYLOAD_LEN: usize = 100; +const ITERATIONS: usize = 10; +const OUTPUT_FILE: &str = "task_executor_overhead.md"; +const INSTRUMENT_OUTPUT_DIR_ENV: &str = "SPIDER_TEST_INSTRUMENT_OUTPUT_DIR"; + +/// One row in the markdown table: a metric and its aggregate latency statistics. +#[derive(Tabled)] +struct LatencyRow { + #[tabled(rename = "Metric")] + metric: &'static str, + #[tabled(rename = "Count")] + count: usize, + #[tabled(rename = "Avg (µs)")] + avg_us: String, + #[tabled(rename = "P50 (µs)")] + p50_us: String, + #[tabled(rename = "P95 (µs)")] + p95_us: String, + #[tabled(rename = "P99 (µs)")] + p99_us: String, +} + +impl LatencyRow { + /// Sorts `samples` in place and computes `count`, `avg`, `p50`, `p95`, `p99` in microseconds. + /// + /// # Returns + /// + /// A populated [`LatencyRow`], or a row with `"N/A"` aggregates when `samples` is empty. 
+ fn from_samples(metric: &'static str, samples: &mut [Duration]) -> Self { + if samples.is_empty() { + return Self { + metric, + count: 0, + avg_us: "N/A".to_owned(), + p50_us: "N/A".to_owned(), + p95_us: "N/A".to_owned(), + p99_us: "N/A".to_owned(), + }; + } + samples.sort(); + let count = samples.len(); + let sum: Duration = samples.iter().sum(); + #[allow(clippy::cast_precision_loss)] + let avg = sum.as_secs_f64() * 1_000_000.0 / count as f64; + let last = count - 1; + let p50 = samples[(count / 2).min(last)].as_secs_f64() * 1_000_000.0; + let p95 = samples[(count * 95 / 100).min(last)].as_secs_f64() * 1_000_000.0; + let p99 = samples[(count * 99 / 100).min(last)].as_secs_f64() * 1_000_000.0; + Self { + metric, + count, + avg_us: format!("{avg:.2}"), + p50_us: format!("{p50:.2}"), + p95_us: format!("{p95:.2}"), + p99_us: format!("{p99:.2}"), + } + } +} + +#[tokio::test] +#[ignore = "requires `integration-test-tasks` cdylib, `spider-task-executor` binary, and \ + SPIDER_TEST_INSTRUMENT_OUTPUT_DIR"] +async fn instrument_overhead() { + let output_dir = std::env::var_os(INSTRUMENT_OUTPUT_DIR_ENV).map_or_else( + || panic!("{INSTRUMENT_OUTPUT_DIR_ENV} env var not set"), + PathBuf::from, + ); + + let mut handle = ExecutorHandle::spawn(); + + let payload = path_like_payload(PAYLOAD_LEN); + let raw_inputs = encode_single_input(&payload); + let sleep_floor = Duration::from_micros(INSTRUMENT_SLEEP_US); + + // Warm-up: first call dlopens the package. Assert correctness; discard timing. 
+ handle + .send(&execute_request("instrument", raw_inputs.clone())) + .await; + expect_echo(&handle.recv().await, &payload); + + let mut e2e_samples = Vec::with_capacity(ITERATIONS); + let mut executor_samples = Vec::with_capacity(ITERATIONS); + let mut executor_internal_samples = Vec::with_capacity(ITERATIONS); + let mut ipc_overhead_samples = Vec::with_capacity(ITERATIONS); + + for _ in 0..ITERATIONS { + let started = Instant::now(); + handle + .send(&execute_request("instrument", raw_inputs.clone())) + .await; + let response = handle.recv().await; + let e2e = started.elapsed(); + + let Response::Result { + outcome, + elapsed_us, + } = response; + let ExecutorOutcome::Success { outputs } = outcome else { + panic!("instrument task unexpectedly failed in overhead loop"); + }; + let got: Vec = decode_single_output(&outputs); + assert_eq!(got, payload); + + let executor = Duration::from_micros(elapsed_us); + // Defensive: a coarse system clock could in principle report e2e < executor, or executor < + // sleep_floor (the sleep can return slightly early on some platforms). Treat both as zero + // overhead and keep the sample for visibility. 
+ let executor_internal = executor.checked_sub(sleep_floor).unwrap_or(Duration::ZERO); + let ipc_overhead = e2e.checked_sub(executor).unwrap_or(Duration::ZERO); + + e2e_samples.push(e2e); + executor_samples.push(executor); + executor_internal_samples.push(executor_internal); + ipc_overhead_samples.push(ipc_overhead); + } + + handle.shutdown_clean().await; + + let rows = vec![ + LatencyRow::from_samples("E2E (parent)", &mut e2e_samples.clone()), + LatencyRow::from_samples("Executor FFI", &mut executor_samples.clone()), + LatencyRow::from_samples( + "Executor internal (FFI - sleep)", + &mut executor_internal_samples.clone(), + ), + LatencyRow::from_samples( + "IPC overhead (E2E - FFI)", + &mut ipc_overhead_samples.clone(), + ), + ]; + let table = Table::new(rows).to_string(); + + let preamble = format!( + "# Task-executor overhead\n\nInputs: `instrument` task with {PAYLOAD_LEN} path-like \ + strings echoed after a {INSTRUMENT_SLEEP_US}µs sleep, {ITERATIONS} samples (excluding \ + warm-up).\n\n* `Executor internal` ≈ in-executor input/output serde cost.\n* `IPC \ + overhead` ≈ parent-side framing + bincode + pipe traversal.\n\n" + ); + + let path = output_dir.join(OUTPUT_FILE); + let mut file = + File::create(&path).unwrap_or_else(|err| panic!("create {} failed: {err}", path.display())); + file.write_all(preamble.as_bytes()).expect("write preamble"); + file.write_all(table.as_bytes()).expect("write table"); + file.write_all(b"\n").expect("write trailing newline"); +} + +/// Builds `len` deterministic path-like strings. Mixing prefixes and suffixes keeps the payload +/// representative of a realistic input without depending on `rand`. +/// +/// # Returns +/// +/// A `Vec` of length `len`. 
+fn path_like_payload(len: usize) -> Vec { + const PREFIXES: &[&str] = &[ + "/var/log", + "/usr/local/bin", + "/etc/spider", + "/home/user/projects", + "/opt/data/cache", + ]; + const SUFFIXES: &[&str] = &["log", "txt", "bin", "json", "tmp"]; + (0..len) + .map(|idx| { + let prefix = PREFIXES[idx % PREFIXES.len()]; + let suffix = SUFFIXES[(idx / PREFIXES.len()) % SUFFIXES.len()]; + format!("{prefix}/file_{:04}_{idx:05}.{suffix}", (idx * 31) % 10_000) + }) + .collect() +} + +/// Asserts that `response` is a `Success` whose decoded payload equals `expected`. +/// +/// # Panics +/// +/// Panics if the response is a `Failure` (the decoded +/// [`ExecutorError`](spider_task_executor::ExecutorError) is included in the panic message), or if +/// the decoded payload doesn't match `expected`. +fn expect_echo(response: &Response, expected: &[String]) { + let Response::Result { outcome, .. } = response; + let outputs = match outcome { + ExecutorOutcome::Success { outputs } => outputs, + ExecutorOutcome::Failure { error } => { + let err: spider_task_executor::ExecutorError = + rmp_serde::from_slice(error).expect("decode ExecutorError payload"); + panic!("instrument warm-up returned Failure: {err:?}"); + } + }; + let got: Vec = decode_single_output(outputs); + assert_eq!(got, expected, "warm-up output mismatch"); +} diff --git a/tests/huntsman/task-executor/tests/test_executor.rs b/tests/huntsman/task-executor/tests/test_executor.rs new file mode 100644 index 00000000..e2eb8ec4 --- /dev/null +++ b/tests/huntsman/task-executor/tests/test_executor.rs @@ -0,0 +1,90 @@ +//! End-to-end correctness tests against the `spider-task-executor` binary. +//! +//! Each test spawns a fresh executor subprocess via [`ExecutorHandle::spawn`], exchanges one framed +//! bincode request/response over the binary's stdin/stdout, and asserts on the result. 
+ +use spider_task_executor::{ + ExecutorError, + protocol::{ExecutorOutcome, Response}, +}; +use spider_tdl::TdlError; +use task_executor_tests::{ + ExecutorHandle, + decode_single_output, + encode_no_inputs, + encode_single_input, + execute_request, +}; + +#[tokio::test] +#[ignore = "requires `integration-test-tasks` cdylib and `spider-task-executor` binary"] +async fn fibonacci_returns_correct_value() { + let mut handle = ExecutorHandle::spawn(); + let input: u64 = 10; + handle + .send(&execute_request("fibonacci", encode_single_input(&input))) + .await; + let Response::Result { outcome, .. } = handle.recv().await; + match outcome { + ExecutorOutcome::Success { outputs } => { + let got: u64 = decode_single_output(&outputs); + // Fib(10) = 55 + assert_eq!(got, 55); + } + ExecutorOutcome::Failure { error } => { + let err: ExecutorError = + rmp_serde::from_slice(&error).expect("decode ExecutorError payload"); + panic!("expected Success for fibonacci(10), got Failure: {err:?}"); + } + } + handle.shutdown_clean().await; +} + +#[tokio::test] +#[ignore = "requires `integration-test-tasks` cdylib and `spider-task-executor` binary"] +async fn always_fail_reports_task_error() { + let mut handle = ExecutorHandle::spawn(); + handle + .send(&execute_request("always_fail", encode_no_inputs())) + .await; + let Response::Result { outcome, .. 
} = handle.recv().await; + match outcome { + ExecutorOutcome::Success { outputs } => { + panic!("expected Failure, got Success with {} bytes", outputs.len()); + } + ExecutorOutcome::Failure { error } => { + let err: ExecutorError = + rmp_serde::from_slice(&error).expect("decode ExecutorError payload"); + let ExecutorError::TaskError(TdlError::ExecutionError(message)) = &err else { + panic!("expected TaskError(ExecutionError), got {err:?}"); + }; + assert!( + message.contains("always_fail"), + "unexpected error message: {message}", + ); + } + } + handle.shutdown_clean().await; +} + +#[tokio::test] +#[ignore = "requires `integration-test-tasks` cdylib and `spider-task-executor` binary"] +async fn always_panic_crashes_the_process() { + let mut handle = ExecutorHandle::spawn(); + handle + .send(&execute_request("always_panic", encode_no_inputs())) + .await; + + // A panic across the `extern "C"` boundary aborts the executor process. The parent must + // observe stdout EOF (no further frames) and a non-zero exit status. + let frame = handle.try_recv().await; + assert!( + frame.is_none(), + "expected stdout EOF after panic, got a response frame: {frame:?}", + ); + let status = handle.wait_for_exit().await; + assert!( + !status.success(), + "expected non-zero exit after panic, got {status:?}", + ); +} diff --git a/tests/huntsman/tdl-integration/tests/complex.rs b/tests/huntsman/tdl-integration/tests/complex.rs index 007cb557..513e7d75 100644 --- a/tests/huntsman/tdl-integration/tests/complex.rs +++ b/tests/huntsman/tdl-integration/tests/complex.rs @@ -88,8 +88,8 @@ fn decode_complex_vec(output_bytes: &[u8]) -> anyhow::Result { fn load_and_query_name() -> anyhow::Result<()> { let path = lib_path(); let mut manager = TdlPackageManager::new(); - let name = manager.load(&path)?; - assert_eq!(name, PACKAGE_NAME); + let pkg = manager.load(&path)?; + assert_eq!(pkg.name(), PACKAGE_NAME); let pkg = manager .get(PACKAGE_NAME) .expect("just-loaded package should be retrievable");