feat: seekable reader#530
Conversation
There was a problem hiding this comment.
Pull request overview
Adds seek support for Avro container readers by tracking block boundaries (offset + record count) during iteration and exposing an API to seek back to a previously-seen block for Read + Seek inputs.
Changes:
- Introduced
BlockPositionand internal position tracking to record block start offsets as blocks are loaded. - Added
Reader::{current_block,data_start,seek_to_block}(seek API gated onSeek) plus tests validating seeking between blocks. - Added a new error detail variant for seek failures.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
avro/src/reader/mod.rs |
Exposes BlockPosition and adds reader-level block position + seek APIs with new tests. |
avro/src/reader/block.rs |
Implements BlockPosition, PositionTracker, and block-level seek + block-boundary tracking. |
avro/src/lib.rs |
Re-exports BlockPosition from the crate root. |
avro/src/error.rs |
Adds Details::SeekToBlock for I/O seek errors. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Do you have a use case for this functionality ? |
| let n = self.inner.read(buf)?; | ||
| self.pos += n as u64; | ||
| Ok(n) | ||
| } |
There was a problem hiding this comment.
The implementation assumes that read_buf(), read_exact(), read_vectored(), ... use their default impls and delegate to read(). But if they are overwritten then the tracking breaks.
There was a problem hiding this comment.
Is there any common examples for this behavior?
There was a problem hiding this comment.
The tracking is done by wrapping the reader in a PositionTracker right? So it will always go through PositionTracker::read.
There was a problem hiding this comment.
The problem I see is:
- imagine some custom Read implementation that overrides any of the provided methods (all Read methods but
read()) for any reason. For example let's say the custom Read impl has a custom version ofread_exact() - PositionTracker is not opt-in and thus any usage of Read::read_exact() in the Avro codebase will always use PositionTracker::read_exact(). This will use the default impl (https://doc.rust-lang.org/stable/src/std/io/mod.rs.html#1044-1046) that will delegate to PositionTracker::read(). It will delegate to inner::read() and increment the read bytes (
pos). - So, the custom Read impl read_exact() won't be used at all and the user application has nothing to do
PositionTracker either needs to be smarter or opt-in.
|
|
||
| self.current_block_info = Some(BlockPosition { | ||
| offset: block_start, | ||
| message_count: block_len as usize, |
There was a problem hiding this comment.
| message_count: block_len as usize, | |
| message_count: self.message_count, |
There was a problem hiding this comment.
I actually wrote this part specifically this way to ensure it would not lost, if a refactoring or other change are applied. It cannot rely on the meaning of the self.message_count and its current value if those two will be separated into different places of code
There was a problem hiding this comment.
You can move self.message_count = block_len as usize down next to self.current_block_info to reduce that risk. As I do think Martin's suggestion is reasonable.
There was a problem hiding this comment.
I'm not arguing, any of this would work and I can change, that's no problem. But I'm curios to understand why you think this would be better?
My idea is just using the source of truth variable, so there is no way it gets a wrong number. Using the self.message_count works with the current code, but it doesn't give any guarantee if it changes. So I'm wondering why the second approach is better?
I need to read just a few records from a large Avro file, and without this it's incredibly inefficient as I need to read the whole file from the start each time. |
|
I'm wondering if you guys expect me to make some changes to the PR or we're waiting for something else? |
|
I don't have further comments, @martin-g how about you? |
| let n = self.inner.read(buf)?; | ||
| self.pos += n as u64; | ||
| Ok(n) | ||
| } |
There was a problem hiding this comment.
The problem I see is:
- imagine some custom Read implementation that overrides any of the provided methods (all Read methods but
read()) for any reason. For example let's say the custom Read impl has a custom version ofread_exact() - PositionTracker is not opt-in and thus any usage of Read::read_exact() in the Avro codebase will always use PositionTracker::read_exact(). This will use the default impl (https://doc.rust-lang.org/stable/src/std/io/mod.rs.html#1044-1046) that will delegate to PositionTracker::read(). It will delegate to inner::read() and increment the read bytes (
pos). - So, the custom Read impl read_exact() won't be used at all and the user application has nothing to do
PositionTracker either needs to be smarter or opt-in.
| /// Typically the caller saves offsets from [`current_block`](Self::current_block) | ||
| /// during forward iteration and later passes them here to jump back. | ||
| pub fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> { | ||
| let seek_status = self.block.seek_to_block(offset); |
There was a problem hiding this comment.
This logic has the side effect of resetting the errored flag.
If errored was set to true earlier, e.g. a call to Reader::next() then a call to Reader::seek_to_block(offset) that successfully reads the block at that offset will reset the errored to false. Now the user application can again call Reader::next() (or any other method that checks errored before doing any work).
| pub(super) fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> { | ||
| self.reader | ||
| .seek(SeekFrom::Start(offset)) | ||
| .map_err(Details::SeekToBlock)?; |
There was a problem hiding this comment.
It would be safer to reset the state first (lines 368-371).
Currently if SeekToBlock is returned then the state won't be reset and if Block::read_block_next() is used it will fail at
avro-rs/avro/src/reader/block.rs
Line 198 in a6c7563
| #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| pub struct BlockPosition { | ||
| /// Byte offset in the stream where this block starts (before the object-count varint). | ||
| pub offset: u64, |
There was a problem hiding this comment.
Since BlockPosition is public I'd recommend to make the fields private, add constructor and getters. This way it will be easier to add more fields later if needed without API breaks.
| let mut reader = Reader::new(Cursor::new(&data))?; | ||
| let result = reader.seek_to_block(7); | ||
| assert!(result.is_err()); | ||
|
|
There was a problem hiding this comment.
Please extend this test with seek to EOF and beyond:
| let eof = data.len() as u64; | |
| assert!(reader.seek_to_block(eof).is_err()); | |
| assert!(reader.seek_to_block(eof + 1).is_err()); |
| // and replace `buf` with the new one, instead of reusing the same buffer. | ||
| // We can address this by using some "limited read" type to decode directly | ||
| // into the buffer. But this is fine, for now. | ||
| self.codec.decompress(&mut self.buf) |
There was a problem hiding this comment.
self.current_block_info will be set even if the decompression fails.
And Reader::current_block() will see it.
AFAIU you need to read the Avro data once to collect the offsets to be able to seek later, right ? |
Added a feature to seek to a particular block when reading an Avro file.
The Reader now provides the current Block position and for a Seek'able input it can seek to the specific position, assuming it's a valid start position of a block.