Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 184 additions & 11 deletions arrow-json/benches/json_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,28 @@ use std::fmt::Write;
use std::hint::black_box;
use std::sync::Arc;

// Shared benchmark dimensions.
const ROWS: usize = 1 << 17; // 128K rows
const BATCH_SIZE: usize = 1 << 13; // 8K rows per batch

// Wide object / struct
const WIDE_FIELDS: usize = 64;
const WIDE_PROJECTION_TOTAL_FIELDS: usize = 100; // 100 fields total, select only 3

// Binary
const BINARY_BYTES: usize = 64;

// List
const SHORT_LIST_ELEMENTS: usize = 5;
const LONG_LIST_ELEMENTS: usize = 100;

// Map
const SMALL_MAP_ENTRIES: usize = 5;
const LARGE_MAP_ENTRIES: usize = 50;

// Run-end encoded
const SHORT_REE_RUN_LENGTH: usize = 2;
const LONG_REE_RUN_LENGTH: usize = 100;
fn decode_and_flush(decoder: &mut Decoder, data: &[u8]) {
let mut offset = 0;
Expand Down Expand Up @@ -289,19 +303,174 @@ fn bench_decode_list(c: &mut Criterion) {
let schema = build_list_schema();

// Short lists: tests list handling overhead (few elements per row)
let short_data = build_list_json(ROWS, LIST_SHORT_ELEMENTS);
bench_decode_schema(c, "decode_list_short_i64_json", &short_data, schema.clone());
let short_data = build_list_json(ROWS, SHORT_LIST_ELEMENTS);
bench_decode_schema(c, "decode_short_list_i64_json", &short_data, schema.clone());

// Long lists: tests child element decode throughput (many elements per row)
let long_data = build_list_json(ROWS, LIST_LONG_ELEMENTS);
bench_decode_schema(c, "decode_list_long_i64_json", &long_data, schema);
let long_data = build_list_json(ROWS, LONG_LIST_ELEMENTS);
bench_decode_schema(c, "decode_long_list_i64_json", &long_data, schema);
}

fn bench_serialize_list(c: &mut Criterion) {
let schema = build_list_schema();

let short_values = build_list_values(ROWS, LIST_SHORT_ELEMENTS);
c.bench_function("decode_list_short_i64_serialize", |b| {
let short_values = build_list_values(ROWS, SHORT_LIST_ELEMENTS);
c.bench_function("decode_short_list_i64_serialize", |b| {
b.iter(|| {
let mut decoder = ReaderBuilder::new(schema.clone())
.with_batch_size(BATCH_SIZE)
.build_decoder()
.unwrap();
decoder.serialize(&short_values).unwrap();
while let Some(_batch) = decoder.flush().unwrap() {}
})
});

let long_values = build_list_values(ROWS, LONG_LIST_ELEMENTS);
c.bench_function("decode_long_list_i64_serialize", |b| {
b.iter(|| {
let mut decoder = ReaderBuilder::new(schema.clone())
.with_batch_size(BATCH_SIZE)
.build_decoder()
.unwrap();
decoder.serialize(&long_values).unwrap();
while let Some(_batch) = decoder.flush().unwrap() {}
})
});
}

// Builds newline-delimited JSON rows of the form `{"map":{"k0":0,"k1":1,...}}`,
// where entry `i` of row `row` has key `k{i}` and value `row + i`.
fn build_map_json(rows: usize, entries: usize) -> Vec<u8> {
    // Rough per-row size estimate to avoid repeated reallocation.
    let mut buf = String::with_capacity(rows * (entries * 20 + 16));
    for row in 0..rows {
        buf.push_str("{\"map\":{");
        let mut sep = "";
        for idx in 0..entries {
            write!(&mut buf, "{sep}\"k{idx}\":{}", (row + idx) as i64).unwrap();
            sep = ",";
        }
        buf.push_str("}}\n");
    }
    buf.into_bytes()
}

fn build_map_values(rows: usize, entries: usize) -> Vec<Value> {
let mut out = Vec::with_capacity(rows);
for row in 0..rows {
let mut inner = Map::with_capacity(entries);
for i in 0..entries {
inner.insert(
format!("k{i}"),
Value::Number(Number::from((row + i) as i64)),
);
}
let mut map = Map::with_capacity(1);
map.insert("map".to_string(), Value::Object(inner));
out.push(Value::Object(map));
}
out
}

// Single-column schema: a non-nullable, unsorted `Map<Utf8, Int64>` named "map".
// Map values are nullable; keys are not (required by the Arrow spec).
fn build_map_schema() -> Arc<Schema> {
    let key_field = Field::new("keys", DataType::Utf8, false);
    let value_field = Field::new("values", DataType::Int64, true);
    let entries = Field::new(
        "entries",
        DataType::Struct(vec![key_field, value_field].into()),
        false,
    );
    let map_field = Field::new("map", DataType::Map(Arc::new(entries), false), false);
    Arc::new(Schema::new(vec![map_field]))
}

// Benchmarks decoding map columns from raw JSON bytes at two fan-outs:
// few entries per row (overhead-bound) and many entries per row (throughput-bound).
fn bench_decode_map(c: &mut Criterion) {
    let schema = build_map_schema();
    for (bench_name, entries) in [
        ("decode_small_map_json", SMALL_MAP_ENTRIES),
        ("decode_large_map_json", LARGE_MAP_ENTRIES),
    ] {
        let data = build_map_json(ROWS, entries);
        bench_decode_schema(c, bench_name, &data, schema.clone());
    }
}

// Benchmarks decoding map columns from pre-parsed `serde_json::Value`s
// (the `Decoder::serialize` path) at small and large entry counts.
fn bench_serialize_map(c: &mut Criterion) {
    let schema = build_map_schema();
    for (bench_name, entries) in [
        ("decode_small_map_serialize", SMALL_MAP_ENTRIES),
        ("decode_large_map_serialize", LARGE_MAP_ENTRIES),
    ] {
        let values = build_map_values(ROWS, entries);
        c.bench_function(bench_name, |b| {
            b.iter(|| {
                // Decoder construction is inside the timed loop, matching the
                // list/REE serialize benchmarks.
                let mut decoder = ReaderBuilder::new(schema.clone())
                    .with_batch_size(BATCH_SIZE)
                    .build_decoder()
                    .unwrap();
                decoder.serialize(&values).unwrap();
                while let Some(_batch) = decoder.flush().unwrap() {}
            })
        });
    }
}

// Builds newline-delimited JSON rows `{"val":N}` where N stays constant for
// `run_length` consecutive rows, producing runs suited to run-end encoding.
// Panics if `run_length` is zero (division by zero), as in the original.
fn build_ree_json(rows: usize, run_length: usize) -> Vec<u8> {
    let mut buf = String::with_capacity(rows * 24);
    for row in 0..rows {
        writeln!(&mut buf, "{{\"val\":{}}}", (row / run_length) as i64).unwrap();
    }
    buf.into_bytes()
}

// Builds the same runs as `build_ree_json`, but as parsed `serde_json::Value`
// objects for the `Decoder::serialize` benchmarks.
fn build_ree_values(rows: usize, run_length: usize) -> Vec<Value> {
    (0..rows)
        .map(|row| {
            let run_value = (row / run_length) as i64;
            let mut obj = Map::with_capacity(1);
            obj.insert("val".to_string(), Value::Number(Number::from(run_value)));
            Value::Object(obj)
        })
        .collect()
}

// Single-column schema: a non-nullable `RunEndEncoded<Int32, Int64>` named "val".
fn build_ree_schema() -> Arc<Schema> {
    let run_ends = Field::new("run_ends", DataType::Int32, false);
    let values = Field::new("values", DataType::Int64, true);
    let ree_field = Field::new(
        "val",
        DataType::RunEndEncoded(Arc::new(run_ends), Arc::new(values)),
        false,
    );
    Arc::new(Schema::new(vec![ree_field]))
}

// Benchmarks decoding run-end-encoded columns from raw JSON bytes with short
// runs (many run transitions) and long runs (few transitions).
fn bench_decode_ree(c: &mut Criterion) {
    let schema = build_ree_schema();
    for (bench_name, run_length) in [
        ("decode_short_ree_runs_json", SHORT_REE_RUN_LENGTH),
        ("decode_long_ree_runs_json", LONG_REE_RUN_LENGTH),
    ] {
        let data = build_ree_json(ROWS, run_length);
        bench_decode_schema(c, bench_name, &data, schema.clone());
    }
}

fn bench_serialize_ree(c: &mut Criterion) {
let schema = build_ree_schema();

let short_values = build_ree_values(ROWS, SHORT_REE_RUN_LENGTH);
c.bench_function("decode_short_ree_runs_serialize", |b| {
b.iter(|| {
let mut decoder = ReaderBuilder::new(schema.clone())
.with_batch_size(BATCH_SIZE)
Expand All @@ -312,8 +481,8 @@ fn bench_serialize_list(c: &mut Criterion) {
})
});

let long_values = build_list_values(ROWS, LIST_LONG_ELEMENTS);
c.bench_function("decode_list_long_i64_serialize", |b| {
let long_values = build_ree_values(ROWS, LONG_REE_RUN_LENGTH);
c.bench_function("decode_long_ree_runs_serialize", |b| {
b.iter(|| {
let mut decoder = ReaderBuilder::new(schema.clone())
.with_batch_size(BATCH_SIZE)
Expand Down Expand Up @@ -402,6 +571,10 @@ criterion_group!(
bench_wide_projection,
bench_decode_list,
bench_serialize_list,
bench_decode_map,
bench_serialize_map,
bench_decode_ree,
bench_serialize_ree,
bench_schema_inference
);
criterion_main!(benches);
Loading