Skip to content

Commit c655272

Browse files
authored
fix: canonicalize VarBin/FSST with >2GB buffers (#5961)
Fixes a potential bug where decoding large FSST and VarBin arrays results in an invalid VarBinViewArray. Currently, when decoding yields one large buffer, we generate a new VarBinViewArray (VBV) with that single buffer plus some views built against it. There will be trouble if the buffer is > 2GiB, though. This PR splits out a separate `build_views` function that takes a `max_buffer_len` parameter and, as it generates views, splits (zero-copy) the underlying buffer into segments of no more than `max_buffer_len`. --------- Signed-off-by: Andrew Duffy <andrew@a10y.dev>
1 parent 5483037 commit c655272

6 files changed

Lines changed: 146 additions & 59 deletions

File tree

encodings/fsst/src/array.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,9 @@ impl VTable for FSSTVTable {
188188

189189
// Decompress the whole block of data into a new buffer, and create some views
190190
// from it instead.
191-
let (buffer, views) = fsst_decode_views(array, builder.completed_block_count(), ctx)?;
191+
let (buffers, views) = fsst_decode_views(array, builder.completed_block_count(), ctx)?;
192192

193-
builder.push_buffer_and_adjusted_views(&[buffer], &views, array.validity_mask());
193+
builder.push_buffer_and_adjusted_views(&buffers, &views, array.validity_mask());
194194
Ok(())
195195
}
196196

encodings/fsst/src/canonical.rs

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ use vortex_array::Canonical;
77
use vortex_array::ExecutionCtx;
88
use vortex_array::arrays::PrimitiveArray;
99
use vortex_array::arrays::VarBinViewArray;
10+
use vortex_array::arrays::build_views::MAX_BUFFER_LEN;
11+
use vortex_array::arrays::build_views::build_views;
1012
use vortex_array::vtable::ValidityHelper;
1113
use vortex_buffer::Buffer;
12-
use vortex_buffer::BufferMut;
1314
use vortex_buffer::ByteBuffer;
1415
use vortex_buffer::ByteBufferMut;
1516
use vortex_dtype::match_each_integer_ptype;
@@ -22,28 +23,24 @@ pub(super) fn canonicalize_fsst(
2223
array: &FSSTArray,
2324
ctx: &mut ExecutionCtx,
2425
) -> VortexResult<Canonical> {
25-
let (buffer, views) = fsst_decode_views(array, 0, ctx)?;
26+
let (buffers, views) = fsst_decode_views(array, 0, ctx)?;
2627
// SAFETY: FSST already validates the bytes for binary/UTF-8. We build views directly on
2728
// top of them, so the view pointers will all be valid.
2829
Ok(unsafe {
2930
Canonical::VarBinView(VarBinViewArray::new_unchecked(
3031
views,
31-
Arc::new([buffer]),
32+
Arc::from(buffers),
3233
array.dtype().clone(),
3334
array.codes().validity().clone(),
3435
))
3536
})
3637
}
3738

38-
#[expect(
39-
clippy::cast_possible_truncation,
40-
reason = "truncation is intentional for buffer index"
41-
)]
42-
pub(super) fn fsst_decode_views(
39+
pub(crate) fn fsst_decode_views(
4340
fsst_array: &FSSTArray,
44-
buf_index: u32,
41+
start_buf_index: u32,
4542
ctx: &mut ExecutionCtx,
46-
) -> VortexResult<(ByteBuffer, Buffer<BinaryView>)> {
43+
) -> VortexResult<(Vec<ByteBuffer>, Buffer<BinaryView>)> {
4744
// FSSTArray has two child arrays:
4845
// 1. A VarBinArray, which holds the string heap of the compressed codes.
4946
// 2. An uncompressed_lengths primitive array, storing the length of each original
@@ -58,7 +55,6 @@ pub(super) fn fsst_decode_views(
5855
.clone()
5956
.execute::<PrimitiveArray>(ctx)?;
6057

61-
// Decompress the full dataset.
6258
#[allow(clippy::cast_possible_truncation)]
6359
let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
6460
uncompressed_lens_array
@@ -76,24 +72,14 @@ pub(super) fn fsst_decode_views(
7672
unsafe { uncompressed_bytes.set_len(len) };
7773

7874
// Directly create the binary views.
79-
let mut views = BufferMut::<BinaryView>::with_capacity(uncompressed_lens_array.len());
80-
8175
match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
82-
let mut offset = 0;
83-
for len in uncompressed_lens_array.as_slice::<P>() {
84-
let len = *len as usize;
85-
let view = BinaryView::make_view(
86-
&uncompressed_bytes[offset..][..len],
87-
buf_index,
88-
offset as u32,
89-
);
90-
// SAFETY: we reserved the right capacity beforehand
91-
unsafe { views.push_unchecked(view) };
92-
offset += len;
93-
}
94-
});
95-
96-
Ok((uncompressed_bytes.freeze(), views.freeze()))
76+
Ok(build_views(
77+
start_buf_index,
78+
MAX_BUFFER_LEN,
79+
uncompressed_bytes,
80+
uncompressed_lens_array.as_slice::<P>(),
81+
))
82+
})
9783
}
9884

9985
#[cfg(test)]

vortex-array/src/arrays/varbin/vtable/canonical.rs

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,16 @@
33

44
use std::sync::Arc;
55

6-
use num_traits::AsPrimitive;
7-
use vortex_buffer::Buffer;
8-
use vortex_buffer::BufferMut;
96
use vortex_dtype::match_each_integer_ptype;
107
use vortex_error::VortexResult;
11-
use vortex_vector::binaryview::BinaryView;
128

139
use crate::Canonical;
1410
use crate::ExecutionCtx;
1511
use crate::arrays::PrimitiveArray;
1612
use crate::arrays::VarBinViewArray;
13+
use crate::arrays::build_views::MAX_BUFFER_LEN;
14+
use crate::arrays::build_views::build_views;
15+
use crate::arrays::build_views::offsets_to_lengths;
1716
use crate::arrays::varbin::VarBinArray;
1817

1918
/// Converts a VarBinArray to its canonical form (VarBinViewArray).
@@ -27,29 +26,21 @@ pub(crate) fn varbin_to_canonical(
2726
let array = array.clone().zero_offsets();
2827
let (dtype, bytes, offsets, validity) = array.into_parts();
2928

29+
// Materialize the offsets as a primitive array so they can be converted to per-element lengths below.
3030
let offsets = offsets.execute::<PrimitiveArray>(ctx)?;
31+
let bytes = bytes.into_mut();
3132

32-
// Build views directly from offsets
33-
#[expect(clippy::cast_possible_truncation, reason = "BinaryView offset is u32")]
34-
let views: Buffer<BinaryView> = match_each_integer_ptype!(offsets.ptype(), |O| {
35-
let offsets_slice = offsets.as_slice::<O>();
36-
let bytes_slice = bytes.as_ref();
37-
38-
let mut views = BufferMut::<BinaryView>::with_capacity(offsets_slice.len() - 1);
39-
for window in offsets_slice.windows(2) {
40-
let start: usize = window[0].as_();
41-
let end: usize = window[1].as_();
42-
let value = &bytes_slice[start..end];
43-
views.push(BinaryView::make_view(value, 0, start as u32));
44-
}
45-
views.freeze()
46-
});
47-
48-
// Create VarBinViewArray with the original bytes buffer and computed views
49-
// SAFETY: views are correctly computed from valid offsets
50-
let varbinview =
51-
unsafe { VarBinViewArray::new_unchecked(views, Arc::from([bytes]), dtype, validity) };
52-
Ok(Canonical::VarBinView(varbinview))
33+
match_each_integer_ptype!(offsets.ptype(), |P| {
34+
let lens = offsets_to_lengths(offsets.as_slice::<P>());
35+
let (buffers, views) = build_views(0, MAX_BUFFER_LEN, bytes, lens.as_slice());
36+
37+
let varbinview =
38+
unsafe { VarBinViewArray::new_unchecked(views, Arc::from(buffers), dtype, validity) };
39+
40+
// `varbinview` is built from the bytes buffer (split into chunks of at most `MAX_BUFFER_LEN`) and the computed views
41+
// SAFETY (for the `new_unchecked` call above): views are correctly computed from valid offsets
42+
Ok(Canonical::VarBinView(varbinview))
43+
})
5344
}
5445

5546
#[cfg(test)]
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use itertools::Itertools;
5+
use num_traits::AsPrimitive;
6+
use vortex_buffer::Buffer;
7+
use vortex_buffer::BufferMut;
8+
use vortex_buffer::ByteBuffer;
9+
use vortex_buffer::ByteBufferMut;
10+
use vortex_dtype::NativePType;
11+
use vortex_vector::binaryview::BinaryView;
12+
13+
/// Convert an offsets buffer to a buffer of element lengths.
14+
#[inline]
15+
pub fn offsets_to_lengths<P: NativePType>(offsets: &[P]) -> Buffer<P> {
16+
offsets
17+
.iter()
18+
.tuple_windows::<(_, _)>()
19+
.map(|(&start, &end)| end - start)
20+
.collect()
21+
}
22+
23+
/// Maximum number of buffer bytes that can be referenced by a single `BinaryView`
24+
pub const MAX_BUFFER_LEN: usize = i32::MAX as usize;
25+
26+
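/// Split a large buffer of input `bytes` holding string data into buffers of at most `max_buffer_len` bytes each, building one `BinaryView` per entry of `lens` that points into the buffer holding that value.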
27+
pub fn build_views<P: NativePType + AsPrimitive<usize>>(
28+
start_buf_index: u32,
29+
max_buffer_len: usize,
30+
mut bytes: ByteBufferMut,
31+
lens: &[P],
32+
) -> (Vec<ByteBuffer>, Buffer<BinaryView>) {
33+
let mut views = BufferMut::<BinaryView>::with_capacity(lens.len());
34+
35+
let mut buffers = Vec::new();
36+
let mut buf_index = start_buf_index;
37+
38+
let mut offset = 0;
39+
for &len in lens {
40+
let len = len.as_();
41+
assert!(len <= max_buffer_len, "values cannot exceed max_buffer_len");
42+
43+
if (offset + len) > max_buffer_len {
44+
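// Roll over to a new buffer whenever the next value would push the current one past `max_buffer_len` (2GiB for `MAX_BUFFER_LEN`), to avoid overflowing the view's offset field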
45+
let rest = bytes.split_off(offset);
46+
47+
buffers.push(bytes.freeze());
48+
buf_index += 1;
49+
offset = 0;
50+
51+
bytes = rest;
52+
}
53+
let view = BinaryView::make_view(&bytes[offset..][..len], buf_index, offset.as_());
54+
// SAFETY: we reserved the right capacity beforehand
55+
unsafe { views.push_unchecked(view) };
56+
offset += len;
57+
}
58+
59+
if !bytes.is_empty() {
60+
buffers.push(bytes.freeze());
61+
}
62+
63+
(buffers, views.freeze())
64+
}
65+
66+
#[cfg(test)]
67+
mod tests {
68+
use vortex_buffer::ByteBuffer;
69+
use vortex_buffer::ByteBufferMut;
70+
use vortex_vector::binaryview::BinaryView;
71+
72+
use crate::arrays::build_views::build_views;
73+
74+
#[test]
75+
fn test_to_canonical_large() {
76+
// We are testing generating views for raw data that should look like
77+
//
78+
// aaaaaaaaaaaaa ("a"*13)
79+
// bbbbbbbbbbbbb ("b"*13)
80+
// ccccccccccccc ("c"*13)
81+
// ddddddddddddd ("d"*13)
82+
//
83+
// In real code, this would all fit in one buffer, but to unit test the splitting logic
84+
// we split buffers at length 26, which should result in two buffers for the output array.
85+
let raw_data =
86+
ByteBufferMut::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbbcccccccccccccddddddddddddd");
87+
let lens = vec![13u8; 4];
88+
89+
let (buffers, views) = build_views(0, 26, raw_data, &lens);
90+
91+
assert_eq!(
92+
buffers,
93+
vec![
94+
ByteBuffer::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbb"),
95+
ByteBuffer::copy_from("cccccccccccccddddddddddddd"),
96+
]
97+
);
98+
99+
assert_eq!(
100+
views.as_slice(),
101+
&[
102+
BinaryView::make_view(b"aaaaaaaaaaaaa", 0, 0),
103+
BinaryView::make_view(b"bbbbbbbbbbbbb", 0, 13),
104+
BinaryView::make_view(b"ccccccccccccc", 1, 0),
105+
BinaryView::make_view(b"ddddddddddddd", 1, 13),
106+
]
107+
)
108+
}
109+
}

vortex-array/src/arrays/varbinview/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,6 @@ mod compute;
1313
mod vtable;
1414
pub use vtable::VarBinViewVTable;
1515

16+
pub mod build_views;
1617
#[cfg(test)]
1718
mod tests;

vortex-array/src/builders/varbinview.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ impl VarBinViewBuilder {
150150

151151
/// Pushes buffers and pre-adjusted views into the builder.
152152
///
153-
/// The provided `buffer` slices contain sections of data from a `VarBinViewArray`, and the
153+
/// The provided `buffers` contain sections of data from a `VarBinViewArray`, and the
154154
/// `views` are `BinaryView`s that have already been adjusted to reference the correct buffer
155155
/// indices and offsets for this builder. All views must point to valid sections within the
156156
/// provided buffers, and the validity length must match the view length.
@@ -166,14 +166,14 @@ impl VarBinViewBuilder {
166166
/// exist in this builder.
167167
pub fn push_buffer_and_adjusted_views(
168168
&mut self,
169-
buffer: &[ByteBuffer],
169+
buffers: &[ByteBuffer],
170170
views: &Buffer<BinaryView>,
171171
validity_mask: Mask,
172172
) {
173173
self.flush_in_progress();
174174

175-
let expected_completed_len = self.completed.len() as usize + buffer.len();
176-
self.completed.extend_from_slice_unchecked(buffer);
175+
let expected_completed_len = self.completed.len() as usize + buffers.len();
176+
self.completed.extend_from_slice_unchecked(buffers);
177177
assert_eq!(
178178
self.completed.len() as usize,
179179
expected_completed_len,

0 commit comments

Comments
 (0)