
Feature/edge#47

Open
andre-merzky wants to merge 46 commits into main from feature/edge

@andre-merzky

Member

No description provided.

andre-merzky and others added 30 commits November 10, 2025 16:41
…cores_per_node

- Rename init_from_scratch() to _initialize() in PBSPro, Torque, Cobalt,
  LSF so the base class actually calls the implementations
- Replace self._log.debug_1/debug_2 (PBSPro) and self._log.debug (LSF)
  with module-level logger.debug
- Fix _get_cores_per_node: generator has no len(), use set→list instead
- Remove stray print("initialize") from _init_info
- Add logger.warning with exc_info in _parse_nodefile bare except

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
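
The `_get_cores_per_node` fix listed above rests on a plain Python fact: a generator expression has no `len()`. The following is a minimal sketch of the set→list approach, assuming the input is a list of `(hostname, cores)` tuples. The function names and the uniformity check are illustrative assumptions, not the code from this PR.

```python
def cores_per_node_sketch(nodes):
    """Return the single cores-per-node value shared by all nodes.

    `nodes` is assumed to be a list of (hostname, cores) tuples.
    """
    # A generator such as (c for _, c in nodes) cannot be passed to len();
    # materialising the distinct values via set -> list makes them countable.
    distinct = list(set(cores for _, cores in nodes))
    if len(distinct) != 1:
        raise ValueError(f"expected uniform core counts, got {sorted(distinct)}")
    return distinct[0]


def generator_has_no_len(nodes):
    """Show the original failure mode: len() on a generator raises TypeError."""
    try:
        len(cores for _, cores in nodes)
    except TypeError:
        return True
    return False
```
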
- Add compactify_hostlist() and expand_hostlist() static methods to
  ResourceManager for converting between hostname lists and bracket
  notation (e.g., ['host001', 'host002'] <-> ['host00[1,2]'])
- Add _build_brackets() and _minimal_prefix() helpers for compactify
- Refactor get_hostlist() to use new _split_hoststring() + expand_hostlist()
- Refactor get_hostlist_by_range() to format input and delegate to
  expand_hostlist(), removing duplicated expansion logic
- Clean up slurm.py: remove unused imports, fix return type, fix
  _get_node_list() call, remove duplicate helper functions
- Add comprehensive tests for all hostlist methods (37 total RM tests)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
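
To make the bracket notation from the commit above concrete, here is a rough sketch of the expansion direction only. It assumes one bracket group per entry, comma-separated items, and optional zero-padded `lo-hi` ranges. The name `expand_hostlist_sketch` is hypothetical, and the real `expand_hostlist()` may accept a different grammar.

```python
import re

# One optional prefix, one bracket group, one optional suffix (assumed grammar).
_BRACKET = re.compile(
    r"^(?P<prefix>[^\[\]]*)\[(?P<body>[^\[\]]+)\](?P<suffix>[^\[\]]*)$"
)


def expand_hostlist_sketch(entries):
    """Expand entries such as 'host00[1,2]' into individual hostnames."""
    hosts = []
    for entry in entries:
        match = _BRACKET.match(entry)
        if match is None:
            # No bracket group: the entry is already a plain hostname.
            hosts.append(entry)
            continue
        prefix, body, suffix = match.group("prefix", "body", "suffix")
        for part in body.split(","):
            if "-" in part:
                # Range item: keep the zero-padding width of the lower bound.
                lo, hi = part.split("-", 1)
                width = len(lo)
                for index in range(int(lo), int(hi) + 1):
                    hosts.append(f"{prefix}{index:0{width}d}{suffix}")
            else:
                hosts.append(f"{prefix}{part}{suffix}")
    return hosts
```

With this sketch, `expand_hostlist_sketch(['host00[1,2]'])` yields `['host001', 'host002']`, matching the example in the commit message.
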
- Add get_partition_env() and release_partition_env() methods to base class
- Add nodefile helper methods: _get_nodefile_path, _write_nodefile, _remove_nodefile
- Implement partition env for Slurm (env vars: SLURM_NODELIST, SLURM_NNODES, etc.)
- Implement partition env for PBSPro/Torque (PBS_NODEFILE, PBS_NUM_NODES)
- Implement partition env for Cobalt (COBALT_NODEFILE, COBALT_PARTSIZE)
- Implement partition env for LSF (LSB_DJOB_HOSTFILE)
- Implement no-op partition env for Fork
- Add comprehensive unit tests for all partition env methods

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
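
The nodefile-based partition environments described above could look roughly like the sketch below. Only the variable names `PBS_NODEFILE` and `PBS_NUM_NODES` come from the commit message. The function names, signatures, and the nodefile naming scheme are assumptions made for illustration.

```python
import os


def write_nodefile_sketch(nodes, directory, partition_id):
    """Write one hostname per line and return the nodefile path."""
    path = os.path.join(directory, f"nodefile.{partition_id}")
    with open(path, "w", encoding="utf-8") as handle:
        handle.write("\n".join(nodes) + "\n")
    return path


def pbs_partition_env_sketch(nodes, directory, partition_id):
    """Build the environment a task in this partition would see (assumed shape)."""
    path = write_nodefile_sketch(nodes, directory, partition_id)
    return {"PBS_NODEFILE": path, "PBS_NUM_NODES": str(len(nodes))}


def release_partition_env_sketch(env):
    """Remove the nodefile referenced by a previously built environment."""
    path = env.get("PBS_NODEFILE")
    if path and os.path.exists(path):
        os.remove(path)
```
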
Execution backends:
- RadicalPilot: extract partition from resources, set pd.nodes and pd.prepare_env
- Concurrent, Dask, Dragon V1/V2/V3: raise ValueError if partition specified

Resource manager fixes:
- Node.__post_init__: add early return when no rm_info provided
- _filter_nodes: remove broken per-node core/gpu marking code
- get_instance: separate "unknown RM" from "creation failed" errors

Tests:
- Add Node dataclass validation tests
- Add _parse_nodefile and _get_cores_per_node tests
- Add get_partition and get_instance error tests

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Remove obvious comments that duplicate what the code shows
- Fix _parse_nodefile docstring to match actual behavior (returns list of
  node names, not tuples)
- Clarify _get_cores_per_node expects tuples from RM-specific parsing

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Implement _parse_nodefile_and_cpn in base.py and use it in RMs
- Refactor duplicate partition environment logic into _get_partition_env_with_nodefile
- Improve factory method error handling to report all failures
- Modernize type hints and add docstrings
- Add comprehensive RM initialization tests (test_rm_initialization.py)
- Fix bugs: check_nodes shadowing, tuple handling in get_node_list
- Update Cobalt to auto-detect cores from localhost
- Update PBSPro to auto-derive cores from nodefile if config missing
Resolved merge conflicts in:
- src/rhapsody/__init__.py
- src/rhapsody/backends/constants.py
- src/rhapsody/backends/execution/concurrent.py
- src/rhapsody/backends/execution/dask_parallel.py
- src/rhapsody/backends/execution/dragon.py
- src/rhapsody/backends/execution/radical_pilot.py

All tests passing (207 passed, 23 skipped)
- BaseTask.from_dict(): use data.get() instead of key-in-data check so
  None-valued prompt/executable/function fields don't misroute task class
- Dragon backend: include traceback in aggregated function results
  (single-rank and multi-rank) for better client-side error diagnostics
- Edge backend: propagate traceback field from task notifications

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Completes every submitted task immediately as DONE without executing
anything.  Useful for measuring Edge/bridge/client overhead in isolation
from actual task execution costs.

Usage: rhapsody.get_backend('noop')

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces two new execution backends: EdgeExecutionBackend, which enables remote task execution via RADICAL Edge with built-in submission batching and SSE notification handling, and NoopExecutionBackend for performance benchmarking. Additionally, the PR updates task routing logic to use .get() for safer field access, adds traceback aggregation to the Dragon backend, and adjusts logging levels. The review feedback focuses on improving the responsiveness of the Edge backend by wrapping blocking network calls in asyncio.to_thread, ensuring robust task routing by using explicit 'is not None' checks, and maintaining interface consistency in the Noop backend by aligning method signatures with the base class.

Comment thread src/rhapsody/backends/execution/edge.py Outdated
Comment on lines +137 to +146
self._bc = BridgeClient(url=self._bridge_url)
ec = self._bc.get_edge_client(self._edge_name)

session_kwargs = {'backends': self._remote_backends}
if self._notify_batch_window is not None:
    session_kwargs['notify_batch_window'] = self._notify_batch_window
if self._notify_batch_size is not None:
    session_kwargs['notify_batch_size'] = self._notify_batch_size

self._rh = ec.get_plugin(self._plugin_name, **session_kwargs)


high

The BridgeClient initialization and subsequent calls to get_edge_client and get_plugin are likely blocking network operations. Performing these directly in an async method will block the event loop. These calls should be wrapped in asyncio.to_thread to maintain responsiveness.

Suggested change
- self._bc = BridgeClient(url=self._bridge_url)
- ec = self._bc.get_edge_client(self._edge_name)
- session_kwargs = {'backends': self._remote_backends}
- if self._notify_batch_window is not None:
-     session_kwargs['notify_batch_window'] = self._notify_batch_window
- if self._notify_batch_size is not None:
-     session_kwargs['notify_batch_size'] = self._notify_batch_size
- self._rh = ec.get_plugin(self._plugin_name, **session_kwargs)
+ self._bc = await asyncio.to_thread(BridgeClient, url=self._bridge_url)
+ ec = await asyncio.to_thread(self._bc.get_edge_client, self._edge_name)
+ session_kwargs = {'backends': self._remote_backends}
+ if self._notify_batch_window is not None:
+     session_kwargs['notify_batch_window'] = self._notify_batch_window
+ if self._notify_batch_size is not None:
+     session_kwargs['notify_batch_size'] = self._notify_batch_size
+ self._rh = await asyncio.to_thread(ec.get_plugin, self._plugin_name, **session_kwargs)
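
The effect of `asyncio.to_thread` can be checked in isolation. The sketch below uses a hypothetical `blocking_connect` stand-in (a `time.sleep`, not the real `BridgeClient`) and a heartbeat coroutine that only makes progress while the event loop is free. It requires Python 3.9 or later.

```python
import asyncio
import time


def blocking_connect(url):
    """Stand-in for a blocking network call (hypothetical, not BridgeClient)."""
    time.sleep(0.2)
    return f"connected:{url}"


async def heartbeat(ticks, stop):
    """Record a tick roughly every 20 ms while the event loop can run."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def main():
    ticks = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    # The blocking call runs in a worker thread, so the heartbeat keeps ticking.
    result = await asyncio.to_thread(blocking_connect, "https://bridge.example")
    stop.set()
    await beat
    return result, len(ticks)
```

Running `asyncio.run(main())` should return the connection string together with several recorded ticks. Calling `blocking_connect` directly inside `main()` would instead stall the heartbeat for the duration of the call.
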

Comment thread src/rhapsody/api/task.py
Comment on lines +249 to 252
+ if data.get("prompt"):
      return AITask(**data)
- elif "executable" in data or "function" in data:
+ elif data.get("executable") or data.get("function"):
      return ComputeTask(**data)


medium

The use of truthiness checks (if data.get("prompt"):) can lead to incorrect routing if the field contains a falsy value that is otherwise valid (e.g., an empty string). Since the goal is to avoid misrouting when keys are explicitly set to None, using an explicit is not None check is safer and more robust.

Suggested change
- if data.get("prompt"):
-     return AITask(**data)
- elif "executable" in data or "function" in data:
- elif data.get("executable") or data.get("function"):
-     return ComputeTask(**data)
+ if data.get("prompt") is not None:
+     return AITask(**data)
+ elif data.get("executable") is not None or data.get("function") is not None:
+     return ComputeTask(**data)
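
The three routing variants discussed in this thread differ only on `None` and falsy values. The sketch below returns class names as strings so it runs without the real `AITask` and `ComputeTask` classes. Whether an empty prompt should count as a valid AI task is a project decision that this example does not settle.

```python
def route_by_membership(data):
    """Original check: a key that is present but None still matches."""
    if "prompt" in data:
        return "AITask"
    elif "executable" in data or "function" in data:
        return "ComputeTask"
    return None


def route_by_truthiness(data):
    """Check from the PR: None is skipped, but so is any other falsy value."""
    if data.get("prompt"):
        return "AITask"
    elif data.get("executable") or data.get("function"):
        return "ComputeTask"
    return None


def route_by_not_none(data):
    """Suggested check: only missing or None fields are skipped."""
    if data.get("prompt") is not None:
        return "AITask"
    elif data.get("executable") is not None or data.get("function") is not None:
        return "ComputeTask"
    return None


none_prompt = {"prompt": None, "executable": "/bin/date"}
empty_prompt = {"prompt": ""}
```

For `none_prompt`, the membership check picks `AITask` while the other two pick `ComputeTask`. For `empty_prompt`, the truthiness check matches nothing, whereas the `is not None` check still picks `AITask`.
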

Comment on lines +48 to +63
async def submit_tasks(self, tasks: list[dict[str, Any]]) -> list[asyncio.Task]:
    if self._backend_state != BackendMainStates.RUNNING:
        self._backend_state = BackendMainStates.RUNNING

    submitted = []
    for task in tasks:
        task.update({
            "return_value": True,
            "stdout": "",
            "stderr": "",
            "exit_code": 0,
        })
        self.tasks[task["uid"]] = task
        future = asyncio.create_task(self._complete(task))
        submitted.append(future)
    return submitted


medium

The implementation of submit_tasks returns a list of tasks, which deviates from the BaseBackend interface (which specifies a None return type). Additionally, the submitted list is not used by the caller in Session. It's better to align with the base class signature.

Suggested change
- async def submit_tasks(self, tasks: list[dict[str, Any]]) -> list[asyncio.Task]:
-     if self._backend_state != BackendMainStates.RUNNING:
-         self._backend_state = BackendMainStates.RUNNING
-     submitted = []
-     for task in tasks:
-         task.update({
-             "return_value": True,
-             "stdout": "",
-             "stderr": "",
-             "exit_code": 0,
-         })
-         self.tasks[task["uid"]] = task
-         future = asyncio.create_task(self._complete(task))
-         submitted.append(future)
-     return submitted
+ async def submit_tasks(self, tasks: list[dict[str, Any]]) -> None:
+     if self._backend_state != BackendMainStates.RUNNING:
+         self._backend_state = BackendMainStates.RUNNING
+     for task in tasks:
+         task.update({
+             "return_value": True,
+             "stdout": "",
+             "stderr": "",
+             "exit_code": 0,
+         })
+         self.tasks[task["uid"]] = task
+         asyncio.create_task(self._complete(task))
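
One point worth weighing with the suggestion above: the event loop holds only weak references to tasks, so the Python documentation recommends keeping a reference to the result of `asyncio.create_task()`. The sketch below returns `None` as the base class expects while still retaining the tasks in a set. The class `NoopSketch`, its `_pending` attribute, the `drain()` helper, and the `"state"` field are hypothetical and not part of the PR.

```python
import asyncio


class NoopSketch:
    """Illustrative stand-in for a no-op backend, not the PR's implementation."""

    def __init__(self):
        self.tasks = {}
        self._pending = set()  # strong references to in-flight asyncio tasks

    async def _complete(self, task):
        await asyncio.sleep(0)  # yield once, then mark the task as finished
        task["state"] = "DONE"

    async def submit_tasks(self, tasks) -> None:
        for task in tasks:
            task.update(
                {"return_value": True, "stdout": "", "stderr": "", "exit_code": 0}
            )
            self.tasks[task["uid"]] = task
            future = asyncio.create_task(self._complete(task))
            # Keep the task alive until it finishes, then drop the reference.
            self._pending.add(future)
            future.add_done_callback(self._pending.discard)

    async def drain(self):
        """Wait for all tasks that are still in flight."""
        if self._pending:
            await asyncio.gather(*list(self._pending))
```
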

Comment on lines +94 to +95
def task_state_cb(self):
    pass


medium

The signature of task_state_cb does not match the abstract method defined in BaseBackend. This will cause a TypeError if the method is called with the expected arguments (task and state).

Suggested change
- def task_state_cb(self):
-     pass
+ def task_state_cb(self, task: dict, state: str) -> None:
+     pass
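
The failure mode described here is easy to reproduce. `abc` only checks that an abstract method is overridden, not that the override keeps the same parameters, so the mismatch surfaces at call time. The class names below are hypothetical stand-ins for `BaseBackend` and its subclasses.

```python
from abc import ABC, abstractmethod


class BackendBase(ABC):
    """Hypothetical stand-in for the abstract base class."""

    @abstractmethod
    def task_state_cb(self, task: dict, state: str) -> None:
        ...


class MismatchedBackend(BackendBase):
    def task_state_cb(self):  # overrides the name but drops the parameters
        pass


class AlignedBackend(BackendBase):
    def task_state_cb(self, task: dict, state: str) -> None:
        pass


def call_raises_type_error(backend):
    """Invoke the callback the way a caller of the base interface would."""
    try:
        backend.task_state_cb({"uid": "t1"}, "DONE")
    except TypeError:
        return True
    return False
```

Both subclasses instantiate without complaint. Only the call on `MismatchedBackend` raises `TypeError`.
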

for task in tasks:
    self._tasks[task['uid']] = task
    if prof:
        prof.prof('task_submit', uid=task['uid'])

Contributor


I think we should remove prof

"exception": None
if all_successful
else "; ".join(str(r.exception) for r in results if not r.success),
"traceback": None

Contributor


Although this is correct, V1 and V2 will be removed in Q3, and we are no longer maintaining them

andre-merzky and others added 9 commits April 26, 2026 17:00
When a function task carries cloudpickled bytecode (``"function":
"cloudpickle::..."`` or ``"_pickled_fields"``), the receiving edge
must run the same Python (major,minor) as the client — CodeType's
tuple shape changed between 3.10 and 3.11, and similar skews are
likely in future minor releases.  Today the mismatch surfaces
mid-batch as ``code expected at most 16 arguments, got 18`` from
the remote rhapsody plugin.

Move that to a fail-fast client-side check.  Before each
submission of a batch that contains at least one cloudpickled task,
EdgeExecutionBackend queries ``sysinfo.host_role()`` on the target
edge once (cached on the backend instance), reads ``python_version``,
and raises a clear RuntimeError if the major.minor doesn't match
``sys.version_info``.  No env-var bypass — alignment is required for
function tasks; alternatives are ``executable``-typed tasks or
import-path function tasks (``"function": "module:func"``), neither
of which carries pickled bytecode.

Restored the module-level ``BridgeClient`` re-export — the previous
move into __init__ broke ``patch("rhapsody.backends.execution.edge.
BridgeClient", ...)`` in 14 existing tests.  Module-level binding
returns; the ``ImportError`` chained from the original radical.edge
import error is preserved by capturing it in
``_radical_edge_import_error`` at module load.

Five new tests cover: executable-only batch (no sysinfo lookup),
matching-version cloudpickle batch (proceeds), mismatched-version
cloudpickle batch (RuntimeError, no remote call), cache-once
behavior across submits, and the ``_pickled_fields`` discriminator.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
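
The fail-fast check described in the commit above might be sketched as follows. The `cloudpickle::` prefix and the `_pickled_fields` key come from the commit message. The function names, the `fetch_remote_version` callable, and the assumption that the edge reports `python_version` as a dotted string such as `"3.11.4"` are illustrative. Caching of the lookup is left out.

```python
import sys


def needs_pickle_compat(task):
    """True if the task carries pickled bytecode (assumed discriminators)."""
    function = task.get("function")
    if isinstance(function, str) and function.startswith("cloudpickle::"):
        return True
    return bool(task.get("_pickled_fields"))


def check_python_compat(tasks, fetch_remote_version, local=None):
    """Raise RuntimeError if a pickled task would hit a different major.minor.

    `fetch_remote_version` is a hypothetical callable returning a string such
    as "3.11.4". It is only invoked when the batch needs the check.
    """
    if not any(needs_pickle_compat(task) for task in tasks):
        return  # executable or import-path tasks: no remote lookup required
    local = tuple(sys.version_info[:2]) if local is None else tuple(local)
    remote = tuple(int(part) for part in fetch_remote_version().split(".")[:2])
    if remote != local:
        raise RuntimeError(
            f"edge runs Python {remote[0]}.{remote[1]}, client runs "
            f"{local[0]}.{local[1]}: cloudpickled tasks need matching versions"
        )
```
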
bridge_url and edge_name are now optional on EdgeExecutionBackend.
bridge_url defaults to $RADICAL_BRIDGE_URL (resolved by BridgeClient).
edge_name auto-selects the first connected edge advertising an enabled
rhapsody plugin, skipping the synthetic 'bridge' edge that hosts
bridge-only plugins like iri_connect.  Selection failure raises
RuntimeError from ``await backend``, not from get_backend().

Plugin name is now a class constant (_PLUGIN_NAME = 'rhapsody'), no
longer a constructor parameter.  Edge resolution + handle creation +
python-version probe live together in _get_rhapsody_handle.

Simplifications (behaviour-preserving):
  - inline _get_logger() at its single call site
  - drop redundant _backend_state assignment in _async_init
  - rename _ensure_python_compat -> _check_python_compat; fold
    _task_needs_pickle_compat in as a local function; drop dead
    "!= (0, 0)" clause (the (0, 0) sentinel was removed)
  - fuse three loops over `tasks` in submit_tasks into one pass
  - drop misleading task_batch_flush prof event in non-batching path
  - collapse _force_flush + _timed_flush into _locked_flush
  - read self._bc.url instead of poking the private self._bc._url

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
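
The auto-selection rule from the commit above can be sketched as below. The data shape is entirely assumed: a list of connected edges, each a dict with a `name` and a `plugins` mapping from plugin name to a dict holding an `enabled` flag. The actual bridge API may report this information differently.

```python
PLUGIN_NAME = "rhapsody"  # mirrors the _PLUGIN_NAME constant named in the commit


def select_edge_sketch(connected_edges):
    """Return the first edge advertising an enabled rhapsody plugin."""
    for edge in connected_edges:
        if edge["name"] == "bridge":
            # The synthetic 'bridge' edge only hosts bridge-only plugins.
            continue
        plugin = edge.get("plugins", {}).get(PLUGIN_NAME)
        if plugin and plugin.get("enabled"):
            return edge["name"]
    raise RuntimeError(
        f"no connected edge advertises an enabled {PLUGIN_NAME!r} plugin"
    )
```
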
Bare ``self.batch.results_ddict[tuid]`` blocks the monitor thread on
multi-node Dragon allocations.  Membership-check first.

(cherry picked from commit a454799)
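
The membership-check-first pattern from the commit above can be sketched with a plain `dict` standing in for the Dragon distributed dictionary. A plain `dict` raises `KeyError` on a missing key rather than blocking, so this only illustrates the control flow. The function and variable names are hypothetical.

```python
def collect_ready_results(results, pending_uids):
    """Fetch only the results that are already present, without waiting."""
    ready = {}
    for uid in list(pending_uids):
        # Check membership first so a missing key is never looked up directly.
        if uid in results:
            ready[uid] = results[uid]
            pending_uids.discard(uid)
    return ready
```

A monitor loop could call this repeatedly and leave unfinished task IDs in `pending_uids` for the next pass.
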
andre-merzky and others added 4 commits May 12, 2026 11:25
Brings the edge/noop backends, the radical_pilot partition hook (consumer side
of the rhapsody_rm contract), and the v0.3.0 / telemetry work onto feature/edge.
Keeps the dragon v3 ddict-guard fix; drops the chunk-submit approach (main had
reverted it) -- chunk-submit is preserved on branch feature/dragon-chunk-submit
for isolated testing/decision later.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

2 participants