Regional#111
/gcbrun |
Code Review
This pull request introduces a BackgroundPrefetcher to gcsfs to optimize sequential reads by asynchronously fetching data blocks in a background task. It refactors _cat_file to support concurrent fetching, integrates the prefetcher into GCSFile, and adds a suite of new tests. Feedback focuses on improving maintainability and robustness by replacing magic numbers with named constants, narrowing exception handling from BaseException or broad Exception types to more specific ones, and ensuring correct boolean parsing for environment variables. Additionally, the reviewer suggests avoiding blocking the event loop during initialization and refactoring complex conditional logic in the prefetcher's producer loop for better clarity.
| default_block_size = DEFAULT_BLOCK_SIZE | ||
| protocol = "gs", "gcs" | ||
| async_impl = True | ||
| MIN_CHUNK_SIZE_FOR_CONCURRENCY = 5 * 1024 * 1024 |
The value 5 * 1024 * 1024 is a magic number. It would be more readable and maintainable to define this as a named constant, perhaps at the module level or as a class constant with a descriptive name, to improve clarity and allow for easier modification if the threshold needs to change in the future.
| MIN_CHUNK_SIZE_FOR_CONCURRENCY = 5 * 1024 * 1024 | |
| MIN_CHUNK_SIZE_FOR_CONCURRENCY = 5 * 1024 * 1024 # Consider defining this as a named constant for clarity |
| # for start=5, end=5), causing the server to return the whole file instead of nothing. | ||
| if start is not None and end is not None and start >= end >= 0: | ||
| return b"" | ||
|
|
The comment # 'if start or end' fails when start=0 or end=0 because 0 is Falsey. was removed from the previous version of this line. This comment provided valuable context regarding Python's truthiness and how it affects if conditions with start or end values of 0. Restoring it would improve code clarity for future maintainers.
| # 'if start or end' fails when start=0 or end=0 because 0 is Falsey. | |
| u2 = self.url(path) |
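To make the truthiness point concrete, here is a minimal sketch. The helper names (`has_range_naive`, `has_range_explicit`, `is_empty_range`) are hypothetical and not part of gcsfs; only the final condition is copied from the diff above.

```python
def has_range_naive(start, end):
    # Truthiness check: 0 is falsy, so start=0/end=0 looks like "no range".
    return bool(start or end)


def has_range_explicit(start, end):
    # Explicit None check: 0 is a valid offset and counts as a range.
    return start is not None or end is not None


def is_empty_range(start, end):
    # Same condition as the guard in the diff: a zero-length (or inverted)
    # range should yield b"" instead of a request for the whole file.
    return start is not None and end is not None and start >= end >= 0


print(has_range_naive(0, 0))     # False: the range is silently dropped
print(has_range_explicit(0, 0))  # True
print(is_empty_range(5, 5))      # True
```

The explicit `is not None` form is what the restored comment would be documenting.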
| try: | ||
| results = await asyncio.gather(*tasks) | ||
| return b"".join(results) | ||
| except BaseException as e: |
Catching BaseException is generally too broad as it can mask unexpected system-level errors, including KeyboardInterrupt or SystemExit. It's best practice to catch more specific exceptions (e.g., Exception or a custom exception type) to ensure that only anticipated errors are handled, allowing other critical issues to propagate.
| except BaseException as e: | |
| except Exception as e: |
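One caveat worth weighing here: since Python 3.8, `asyncio.CancelledError` derives from `BaseException`, so an `except Exception` handler will not run when the surrounding task is cancelled. The body of the handler is not shown in this hunk, so the following is only a sketch of one alternative, assuming the goal is to stop sibling fetches when one fails or the read is cancelled. `gather_chunks` is a hypothetical name.

```python
import asyncio


async def gather_chunks(coros):
    """Join the results of several chunk fetches, cleaning up on any exit."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    try:
        results = await asyncio.gather(*tasks)
        return b"".join(results)
    finally:
        # Runs on success, on ordinary errors, and on cancellation alike,
        # without needing to catch BaseException explicitly.
        pending = [t for t in tasks if not t.done()]
        for t in pending:
            t.cancel()
        if pending:
            # Wait for the cancelled tasks so none is left dangling.
            await asyncio.gather(*pending, return_exceptions=True)
```

With `try`/`finally` the original exception (or cancellation) propagates unchanged, which sidesteps the question of which exception class to name.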
| # disrupted. This will be refactored once the upstream `fsspec` changes are merged. | ||
| use_prefetch_reader = kwargs.get( | ||
| "use_prefetch_reader", False | ||
| ) or os.environ.get("use_prefetch_reader", False) |
Environment variables are always strings, and any non-empty string is truthy. With os.environ.get("use_prefetch_reader", False), setting the variable to "0" or "false" would therefore still enable the prefetch reader. It's safer to explicitly convert the environment variable's string value to a boolean (e.g., str(value).lower() == 'true').
| ) or os.environ.get("use_prefetch_reader", False) | |
| ) or str(os.environ.get("use_prefetch_reader", "false")).lower() == "true" |
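A slightly more forgiving variant of the suggestion, as a sketch: `env_flag` and the accepted spellings in `_TRUTHY` are assumptions for illustration, not an existing gcsfs or fsspec helper.

```python
import os

# Assumption: these spellings should count as "enabled".
_TRUTHY = {"1", "true", "yes", "on"}


def env_flag(name, default=False, environ=None):
    """Parse a boolean flag from an environment mapping."""
    environ = os.environ if environ is None else environ
    raw = environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in _TRUTHY


# bool("false") is True, which is why the raw string must not be used as-is.
print(bool("false"))
print(env_flag("use_prefetch_reader", environ={"use_prefetch_reader": "false"}))
```

The call site could then read `kwargs.get("use_prefetch_reader", False) or env_flag("use_prefetch_reader")`.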
| ) or os.environ.get("use_prefetch_reader", False) | ||
| if use_prefetch_reader: | ||
| max_prefetch_size = kwargs.get("max_prefetch_size", None) | ||
| concurrency = kwargs.get("concurrency", 4) |
The default value 4 for concurrency is a magic number. It would be more maintainable to define this as a named constant, allowing for easier updates and better readability.
| concurrency = kwargs.get("concurrency", 4) | |
| concurrency = kwargs.get("concurrency", 4) # Consider using a named constant for the default |
| if prefetch_size >= self.MIN_CHUNK_SIZE: | ||
| if prefetch_space_available >= self.MIN_CHUNK_SIZE: | ||
| actual_size = min( | ||
| max(self.MIN_CHUNK_SIZE, block_size), | ||
| space_remaining, | ||
| ) | ||
| else: | ||
| break | ||
| else: | ||
| actual_size = min(block_size, space_remaining) |
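For the nested conditional above, one possible flattening is to move the decision into a small pure function with early returns. This is a sketch only: `next_fetch_size` is a hypothetical helper, its parameters mirror the variable names visible in the hunk, and returning `None` stands in for the `break`.

```python
def next_fetch_size(prefetch_size, prefetch_space_available,
                    block_size, space_remaining, min_chunk_size):
    """Return the size of the next fetch, or None if the loop should stop."""
    if prefetch_size < min_chunk_size:
        # Small prefetch window: fetch a plain block, capped by what is left.
        return min(block_size, space_remaining)
    if prefetch_space_available < min_chunk_size:
        # Not enough room for a minimum-sized chunk: caller should break.
        return None
    # Large window with room: fetch at least a minimum-sized chunk.
    return min(max(min_chunk_size, block_size), space_remaining)
```

In the producer loop this would reduce to `actual_size = next_fetch_size(...)` followed by `if actual_size is None: break`, and the branches become easy to unit test in isolation.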
| logger.debug("Producer loop cancelled") | ||
| pass | ||
| except Exception as e: | ||
| logger.error("Producer loop encountered an error: %s", e, exc_info=True) |
Catching Exception is generally too broad as it can mask unexpected errors. It's best practice to catch more specific exceptions to ensure that only anticipated errors are handled, allowing other critical issues to propagate.
| logger.error("Producer loop encountered an error: %s", e, exc_info=True) | |
| except Exception as e: # Consider catching more specific exceptions |
| except asyncio.CancelledError: | ||
| logger.debug("Read task was cancelled.") | ||
| raise | ||
| except Exception as e: |
Catching Exception is generally too broad as it can mask unexpected errors. It's best practice to catch more specific exceptions to ensure that only anticipated errors are handled, allowing other critical issues to propagate.
| except Exception as e: | |
| except Exception as e: # Consider catching more specific exceptions |
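As a sketch of what "more specific" could look like for the read path: the tuple `EXPECTED_ERRORS` below is an assumption (I/O and timeout failures), and `guarded_read` is a hypothetical wrapper. Which exceptions the real fetch can raise would need to be checked against the HTTP client in use.

```python
import asyncio
import logging

logger = logging.getLogger("prefetch_sketch")

# Assumption: I/O and timeout failures are the anticipated error classes.
EXPECTED_ERRORS = (OSError, asyncio.TimeoutError)


async def guarded_read(read_coro):
    try:
        return await read_coro
    except asyncio.CancelledError:
        # Cancellation is control flow, not an error: log quietly, re-raise.
        logger.debug("Read task was cancelled.")
        raise
    except EXPECTED_ERRORS as e:
        # Anticipated failures are logged with a traceback, then propagated.
        logger.error("Read failed: %s", e, exc_info=True)
        raise
```

Anything outside `EXPECTED_ERRORS` (for example a `TypeError` from a programming mistake) propagates without being logged as a routine read failure.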
| try: | ||
| result = fsspec.asyn.sync(self.loop, self._async_fetch, start, end) | ||
| except Exception as e: | ||
| logger.error("Exception during synchronous fetch: %s", e, exc_info=True) |
Catching Exception is generally too broad as it can mask unexpected errors. It's best practice to catch more specific exceptions to ensure that only anticipated errors are handled, allowing other critical issues to propagate.
| logger.error("Exception during synchronous fetch: %s", e, exc_info=True) | |
| except Exception as e: # Consider catching more specific exceptions |
| assert mock_seq.call_count == 1 | ||
| assert mock_conc.call_count == 0 | ||
|
|
||
| # 2. Concurrency = 4, but read size (1MB) is < MIN_CHUNK_SIZE_FOR_CONCURRENCY (5MB) |
The value 5MB is hardcoded here. It would be more robust to reference the MIN_CHUNK_SIZE_FOR_CONCURRENCY constant from gcsfs.core to ensure consistency and prevent test failures if the constant's value changes.
| # 2. Concurrency = 4, but read size (1MB) is < MIN_CHUNK_SIZE_FOR_CONCURRENCY (5MB) | |
| # 2. Concurrency = 4, but read size (1MB) is < gcs.MIN_CHUNK_SIZE_FOR_CONCURRENCY |
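The same idea can be carried into the test values themselves, not just the comment. The sketch below uses a stand-in class and a stand-in decision function, since the real dispatch logic is not shown in this hunk; `FakeFS` and `should_fetch_concurrently` are hypothetical, and the "below threshold means sequential" rule is an assumption inferred from the test comment.

```python
class FakeFS:
    # Stand-in for the filesystem class that owns the constant.
    MIN_CHUNK_SIZE_FOR_CONCURRENCY = 5 * 1024 * 1024


def should_fetch_concurrently(read_size, concurrency, threshold):
    # Assumed rule: only reads at or above the threshold are split up.
    return concurrency > 1 and read_size >= threshold


def test_small_read_stays_sequential():
    threshold = FakeFS.MIN_CHUNK_SIZE_FOR_CONCURRENCY
    # Derive the read size from the constant so the test keeps its meaning
    # if the threshold changes.
    read_size = threshold // 5
    assert not should_fetch_concurrently(read_size, 4, threshold)


test_small_read_stays_sequential()
```

Deriving `read_size` from the constant keeps the "read is smaller than the threshold" precondition true by construction.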
e747193 to 32f2056
Codecov Report❌ Patch coverage is
... and 1 file with indirect coverage changes
72d3596 to b5d348e
Temporarily created for a perf check.