Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions repype/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class StatusReaderConsoleAdapter(repype.status.StatusReader):
def __init__(self, *args, indent: int = 2, batch: Optional[repype.batch.Batch] = None, **kwargs):
self.indent = indent
self._intermediate_line_length = 0
self._progress_t0 = None
self.margin = None
self.batch = batch
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -141,7 +142,11 @@ def full_format(
Format the status update as a string,
including indentation and empty lines between blocks of different indentation.
"""
text = str(self.format(positions, status, intermediate))
try:
text = str(self.format(positions, status, intermediate))
except: # noqa: E722
Comment thread
kostrykin marked this conversation as resolved.
print(f'An error occurred while processing status file: {self.filepath}')
raise

# Compute indentation, and add an extra line if the margin changes
margin = ' ' * self.indent * (len(positions) - 1)
Expand Down Expand Up @@ -216,12 +221,12 @@ def format(
text = f'🔴 Batch run interrupted'

if status.get('info') == 'progress':
if status.get('step') == 0:
self.progress_t0 = time.time()
if status.get('step') == 0 or self._progress_t0 is None:
self._progress_t0 = time.time()
eta = ''
else:
progress_t1 = time.time()
speed = (progress_t1 - self.progress_t0) / status.get('step')
speed = (progress_t1 - self._progress_t0) / status.get('step')
eta = ', ETA: ' + format_hms(speed * (status.get('max_steps') - status.get('step')))
text = f'{100 * status.get("step") / status.get("max_steps"):.1f}% '\
f'({status.get("step")} / {status.get("max_steps")}{eta})'
Expand All @@ -232,10 +237,13 @@ def format(
if isinstance(details, dict):
details = self.format_progress_details(details)
text = f'{details} {text}'
else:
self._progress_t0 = None

return text if text else status

else:
self._progress_t0 = None
return status

Comment thread
kostrykin marked this conversation as resolved.
def format_progress_details(self, details: dict) -> str:
Expand Down
39 changes: 38 additions & 1 deletion tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ async def asyncSetUp(self):
await self.status_reader.__aenter__()

async def asyncTearDown(self):
await self.status_reader.__aexit__(None, None, None)
if self.status_reader is not None:
await self.status_reader.__aexit__(None, None, None)
self.tempdir.cleanup()

async def test(self):
Expand Down Expand Up @@ -169,6 +170,42 @@ async def test_error(self):
# Verify that there have been two iterations, i.e. `item_idx = 0`, `item_idx = 1`
self.assertEqual(item_idx, 1)

async def test_blocking(self):
# Shutdown the default status reader of the test case
await self.status_reader.__aexit__(None, None, None)
self.status_reader = None

# Suppress asyncio complaining about long-running co-routine to avoid cluttering the output
import logging
logging.getLogger('asyncio').setLevel(logging.ERROR)

lines = [
'[ ] 0.0% (0 / 3)\r',
'[====== ] 33.3% (1 / 3, ETA: 00:02)\r',
'[============= ] 66.7% (2 / 3, ETA: 00:01)\r',
' \r',
]
with testsuite.CaptureStdout() as stdout:
async with repype.cli.StatusReaderConsoleAdapter(self.status.filepath, blocking = True):
for item_idx, item in enumerate(repype.status.progress(self.status, range(3))):
time.sleep(1)
Comment thread
kostrykin marked this conversation as resolved.
self.assertEqual(str(stdout), ''.join(lines[:item_idx + 1]))

await test_status.wait_for_watchdog()
self.assertEqual(str(stdout), ''.join(lines))

async def test_delayed(self):
"""
Test that the status reader is delayed so that it misses the first step of the progress iteration.

This is a regression test for a race-condition.
"""
with testsuite.CaptureStdout():
for step, _ in enumerate(repype.status.progress(self.status, range(4))):
if step >= 2:
# Wait for the status reader to process the output, now that the first two steps have been missed
await test_status.wait_for_watchdog()


class ExtendedStatusReaderConsoleAdapter(repype.cli.StatusReaderConsoleAdapter):

Expand Down
13 changes: 10 additions & 3 deletions tests/testsuite.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,15 @@ def __ini__(self, *args, **kwargs):
self.kwargs = kwargs


class CaptureStdout:
class CaptureOutput:

create_redirect = None

def __init__(self):
self.stdout_buf = io.StringIO()

def __enter__(self):
self.redirect = contextlib.redirect_stdout(self.stdout_buf)
self.redirect = self.create_redirect(self.stdout_buf)
self.redirect.__enter__()
return self

Expand All @@ -144,4 +146,9 @@ def __exit__(self, exc_type, exc_value, traceback):
print(str(self), file = sys.stderr)

def __str__(self):
return re.sub(r'\033\[K', '', self.stdout_buf.getvalue())
return re.sub(r'\033\[K', '', self.stdout_buf.getvalue())


class CaptureStdout(CaptureOutput):

create_redirect = contextlib.redirect_stdout