Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
matrix:
os:
# - macos-12
# - macos-latest
- macos-latest
# - windows-latest
- ubuntu-latest
python-version:
Expand Down Expand Up @@ -69,7 +69,10 @@ jobs:
run: |
python -m pip install --upgrade pip wheel
python -m pip install --upgrade --upgrade-strategy=eager tox
sudo apt-get install -y libjpeg-dev

- if: matrix.os == 'ubuntu-latest'
name: Install libjpeg on Ubuntu
run: sudo apt-get install -y libjpeg-dev

- name: Install uv
if: matrix.toxenv == 'oldestdeps'
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ coverage.*
build/
dist/
venvs/
.DS_Store

# Produced by versioningit
src/con_duct/_version.py
Expand Down
7 changes: 7 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,10 @@ build-backend = "setuptools.build_meta"

[tool.versioningit.write]
file = "src/con_duct/_version.py"



[tool.pytest.ini_options]
markers = [
"flaky: mark a test as being unreliable"
]
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ classifiers =
Intended Audience :: Science/Research
Intended Audience :: System Administrators
Topic :: System :: Systems Administration
Operating System :: Unix
Operating System :: MacOS

project_urls =
Source Code = https://github.com/con/duct/
Expand Down
10 changes: 10 additions & 0 deletions src/con_duct/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from con_duct.duct_main import (
DUCT_OUTPUT_PREFIX,
EXECUTION_SUMMARY_FORMAT,
SKIPEMPTY_DEFAULT,
Outputs,
RecordTypes,
SessionMode,
Expand Down Expand Up @@ -366,6 +367,15 @@ def _create_run_parser() -> argparse.ArgumentParser:
"'current-session' tracks the current session instead of starting a new one. "
"Useful for tracking slurm jobs or other commands that should run in the current session.",
)
parser.add_argument(
"--skipempty",
action="store_true",
help=(
"Skip sample aggregation if samples fail to be collected. "
"Default is True for macOS and False Linux."
),
default=SKIPEMPTY_DEFAULT,
)
return parser


Expand Down
199 changes: 159 additions & 40 deletions src/con_duct/duct_main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
from __future__ import annotations
import collections
from collections import Counter
from collections.abc import Iterable, Iterator
from dataclasses import asdict, dataclass, field
Expand All @@ -10,6 +11,7 @@
import logging
import math
import os
import platform
import re
import shutil
import signal
Expand All @@ -20,11 +22,22 @@
import threading
import time
from types import FrameType
from typing import IO, Any, Optional, TextIO
from typing import IO, Any, Callable, Optional, TextIO, Union
import warnings

__version__ = version("con-duct")
__schema_version__ = "0.2.2"

# duct may experience sampling and signal-handling issues on Intel Macs, so emit
# a warning at import time unless DUCT_IGNORE_INTEL_WARNING is set in the environment.
# `os.uname()` is only reached when sys.platform is "darwin" (short-circuit `and`).
is_mac_intel = sys.platform == "darwin" and os.uname().machine == "x86_64"
if is_mac_intel and not os.getenv("DUCT_IGNORE_INTEL_WARNING"):
    message = (
        "Detected system macOS running on intel architecture - "
        "duct may experience issues with sampling and signal handling.\n\n"
        "Set the environment variable `DUCT_IGNORE_INTEL_WARNING` to suppress this warning.\n"
    )
    warnings.warn(message=message, stacklevel=2)

# Operating system name as reported by platform.system() (e.g. "Linux", "Darwin").
SYSTEM = platform.system()
# Empty samples are skipped during aggregation by default only on macOS.
SKIPEMPTY_DEFAULT = SYSTEM == "Darwin"

lgr = logging.getLogger("con-duct")
DEFAULT_LOG_LEVEL = os.environ.get("DUCT_LOG_LEVEL", "INFO").upper()
Expand Down Expand Up @@ -281,7 +294,15 @@ def add_pid(self, pid: int, stats: ProcessStats) -> None:
self.stats[pid] = stats
self.timestamp = max(self.timestamp, stats.timestamp)

def aggregate(self: Sample, other: Sample) -> Sample:
def aggregate(
self: Sample, other: Sample, skipempty: bool = SKIPEMPTY_DEFAULT
) -> Sample:
if skipempty and not other.stats and other.averages.num_samples == 0:
lgr.debug(
f"Other sample ({other=}) is empty during aggregation - returning the base sample."
)
return self

output = Sample()
for pid in self.stats.keys() | other.stats.keys():
if (mine := self.stats.get(pid)) is not None:
Expand Down Expand Up @@ -321,6 +342,128 @@ def for_json(self) -> dict[str, Any]:
return d


def _get_sample_linux(session_id: int) -> Sample:
    """Build a Sample from the processes that ``ps -s <session_id>`` reports.

    Each non-empty output row after the header becomes one ``ProcessStats``
    entry keyed by PID; ``averages`` is then derived from the filled sample.

    Raises:
        subprocess.CalledProcessError: if ``ps`` exits with a non-zero status.
        ValueError: if a row does not split into the eight expected columns.
    """
    raw = subprocess.check_output(
        [
            "ps",
            "-w",
            "-s",
            str(session_id),
            "-o",
            "pid,pcpu,pmem,rss,vsz,etime,stat,cmd",
        ],
        text=True,
    )

    collected = Sample()
    # The first row is the column header, so it is dropped.
    for row in raw.splitlines()[1:]:
        if row:
            # maxsplit=7 keeps the command (with its own spaces) as one field.
            fields = row.split(maxsplit=7)
            pid, pcpu, pmem, rss_kib, vsz_kib, etime, stat, cmd = fields
            collected.add_pid(
                pid=int(pid),
                stats=ProcessStats(
                    pcpu=float(pcpu),
                    pmem=float(pmem),
                    # rss/vsz are scaled by 1024 (KiB -> bytes).
                    rss=int(rss_kib) * 1024,
                    vsz=int(vsz_kib) * 1024,
                    timestamp=datetime.now().astimezone().isoformat(),
                    etime=etime,
                    stat=Counter([stat]),
                    cmd=cmd,
                ),
            )

    collected.averages = Averages.from_sample(sample=collected)
    return collected


def _try_to_get_sid(pid: int) -> int:
    """Return the session ID of ``pid``, or ``-1`` when it cannot be looked up.

    The ``pid`` comes from an earlier ``ps`` listing, so the process may no
    longer exist by the time ``os.getsid`` is called.

    Args:
        pid: process ID to look up.

    Returns:
        The session ID, or ``-1`` if the operating system refused the lookup.
    """
    try:
        return os.getsid(pid)
    except OSError as exc:
        # Only OS-level failures (e.g. the process is gone, or access is
        # denied) are treated as "unknown session"; programming errors such as
        # a wrong argument type are allowed to propagate.
        lgr.debug("Error fetching session ID for PID %s: %s", pid, exc)
        return -1


def _get_ps_lines_mac() -> list[str]:
    """Run ``ps -ax`` and return its output rows, one string per process.

    The header row and any empty lines are removed; the rows are otherwise
    returned exactly as ``ps`` printed them.

    Raises:
        subprocess.CalledProcessError: if ``ps`` exits with a non-zero status.
    """
    raw = subprocess.check_output(
        ["ps", "-ax", "-o", "pid,pcpu,pmem,rss,vsz,etime,stat,args"],
        text=True,
    )
    # Slice off the column header, then discard empty rows.
    body_rows = raw.splitlines()[1:]
    return list(filter(None, body_rows))


def _add_sample_from_line_mac(
    line: str, pid_to_matching_sid: dict[int, int], sample: Sample
) -> Union[Sample, None]:
    """Parse one ``ps`` row and record it in ``sample`` if its PID is tracked.

    Args:
        line: a single ``ps`` output row with eight whitespace-separated
            columns (pid, pcpu, pmem, rss, vsz, etime, stat, command).
        pid_to_matching_sid: PIDs that belong to the monitored session.
        sample: the sample that receives the parsed process statistics.

    Returns:
        ``sample`` after the process was added, or ``None`` when the row's PID
        is not present in ``pid_to_matching_sid``.

    Raises:
        ValueError: if the row does not split into eight columns or a numeric
            column cannot be converted.
    """
    # maxsplit=7 keeps the command (with its own spaces) as one field.
    fields = line.split(maxsplit=7)
    pid_text, pcpu, pmem, rss_kb, vsz_kb, etime, stat, cmd = fields
    pid = int(pid_text)

    # Rows for processes outside the tracked session are ignored.
    if pid_to_matching_sid.get(pid, None) is None:
        return None

    stats = ProcessStats(
        pcpu=float(pcpu),
        pmem=float(pmem),
        # rss/vsz are scaled by 1024 before being stored.
        rss=int(rss_kb) * 1024,
        vsz=int(vsz_kb) * 1024,
        timestamp=datetime.now().astimezone().isoformat(),
        etime=etime,
        stat=Counter([stat]),
        cmd=cmd,
    )
    sample.add_pid(pid=pid, stats=stats)
    return sample


def _get_sample_mac(session_id: int) -> Sample:
    """Build a Sample of the processes that belong to ``session_id`` on macOS.

    ``_get_ps_lines_mac`` lists every process on the host, so the session of
    each listed PID is looked up individually and only rows whose session
    equals ``session_id`` are added to the sample.

    Args:
        session_id: session whose processes should be sampled.

    Returns:
        The collected sample. It may contain no processes; in that case the
        averages may be left at their initial value (see below).
    """
    sample = Sample()
    lines = _get_ps_lines_mac()

    # First pass: find the PIDs whose session matches. A PID that vanished
    # since the `ps` call yields -1 from _try_to_get_sid and is left out.
    pid_to_matching_sid: dict[int, int] = {}
    for line in lines:
        pid = int(line.split(maxsplit=1)[0])
        sid = _try_to_get_sid(pid=pid)
        if sid == session_id:
            pid_to_matching_sid[pid] = sid

    if not pid_to_matching_sid:
        lgr.debug("No processes found for session ID %s. ", session_id)

    # Second pass: parse every row; rows for untracked PIDs are ignored by
    # the helper.
    for line in lines:
        _add_sample_from_line_mac(
            line=line, pid_to_matching_sid=pid_to_matching_sid, sample=sample
        )

    try:
        sample.averages = Averages.from_sample(sample=sample)
    except AssertionError:
        # NOTE(review): presumably raised when the sample holds no processes -
        # confirm against Averages.from_sample. The sample is still returned.
        lgr.debug("Failed to compute averages for sample: %s", sample)
    return sample


# Maps the value of platform.system() to the sampler implementation for that platform.
_get_sample_per_system = {
"Linux": _get_sample_linux,
"Darwin": _get_sample_mac,
}
# Resolved once at import time. On a system without an entry above (e.g. Windows)
# this lookup raises KeyError as soon as the module is imported.
_get_sample: Callable[[int], Sample] = _get_sample_per_system[SYSTEM] # type: ignore[assignment]


Comment on lines +463 to +465
Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code will raise a KeyError at module import time if the code is run on an unsupported system (e.g., Windows). Consider adding error handling or a default fallback to provide a clear error message indicating which systems are supported.

Suggested change
_get_sample: Callable[[int], Sample] = _get_sample_per_system[SYSTEM] # type: ignore[assignment]
def _get_sample(session_id: int) -> Sample:
"""Return a Sample for the given session_id, dispatching based on the current system.
Raises a clear error on unsupported systems instead of failing with a KeyError
at import time.
"""
try:
backend = _get_sample_per_system[SYSTEM]
except KeyError as exc:
supported = ", ".join(sorted(_get_sample_per_system.keys()))
raise RuntimeError(
f"Unsupported system '{SYSTEM}'. Supported systems are: {supported}."
) from exc
return backend(session_id)

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this for @asmacdo's decision - the current pattern was based on Yarik's original suggestion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I have no issue with a KeyError right now. I'll add Windows support in the new year

class Report:
"""Top level report"""

Expand Down Expand Up @@ -431,51 +574,24 @@ def get_system_info(self) -> None:

def collect_sample(self) -> Optional[Sample]:
assert self.session_id is not None
sample = Sample()
try:
output = subprocess.check_output(
[
"ps",
"-w",
"-s",
str(self.session_id),
"-o",
"pid,pcpu,pmem,rss,vsz,etime,stat,cmd",
],
text=True,
)
for line in output.splitlines()[1:]:
if line:
pid, pcpu, pmem, rss_kib, vsz_kib, etime, stat, cmd = line.split(
maxsplit=7,
)
sample.add_pid(
int(pid),
ProcessStats(
pcpu=float(pcpu),
pmem=float(pmem),
rss=int(rss_kib) * 1024,
vsz=int(vsz_kib) * 1024,
timestamp=datetime.now().astimezone().isoformat(),
etime=etime,
stat=Counter([stat]),
cmd=cmd,
),
)
sample = _get_sample(self.session_id)
return sample
except subprocess.CalledProcessError as exc: # when session_id has no processes
lgr.debug("Error collecting sample: %s", str(exc))
return None

sample.averages = Averages.from_sample(sample)
return sample

def update_from_sample(self, sample: Sample) -> None:
self.full_run_stats = self.full_run_stats.aggregate(sample)
def update_from_sample(
self, sample: Sample, skipempty: bool = SKIPEMPTY_DEFAULT
) -> None:
self.full_run_stats = self.full_run_stats.aggregate(sample, skipempty=skipempty)
if self.current_sample is None:
self.current_sample = Sample().aggregate(sample)
self.current_sample = Sample().aggregate(sample, skipempty=skipempty)
else:
assert self.current_sample.averages is not None
self.current_sample = self.current_sample.aggregate(sample)
self.current_sample = self.current_sample.aggregate(
sample, skipempty=skipempty
)
assert self.current_sample is not None

def write_subreport(self) -> None:
Expand Down Expand Up @@ -698,6 +814,7 @@ def monitor_process(
report_interval: float,
sample_interval: float,
stop_event: threading.Event,
skipempty: bool = SKIPEMPTY_DEFAULT,
) -> None:
lgr.debug(
"Starting monitoring of the process %s on sample interval %f for report interval %f",
Expand All @@ -724,7 +841,7 @@ def monitor_process(
break
# process is still running, but we could not collect sample
continue
report.update_from_sample(sample)
report.update_from_sample(sample, skipempty=skipempty)
if (
report.start_time
and report.elapsed_time >= (report.number - 1) * report_interval
Expand Down Expand Up @@ -872,6 +989,7 @@ def execute(
colors: bool,
mode: SessionMode,
message: str = "",
skipempty: bool = SKIPEMPTY_DEFAULT,
) -> int:
"""A wrapper to execute a command, monitor and log the process details.

Expand Down Expand Up @@ -955,6 +1073,7 @@ def execute(
report_interval,
sample_interval,
stop_event,
skipempty,
]
monitoring_thread = threading.Thread(
target=monitor_process, args=monitoring_args
Expand Down
2 changes: 1 addition & 1 deletion test/duct_main/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def test_aggregation_sample_no_pids(mock_log_paths: mock.MagicMock) -> None:
# When there are no pids, finalization should be triggered because the exe is finished,
# so a Sample with no PIDs should never be passed to update_from_sample.
with pytest.raises(AssertionError):
report.update_from_sample(sample0)
report.update_from_sample(sample0, skipempty=False)


@mock.patch("con_duct.duct_main.LogPaths")
Expand Down
6 changes: 6 additions & 0 deletions test/duct_main/test_e2e.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import annotations
import json
from pathlib import Path
import platform
import subprocess
import time
import pytest
from con_duct.duct_main import SUFFIXES

SYSTEM = platform.system()
TEST_SCRIPT_DIR = Path(__file__).parent.parent / "data"


Expand Down Expand Up @@ -167,6 +169,10 @@ def test_logging_levels(temp_output_dir: str) -> None:
assert "Exit Code:" in result.stderr

# Test --quiet flag - should suppress logging
# skip_flag = ""
# if SYSTEM == "Darwin":
# skip_flag = " --skipempty"

result_quiet = subprocess.run(
f"con-duct run --quiet --clobber -p {duct_prefix} sleep 0.1",
shell=True,
Expand Down
Loading
Loading