diff --git a/repype/cli.py b/repype/cli.py index fc9949e..c874583 100644 --- a/repype/cli.py +++ b/repype/cli.py @@ -89,6 +89,7 @@ class StatusReaderConsoleAdapter(repype.status.StatusReader): def __init__(self, *args, indent: int = 2, batch: Optional[repype.batch.Batch] = None, **kwargs): self.indent = indent self._intermediate_line_length = 0 + self._progress_t0 = None self.margin = None self.batch = batch super().__init__(*args, **kwargs) @@ -141,7 +142,11 @@ def full_format( Format the status update as a string, including indentation and empty lines between blocks of different indentation. """ - text = str(self.format(positions, status, intermediate)) + try: + text = str(self.format(positions, status, intermediate)) + except: # noqa: E722 + print(f'An error occurred while processing status file: {self.filepath}') + raise # Compute indentation, and add an extra line if the margin changes margin = ' ' * self.indent * (len(positions) - 1) @@ -216,12 +221,12 @@ def format( text = f'🔴 Batch run interrupted' if status.get('info') == 'progress': - if status.get('step') == 0: - self.progress_t0 = time.time() + if status.get('step') == 0 or self._progress_t0 is None: + self._progress_t0 = time.time() eta = '' else: progress_t1 = time.time() - speed = (progress_t1 - self.progress_t0) / status.get('step') + speed = (progress_t1 - self._progress_t0) / status.get('step') eta = ', ETA: ' + format_hms(speed * (status.get('max_steps') - status.get('step'))) text = f'{100 * status.get("step") / status.get("max_steps"):.1f}% '\ f'({status.get("step")} / {status.get("max_steps")}{eta})' @@ -232,10 +237,13 @@ def format( if isinstance(details, dict): details = self.format_progress_details(details) text = f'{details} {text}' + else: + self._progress_t0 = None return text if text else status else: + self._progress_t0 = None return status def format_progress_details(self, details: dict) -> str: diff --git a/tests/test_cli.py b/tests/test_cli.py index 3e14a73..17f4d50 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -73,7 +73,8 @@ async def asyncSetUp(self): await self.status_reader.__aenter__() async def asyncTearDown(self): - await self.status_reader.__aexit__(None, None, None) + if self.status_reader is not None: + await self.status_reader.__aexit__(None, None, None) self.tempdir.cleanup() async def test(self): @@ -169,6 +170,42 @@ async def test_error(self): # Verify that there have been two iterations, i.e. `item_idx = 0`, `item_idx = 1` self.assertEqual(item_idx, 1) + async def test_blocking(self): + # Shutdown the default status reader of the test case + await self.status_reader.__aexit__(None, None, None) + self.status_reader = None + + # Suppress asyncio complaining about long-running co-routine to avoid cluttering the output + import logging + logging.getLogger('asyncio').setLevel(logging.ERROR) + + lines = [ + '[ ] 0.0% (0 / 3)\r', + '[====== ] 33.3% (1 / 3, ETA: 00:02)\r', + '[============= ] 66.7% (2 / 3, ETA: 00:01)\r', + ' \r', + ] + with testsuite.CaptureStdout() as stdout: + async with repype.cli.StatusReaderConsoleAdapter(self.status.filepath, blocking = True): + for item_idx, item in enumerate(repype.status.progress(self.status, range(3))): + time.sleep(1) + self.assertEqual(str(stdout), ''.join(lines[:item_idx + 1])) + + await test_status.wait_for_watchdog() + self.assertEqual(str(stdout), ''.join(lines)) + + async def test_delayed(self): + """ + Test that the status reader is delayed so that it misses the first step of the progress iteration. + + This is a regression test for a race-condition. + """ + with testsuite.CaptureStdout(): + for step, _ in enumerate(repype.status.progress(self.status, range(4))): + if step >= 2: + # Wait for the status reader to process the output, now that the first two steps have been missed + await test_status.wait_for_watchdog() + class ExtendedStatusReaderConsoleAdapter(repype.cli.StatusReaderConsoleAdapter): diff --git a/tests/testsuite.py b/tests/testsuite.py index 8a4a4da..18c538e 100644 --- a/tests/testsuite.py +++ b/tests/testsuite.py @@ -128,13 +128,15 @@ def __ini__(self, *args, **kwargs): self.kwargs = kwargs -class CaptureStdout: +class CaptureOutput: + + create_redirect = None def __init__(self): self.stdout_buf = io.StringIO() def __enter__(self): - self.redirect = contextlib.redirect_stdout(self.stdout_buf) + self.redirect = self.create_redirect(self.stdout_buf) self.redirect.__enter__() return self @@ -144,4 +146,9 @@ def __exit__(self, exc_type, exc_value, traceback): print(str(self), file = sys.stderr) def __str__(self): - return re.sub(r'\033\[K', '', self.stdout_buf.getvalue()) \ No newline at end of file + return re.sub(r'\033\[K', '', self.stdout_buf.getvalue()) + + +class CaptureStdout(CaptureOutput): + + create_redirect = contextlib.redirect_stdout