Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/fs/stub.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <dirent.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <moonbit.h>
Expand Down Expand Up @@ -88,6 +89,22 @@ int moonbitlang_async_unlock_file(int fd) {
return flock(fd, LOCK_UN);
}

moonbit_string_t moonbitlang_async_get_tmp_base_path() {
const char *path;
#ifdef __ANDROID__
const char *tmpdir = getenv("TMPDIR");
path = tmpdir ? tmpdir : "/data/local/tmp/";
Comment on lines +95 to +96
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Ensure TMPDIR has trailing slash before using as base path

This uses $TMPDIR verbatim on Android, but tmpdir() builds paths by directly concatenating tmp_base_path with the generated name, so a common value like /data/local/tmp (no trailing slash) produces malformed paths such as /data/local/tmpprefix... and can create directories in the wrong location or fail unexpectedly. Normalize the base path (or append / if missing) before returning it.

Useful? React with 👍 / 👎.

#else
path = "/tmp/";
#endif
size_t len = strlen(path);
moonbit_string_t str = moonbit_make_string_raw(len);
for (size_t i = 0; i < len; i++) {
((uint16_t*)str)[i] = (uint16_t)(unsigned char)path[i];
}
return str;
}

#endif

#ifdef _WIN32
Expand Down
6 changes: 5 additions & 1 deletion src/fs/tmpdir.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ let tmpdir_seed : @random.Rand = {

///|
#cfg(not(platform="windows"))
let tmp_base_path : String = "/tmp/"
extern "C" fn get_tmp_base_path_ffi() -> String = "moonbitlang_async_get_tmp_base_path"

///|
#cfg(not(platform="windows"))
let tmp_base_path : String = get_tmp_base_path_ffi()

///|
#cfg(platform="windows")
Expand Down
2 changes: 1 addition & 1 deletion src/internal/event_loop/pkg.generated.mbti
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub let stdout : IoHandle

pub async fn symlink(StringView, StringView, context~ : String) -> Unit

pub async fn wait_pid(Int, context~ : String) -> Unit
pub async fn wait_pid(Int, context~ : String) -> Int?

pub fn with_event_loop(async () -> Unit) -> Unit raise

Expand Down
38 changes: 27 additions & 11 deletions src/internal/event_loop/process_unix.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,34 @@ pub async fn spawn(

///|
#cfg(not(platform="windows"))
pub async fn wait_pid(pid : Int, context~ : String) -> Unit {
pub async fn wait_pid(pid : Int, context~ : String) -> Int? {
guard curr_loop.val is Some(evloop)
ignore(context)
let alt_id = match evloop.poll.register_pid(pid) {
Running(id) => id
Stopped => return
// Try pidfd first (Linux 5.3+), fall back to worker-thread waitpid
let register_result : RegisterPidResult? = Some(evloop.poll.register_pid(pid)) catch {
_ => None
}
guard evloop.pids.get(alt_id) is None
evloop.pids[alt_id] = @coroutine.current_coroutine()
defer {
evloop.pids.remove(alt_id)
evloop.poll.remove_pid(alt_id)
match register_result {
Some(Running(alt_id)) => {
guard evloop.pids.get(alt_id) is None
evloop.pids[alt_id] = @coroutine.current_coroutine()
defer {
evloop.pids.remove(alt_id)
evloop.poll.remove_pid(alt_id)
}
@coroutine.suspend()
None
}
Some(Stopped) => None
None => {
// pidfd not available, fall back to blocking waitpid in worker thread.
// The worker reaps the child and stores the exit code in job.ret(),
// so return it here to avoid a second waitpid in get_process_result.
let job = Job::wait_for_process(pid)
let exit_code = perform_job_in_worker(job, context~, cancel=_ => {
job.cancel_process_waiter()
NeedWait
Comment on lines +58 to +60
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Cancel fallback wait worker when task is cancelled

When register_pid fails (for example on kernels without pidfd_open), this fallback calls perform_job_in_worker with a cancel callback that only sets job.cancel_process_waiter() and returns NeedWait. In EventLoop::wait_for_job, NeedWait suppresses the cancellation until the worker completes, and because this callback never invokes evloop.cancel_job_in_worker(job_id, ...), the blocking waitpid is not interrupted, so @process.run/@process.spawn cancellation handlers do not run until the child exits naturally.

Useful? React with 👍 / 👎.

})
Some(exit_code)
}
}
@coroutine.suspend()
}
3 changes: 2 additions & 1 deletion src/internal/event_loop/process_windows.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ pub async fn spawn(

///|
#cfg(platform="windows")
pub async fn wait_pid(pid : Int, context~ : String) -> Unit {
pub async fn wait_pid(pid : Int, context~ : String) -> Int? {
let job = Job::wait_for_process(pid)
perform_job_in_worker(job, context~, cancel=_ => {
job.cancel_process_waiter()
NeedWait
})
|> ignore
None
}
83 changes: 83 additions & 0 deletions src/internal/event_loop/thread_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@
#include <errno.h>
#include <time.h>
#include <dirent.h>
#if !defined(__ANDROID__) || __ANDROID_API__ >= 28
#include <spawn.h>
#endif
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/wait.h>
#include <dlfcn.h>

typedef int HANDLE;

Expand Down Expand Up @@ -366,7 +370,11 @@ struct worker *moonbitlang_async_spawn_worker(
#else
pthread_attr_t attr;
pthread_attr_init(&attr);
#ifdef __ANDROID__
pthread_attr_setstacksize(&attr, 64 * 1024);
#else
pthread_attr_setstacksize(&attr, 512);
#endif

pthread_create(&(worker->id), &attr, &worker_loop, worker);
pthread_attr_destroy(&attr);
Expand Down Expand Up @@ -1881,6 +1889,19 @@ void free_spawn_job(void *obj) {
moonbit_decref(job->cwd);
}

#if defined(__ANDROID__) && __ANDROID_API__ < 28

// posix_spawn is unavailable on Android API < 28.
// Return a job pre-filled with ENOSYS so the caller gets a proper error
// instead of hanging forever (a NULL job causes the worker thread to exit
// without sending a completion notification).
static
void spawn_job_worker(struct job *job) {
job->err = ENOSYS;
}

#else // posix_spawn available

static
void spawn_job_worker(struct job *job) {
struct spawn_job *spawn_job = (struct spawn_job *)job;
Expand Down Expand Up @@ -1913,7 +1934,21 @@ void spawn_job_worker(struct job *job) {
}
}
if (spawn_job->cwd) {
#ifdef __ANDROID__
// addchdir_np only available on Android API 34+; check at runtime
static int (*addchdir_fn)(posix_spawn_file_actions_t *, const char *) = NULL;
static int checked = 0;
if (!checked) {
addchdir_fn = dlsym(RTLD_DEFAULT, "posix_spawn_file_actions_addchdir_np");
checked = 1;
}
if (addchdir_fn)
job->err = addchdir_fn(&file_actions, spawn_job->cwd);
else
job->err = ENOSYS;
#else
job->err = posix_spawn_file_actions_addchdir_np(&file_actions, spawn_job->cwd);
#endif
if (job->err) goto exit;
}

Expand Down Expand Up @@ -1941,6 +1976,8 @@ void spawn_job_worker(struct job *job) {
posix_spawn_file_actions_destroy(&file_actions);
}

#endif // posix_spawn availability

struct spawn_job *moonbitlang_async_make_spawn_job(
char *path,
char **args,
Expand All @@ -1961,6 +1998,52 @@ struct spawn_job *moonbitlang_async_make_spawn_job(
return job;
}

// Unix wait_for_process: blocking waitpid in worker thread
// Used as fallback when pidfd_open is not available (e.g. Android, older Linux)

struct wait_for_process_job {
struct job job;
pid_t pid;
volatile int cancelled;
};

static
void free_wait_for_process_job(void *obj) {}

static
void wait_for_process_job_worker(struct job *job) {
struct wait_for_process_job *wj = (struct wait_for_process_job*)job;
int status;
while (1) {
int ret = waitpid(wj->pid, &status, 0);
Comment thread
tonyfettes marked this conversation as resolved.
if (ret == wj->pid) {
job->ret = WEXITSTATUS(status);
break;
}
if (ret < 0 && errno != EINTR) {
job->err = errno;
return;
}
if (wj->cancelled) {
job->err = ECANCELED;
return;
}
}
}

struct wait_for_process_job *moonbitlang_async_make_wait_for_process_job(
int pid
) {
struct wait_for_process_job *job = MAKE_JOB(wait_for_process);
job->pid = pid;
job->cancelled = 0;
return job;
}

void moonbitlang_async_cancel_wait_for_process_job(struct wait_for_process_job *job) {
job->cancelled = 1;
}

#endif

// ===== getaddrinfo job, resolve host name via `getaddrinfo` =====
Expand Down
2 changes: 0 additions & 2 deletions src/internal/event_loop/thread_pool.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,9 @@ extern "C" fn Job::spawn(
) -> Job = "moonbitlang_async_make_spawn_job"

///|
#cfg(platform="windows")
extern "C" fn Job::wait_for_process(pid : Int) -> Job = "moonbitlang_async_make_wait_for_process_job"

///|
#cfg(platform="windows")
#borrow(job)
extern "C" fn Job::cancel_process_waiter(job : Job) = "moonbitlang_async_cancel_wait_for_process_job"

Expand Down
50 changes: 32 additions & 18 deletions src/process/process.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,17 @@ pub async fn spawn_orphan(
/// and return the exit code of the process.
pub async fn wait_pid(pid : Int) -> Int {
let context = "@process.wait_pid()"
@event_loop.wait_pid(pid, context~)
let out = Ref::new(0)
let ret = get_process_result(pid, out)
if ret < 0 {
@os_error.check_errno(context)
match @event_loop.wait_pid(pid, context~) {
Some(exit_code) => exit_code
None => {
let out = Ref::new(0)
let ret = get_process_result(pid, out)
if ret < 0 {
@os_error.check_errno(context)
}
out.val
}
}
out.val
}

///|
Expand Down Expand Up @@ -139,7 +143,7 @@ pub async fn run(
is_orphan=false,
context~,
)
@event_loop.wait_pid(pid, context~) catch {
let maybe_exit = @event_loop.wait_pid(pid, context~) catch {
_ if @coroutine.is_being_cancelled() =>
@async.protect_from_cancel(() => {
@async.with_task_group(group => {
Expand All @@ -149,12 +153,17 @@ pub async fn run(
})
err => raise err
}
let out = Ref::new(0)
let ret = get_process_result(pid, out)
if ret < 0 {
@os_error.check_errno(context)
match maybe_exit {
Some(exit_code) => exit_code
None => {
let out = Ref::new(0)
let ret = get_process_result(pid, out)
if ret < 0 {
@os_error.check_errno(context)
}
out.val
}
}
out.val
}

///|
Expand Down Expand Up @@ -227,7 +236,7 @@ pub async fn[X] spawn(
context~,
)
let result = group.spawn(no_wait?, () => {
@event_loop.wait_pid(pid, context~) catch {
let maybe_exit = @event_loop.wait_pid(pid, context~) catch {
_ if @coroutine.is_being_cancelled() =>
@async.protect_from_cancel(() => {
@async.with_task_group(group => {
Expand All @@ -237,12 +246,17 @@ pub async fn[X] spawn(
})
err => raise err
}
let out = Ref::new(0)
let ret = get_process_result(pid, out)
if ret < 0 {
@os_error.check_errno(context)
match maybe_exit {
Some(exit_code) => exit_code
None => {
let out = Ref::new(0)
let ret = get_process_result(pid, out)
if ret < 0 {
@os_error.check_errno(context)
}
out.val
}
}
out.val
})
{ pid, result }
}
Expand Down
Loading