diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index e6bd6afe..2efa8f3a 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -26,8 +26,8 @@ jobs: fail-fast: false matrix: os: - # - macos-12 - # - macos-latest + #- macos-15-intel + - macos-latest # - windows-latest - ubuntu-latest python-version: @@ -65,11 +65,14 @@ jobs: run: | python -m pip install --upgrade pip wheel python -m pip install --upgrade --upgrade-strategy=eager tox - sudo apt-get install -y libjpeg-dev + + - name: Install system dependencies + if: matrix.os == 'ubuntu-latest' + run: sudo apt-get install -y libjpeg-dev - name: Run tests with coverage if: matrix.toxenv == 'py' - run: tox -e py -- -vv --cov-report=xml + run: tox -e py -- -vv -s --cov-report=xml - name: Run generic tests if: matrix.toxenv != 'py' diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7c2df3d2..bfe62fbb 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -19,7 +19,7 @@ repos: hooks: - id: isort - repo: https://github.com/PyCQA/flake8 - rev: 7.0.0 + rev: 7.3.0 hooks: - id: flake8 additional_dependencies: diff --git a/pyproject.toml b/pyproject.toml index ba534785..692b63b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,3 +8,8 @@ build-backend = "setuptools.build_meta" [tool.versioningit.write] file = "src/con_duct/_version.py" + +[tool.pytest.ini_options] +markers = [ + "flaky: mark a test as being unreliable" +] diff --git a/setup.cfg b/setup.cfg index 5d469fc9..67e454dc 100644 --- a/setup.cfg +++ b/setup.cfg @@ -40,6 +40,8 @@ classifiers = Intended Audience :: Science/Research Intended Audience :: System Administrators Topic :: System :: Systems Administration + Operating System :: Unix + Operating System :: MacOS project_urls = Source Code = https://github.com/con/duct/ @@ -58,6 +60,7 @@ where = src [options.extras_require] all = matplotlib + numpy<2.0; implementation_name == "pypy" PyYAML pyout # Pin rpds-py to support PyPy 3.10 - 
# Warn once, at import time, when running on an Intel Mac; the warning can be
# silenced through the DUCT_IGNORE_INTEL_WARNING environment variable.
is_mac_intel = sys.platform == "darwin" and os.uname().machine == "x86_64"
if is_mac_intel and not os.getenv("DUCT_IGNORE_INTEL_WARNING"):
    message = (
        "Detected system macOS running on intel architecture - "
        "duct may experience issues with sampling and signal handling.\n\n"
        "Set the environment variable `DUCT_IGNORE_INTEL_WARNING` to suppress this warning.\n"
    )
    warnings.warn(message=message, stacklevel=2)

lgr = logging.getLogger("con-duct")
# Name of the running OS as reported by platform.system(); selects the sampler.
SYSTEM = platform.system()


def assert_num(*values: Any) -> None:
    """Check that every value is an ``int`` or a ``float``.

    The first offending value is logged at error level and then reported
    through an ``AssertionError``.  The exception is raised explicitly (not
    via an ``assert`` statement) so the check is still enforced under
    ``python -O``.

    Raises:
        AssertionError: if any value is not an ``int``/``float`` instance.
    """
    for value in values:
        if not isinstance(value, (float, int)):
            lgr.error(
                "Expected numeric value (float or int), got %s: %r among %r",
                type(value).__name__,
                value,
                values,
            )
            raise AssertionError(
                f"Expected numeric value (float or int), "
                f"got {type(value).__name__}: {value!r}"
            )


def _parse_ps_line(line: str) -> tuple[int, ProcessStats]:
    """Turn one ``ps`` row into ``(pid, ProcessStats)``.

    The row must carry the eight columns requested by the samplers below:
    pid, pcpu, pmem, rss, vsz, etime, stat and the command line (which may
    itself contain whitespace, hence ``maxsplit=7``).

    Raises:
        ValueError: if the row has fewer than eight columns or a numeric
            column cannot be converted.
    """
    pid, pcpu, pmem, rss_kib, vsz_kib, etime, stat, cmd = line.split(maxsplit=7)
    stats = ProcessStats(
        pcpu=float(pcpu),
        pmem=float(pmem),
        # rss/vsz are treated as KiB and stored in bytes
        rss=int(rss_kib) * 1024,
        vsz=int(vsz_kib) * 1024,
        timestamp=datetime.now().astimezone().isoformat(),
        etime=etime,
        stat=Counter([stat]),
        cmd=cmd,
    )
    return int(pid), stats


def _get_sample_linux(session_id: int) -> Sample:
    """Sample every process of ``session_id`` with ``ps -s``.

    Raises:
        subprocess.CalledProcessError: if ``ps`` exits with a non-zero status.
    """
    ps_command = [
        "ps",
        "-w",
        "-s",
        str(session_id),
        "-o",
        "pid,pcpu,pmem,rss,vsz,etime,stat,cmd",
    ]
    output = subprocess.check_output(ps_command, text=True)

    sample = Sample()
    # The first row is the column header.
    for line in output.splitlines()[1:]:
        if not line:
            continue
        pid, stats = _parse_ps_line(line)
        sample.add_pid(pid=pid, stats=stats)
    sample.averages = Averages.from_sample(sample=sample)
    return sample


def _try_to_get_sid(pid: int) -> int:
    """Return the session ID of ``pid``, or ``-1`` when it cannot be fetched.

    It is possible that the `pid` returned by the top `ps` call no longer
    exists at time of `getsid` request.
    """
    try:
        return os.getsid(pid)
    except OSError as exc:
        lgr.debug("Error fetching session ID for PID %s: %s", pid, exc)
        return -1


def _get_ps_lines_mac() -> list[str]:
    """Return the non-empty data rows of ``ps -ax`` (header row dropped).

    Raises:
        subprocess.CalledProcessError: if ``ps`` exits with a non-zero status.
    """
    ps_command = [
        "ps",
        "-ax",
        "-o",
        "pid,pcpu,pmem,rss,vsz,etime,stat,args",
    ]
    output = subprocess.check_output(ps_command, text=True)
    return [line for line in output.splitlines()[1:] if line]


def _pids_in_session(lines: list[str], session_id: int) -> dict[int, int]:
    """Map each PID found in ``lines`` whose session is ``session_id`` to it."""
    matching: dict[int, int] = {}
    for line in lines:
        pid = int(line.split(maxsplit=1)[0])
        if _try_to_get_sid(pid=pid) == session_id:
            matching[pid] = session_id
    return matching


def _add_sample_from_line_mac(
    line: str, pid_to_matching_sid: dict[int, int], sample: Sample
) -> Optional[Sample]:
    """Add the process described by ``line`` to ``sample`` if its PID is wanted.

    The PID is checked before the row is fully parsed, so rows belonging to
    unrelated processes are skipped without any further validation.

    Returns:
        ``sample`` when the process was added, ``None`` when the PID is not a
        key of ``pid_to_matching_sid``.
    """
    if int(line.split(maxsplit=1)[0]) not in pid_to_matching_sid:
        return None

    pid, stats = _parse_ps_line(line)
    sample.add_pid(pid=pid, stats=stats)
    return sample


def _get_sample_mac(session_id: int, max_retries: int = 5) -> Sample:
    """Sample every process of ``session_id`` by filtering ``ps -ax`` output.

    All processes are listed and then narrowed down to the session through
    ``os.getsid``.  When no process matches, the listing is retried up to
    ``max_retries`` times.

    Raises:
        RuntimeError: if no process of the session is found within
            ``max_retries`` attempts, or if the averages cannot be computed.
        subprocess.CalledProcessError: if ``ps`` exits with a non-zero status.
    """
    lines: list[str] = []
    pid_to_matching_sid: dict[int, int] = {}
    for attempt in range(1, max_retries + 1):
        lines = _get_ps_lines_mac()
        pid_to_matching_sid = _pids_in_session(lines, session_id)
        if pid_to_matching_sid:
            break

        lgr.debug(
            "No processes found for session ID %s. Retry attempt %s/%s.",
            session_id,
            attempt,
            max_retries,
        )
        # TODO: allow passing update interval and make this a fraction of that
        time.sleep(0.001)
    else:
        # Every attempt came back empty (or no attempt was allowed at all).
        message = (
            "Failed to find processes for session ID "
            f"{session_id} after {max_retries} attempts."
        )
        lgr.error(msg=message)
        raise RuntimeError(message)

    sample = Sample()
    for line in lines:
        _add_sample_from_line_mac(
            line=line, pid_to_matching_sid=pid_to_matching_sid, sample=sample
        )

    try:
        sample.averages = Averages.from_sample(sample=sample)
    except AssertionError as exc:
        message = f"Failed to compute averages for sample: {sample}"
        lgr.error(msg=message)
        raise RuntimeError(message) from exc
    return sample


_get_sample_per_system: dict[str, Callable[..., Sample]] = {
    "Linux": _get_sample_linux,
    "Darwin": _get_sample_mac,
}
# Systems without a dedicated sampler keep using the `ps -s` based one instead
# of failing with a KeyError while the module is being imported.
_get_sample: Callable[[int], Sample] = _get_sample_per_system.get(
    SYSTEM, _get_sample_linux
)
def collect_sample(self) -> Optional[Sample]:
    """Take one sample of the processes in the monitored session.

    Delegates to the platform-specific ``_get_sample``.

    Returns:
        The collected ``Sample``, or ``None`` when sampling failed: ``ps``
        exited with an error (``CalledProcessError``) or the macOS sampler
        reported a failure as ``RuntimeError`` (for example when it could not
        find any process for the session).
    """
    assert self.session_id is not None

    try:
        return _get_sample(self.session_id)
    except (subprocess.CalledProcessError, RuntimeError) as exc:
        # when session_id has no processes; the macOS sampler signals this
        # with RuntimeError, which must not escape into the sampling loop
        lgr.debug("Error collecting sample: %s", str(exc))
        return None
diff --git a/test/test_e2e.py b/test/test_e2e.py index 4568bebf..43ae0aaf 100644 --- a/test/test_e2e.py +++ b/test/test_e2e.py @@ -19,7 +19,7 @@ def test_sanity(temp_output_dir: str) -> None: def test_spawn_children(temp_output_dir: str, mode: str, num_children: int) -> None: duct_prefix = f"{temp_output_dir}log_" script_path = TEST_SCRIPT_DIR / "spawn_children.sh" - dur = "0.3" + dur = "3" command = f"duct -q --s-i 0.001 --r-i 0.01 -p {duct_prefix} {script_path} {mode} {num_children} {dur}" subprocess.check_output(command, shell=True) @@ -33,6 +33,7 @@ def test_spawn_children(temp_output_dir: str, mode: str, num_children: int) -> N for pid, proc in sample["processes"].items() if "sleep" in proc["cmd"] ) + # Add one pid for the hold-the-door process, see spawn_children.sh line 7 if mode == "setsid": assert len(all_child_pids) == 1 diff --git a/test/test_execution.py b/test/test_execution.py index 697ba60d..51d6833f 100644 --- a/test/test_execution.py +++ b/test/test_execution.py @@ -37,16 +37,16 @@ def test_sanity_green(caplog: pytest.LogCaptureFixture, temp_output_dir: str) -> t0 = time() exit_code = 0 assert execute(args) == exit_code - assert time() - t0 < 0.4 # we should not wait for a sample or report interval + assert time() - t0 < 0.6 # we should not wait for a sample or report interval assert_expected_files(temp_output_dir) assert "Exit Code: 0" in caplog.records[-1].message def test_execution_summary(temp_output_dir: str) -> None: args = Arguments.from_argv( - ["sleep", "0.1"], - sample_interval=0.05, # small enough to ensure we collect at least 1 sample - report_interval=0.1, + ["sleep", "1"], + sample_interval=0.5, # small enough to ensure we collect at least 1 sample + report_interval=1.0, output_prefix=temp_output_dir, ) assert execute(args) == 0