Skip to content

Commit cc67464

Browse files
authored
Add h5 streamed writes (#120)
1 parent 1de8eed commit cc67464

File tree

8 files changed

+829
-34
lines changed

8 files changed

+829
-34
lines changed

R/analyse-helpers.R

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,3 +264,99 @@
264264

265265
df_out
266266
}
267+
268+
269+
#' Initialize an incremental writer for /results datasets
#'
#' Validates the arguments, (re)creates the group
#' `results/<write_results_name>` in the target HDF5 file and pre-allocates a
#' chunked `results_matrix` dataset of `n_rows` x `length(column_names)` that
#' is later filled block by block.
#'
#' @param write_results_name Name of the results group, or `NULL` to disable
#'   streaming (the function then returns `NULL` without touching any file).
#' @param write_results_file Path to the HDF5 file; created when it does not
#'   exist yet.
#' @param n_rows Number of rows the dataset is allocated with.
#' @param column_names Column names; their count fixes the dataset width.
#' @param flush_every Positive whole number (>= 1); upper bound for the number
#'   of rows per HDF5 chunk.
#' @param storage_mode Forwarded to `rhdf5::h5createDataset()` as
#'   `storage.mode`.
#' @param compression_level Forwarded (as integer) to
#'   `rhdf5::h5createDataset()` as `level`.
#' @return `NULL` when `write_results_name` is `NULL`; otherwise a list with
#'   `file`, `dataset_path`, `names_path`, `column_names`, `n_cols` and
#'   `write_row_cursor` (starting at `1L`).
#' @noRd
.init_results_stream_writer <- function(write_results_name,
                                        write_results_file,
                                        n_rows,
                                        column_names,
                                        flush_every = 1000L,
                                        storage_mode = "double",
                                        compression_level = 4L) {
  if (is.null(write_results_name)) {
    return(NULL)
  }
  # is.na() guards keep the scalar conditions from evaluating to NA, which
  # would abort with "missing value where TRUE/FALSE needed" instead of the
  # intended message.
  if (!is.character(write_results_name) || length(write_results_name) != 1L ||
    is.na(write_results_name) || write_results_name == "") {
    stop("write_results_name must be a non-empty character string when provided")
  }
  if (is.null(write_results_file) || !is.character(write_results_file) ||
    length(write_results_file) != 1L || is.na(write_results_file)) {
    stop("write_results_file must be a single character path when write_results_name is provided")
  }
  # Require >= 1 (not merely > 0): as.integer() truncates, so e.g. 0.5 would
  # otherwise become 0L and yield a zero chunk size. Non-finite or
  # out-of-range values would become NA_integer_.
  if (!is.numeric(flush_every) || length(flush_every) != 1L ||
    !is.finite(flush_every) || flush_every < 1 ||
    flush_every > .Machine$integer.max) {
    stop("write_results_flush_every must be a positive integer")
  }
  flush_every <- as.integer(flush_every)
  # Checked before the file is modified so that a bad value cannot delete an
  # existing results group and then fail during dataset creation.
  if (!is.numeric(n_rows) || length(n_rows) != 1L || is.na(n_rows) ||
    n_rows < 0) {
    stop("n_rows must be a single non-negative number")
  }

  if (!file.exists(write_results_file)) {
    rhdf5::h5createFile(write_results_file)
  }

  h5_write <- hdf5r::H5File$new(write_results_file, mode = "a")
  # The hdf5r handle must be released before rhdf5 touches the file again;
  # `finally` guarantees that even when group creation fails.
  tryCatch(
    {
      if (!h5_write$exists("results")) {
        h5_write$create_group("results")
      }
      results_grp <- h5_write$open("results")
      # Start from a clean group: drop results of a previous run with the
      # same name.
      if (results_grp$exists(write_results_name)) {
        results_grp$link_delete(write_results_name)
      }
      results_grp$create_group(write_results_name)
    },
    finally = h5_write$close_all()
  )

  n_cols <- length(column_names)
  dataset_path <- paste0("results/", write_results_name, "/results_matrix")
  chunk_rows <- min(flush_every, n_rows)

  rhdf5::h5createDataset(
    file = write_results_file,
    dataset = dataset_path,
    dims = c(n_rows, n_cols),
    storage.mode = storage_mode,
    chunk = c(chunk_rows, n_cols),
    level = as.integer(compression_level)
  )

  list(
    file = write_results_file,
    dataset_path = dataset_path,
    names_path = paste0("results/", write_results_name, "/column_names"),
    column_names = as.character(column_names),
    n_cols = n_cols,
    write_row_cursor = 1L
  )
}
328+
329+
330+
#' Append one block to an incremental /results writer
#'
#' Writes the rows of `block_df` into the pre-allocated results dataset,
#' starting at the writer's current row cursor, and advances the cursor.
#'
#' @param writer Writer list carrying `file`, `dataset_path`, `n_cols` and
#'   `write_row_cursor`, or `NULL` (returned unchanged, nothing is written).
#' @param block_df Block of rows; coerced with `as.matrix()` and expected to
#'   have `writer$n_cols` columns.
#' @return The writer with `write_row_cursor` advanced by the number of rows
#'   written (unchanged for a zero-row block).
#' @noRd
.results_stream_write_block <- function(writer, block_df) {
  if (is.null(writer)) {
    return(writer)
  }
  block <- as.matrix(block_df)
  n_new <- nrow(block)
  # An empty block is a no-op. Without this guard `cursor:(cursor - 1L)` would
  # count backwards and address two existing rows.
  if (n_new == 0L) {
    return(writer)
  }
  if (ncol(block) != writer$n_cols) {
    stop(
      "block_df has ", ncol(block), " columns but the writer expects ",
      writer$n_cols
    )
  }
  row_idx <- seq.int(from = writer$write_row_cursor, length.out = n_new)
  rhdf5::h5write(
    obj = block,
    file = writer$file,
    name = writer$dataset_path,
    index = list(row_idx, seq_len(writer$n_cols))
  )
  writer$write_row_cursor <- writer$write_row_cursor + n_new
  writer
}
347+
348+
349+
#' Finalize an incremental /results writer
#'
#' Stores the writer's column names next to the results matrix and then
#' closes all open rhdf5 handles. A `NULL` writer is a no-op.
#'
#' @param writer Writer list carrying `column_names`, `file` and `names_path`,
#'   or `NULL`.
#' @return `NULL`, invisibly.
#' @noRd
.finalize_results_stream_writer <- function(writer) {
  has_writer <- !is.null(writer)
  if (has_writer) {
    target_file <- writer$file
    target_path <- writer$names_path
    rhdf5::h5write(
      obj = writer$column_names,
      file = target_file,
      name = target_path
    )
    rhdf5::h5closeAll()
  }
  invisible(NULL)
}

0 commit comments

Comments
 (0)