
Conversation

@valerie-cal

Summary

Implemented the function from_webdataset() in dataset.py, which converts a WebDataset .tar file into a Rivulet Dataset instance; created a series of unit tests for validation.

Rationale

from_webdataset() will allow Rivulet datasets to hold WebDataset data alongside other data formats such as JSON and CSV.

Changes

  • Implemented from_webdataset() in deltacat/storage/rivulet/dataset.py, which reads a given WebDataset and creates a Rivulet dataset from its JSON records and image metadata (see the usage sketch below)
  • Added batch reading so that the WebDataset is read in user-provided batch sizes
  • Handles WebDatasets containing assorted and nested datatypes
  • Uses merge keys to extract image data and store it as binary, easing integration with HuggingFace datasets and models for classification
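
A minimal usage sketch, based only on the call signature exercised in the tests below; the dataset name, tar path, and metadata path are illustrative placeholders:

from deltacat.storage.rivulet import Dataset

dataset = Dataset.from_webdataset(
    name="example_webdataset",
    file_uri="path/to/example.tar",   # WebDataset .tar archive to import
    metadata_uri="path/to/metadata",  # directory where Rivulet metadata is written
    merge_keys="filename",            # JSON field naming each sample's media file
)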

Testing

The test suite deltacat/tests/storage/rivulet/schema/test_wds.py contains unit tests covering the following functionality of from_webdataset(). All test cases pass, and from_webdataset() does not break existing Rivulet or deltacat functions.

  1. Test that Schema correctly stores Field objects with their types
  2. Test that from_webdataset correctly identifies all fields in the tar file
  3. Test that data values are correctly extracted from the tar file
  4. Test that merge keys are correctly identified and set in the schema
  5. Test that specifying a non-existent field as merge key raises an error
  6. Test that field datatypes are correctly inferred from the data
  7. Test that metadata directory is properly initialized
  8. Test that fields in the dataset are proper Field objects
  9. Test that from_webdataset correctly identifies all fields in the tar file if the JSON files are inconsistent
  10. Test that image_binary is an added column after the Dataset is created from the WebDataset

valerie-cal and others added 30 commits March 10, 2025 15:15
Member

@pdames pdames left a comment


Hmm... it seems like this PR includes a lot of superfluous/untouched files in the diff that are creating unnecessary conflicts. Maybe the changes can be squashed and rebased on top of the latest from https://github.com/ray-project/deltacat/tree/2.0 then resubmitted to clean this up?

from transformers import AutoImageProcessor, AutoModelForImageClassification


#tar_path = "deltacat/tests/test_utils/resources/imagenet1k-train-0000.tar"
Collaborator


Remove any unused code

Comment on lines 83 to 90
# Create a list of dictionaries combining filename and predicted species
rows_to_write = [
    {
        "filename": fname,
        "bird_species": bird_labels[idx]
    }
    for idx, fname in enumerate(filenames)
]
Collaborator


This is a weird way to do this; zip()-ing the two would be cleaner.
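
For reference, the zip()-based version being suggested would look roughly like this, assuming filenames and bird_labels are parallel lists as in the snippet above:

# Pair each filename directly with its predicted species, no index bookkeeping.
rows_to_write = [
    {"filename": fname, "bird_species": species}
    for fname, species in zip(filenames, bird_labels)
]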

Comment on lines 529 to 588
with tarfile.open(file_uri, "r") as tar:
    tar_members = tar.getmembers()
    current_batch = None
    reading_frame_size = batch_size  # TODO: Use batch size 1 for now.
    total_batches = math.ceil(len(tar_members) / reading_frame_size)

    for i in range(total_batches):
        reading_frame_start = i * reading_frame_size
        reading_frame_end = reading_frame_start + reading_frame_size
        for member in tar_members[reading_frame_start:reading_frame_end]:
            # Ignore hidden files if the imported tar isn't cleaned.
            if member.name.startswith("._"):
                continue
            if member.isfile() and member.name.endswith(".json"):
                f = tar.extractfile(member)
                if f:
                    try:
                        merge_key = merge_keys

                        pyarrow_table = pyarrow.json.read_json(f)
                        image_filename = pyarrow_table[merge_key][0].as_py()

                        # truncated_filename = normalize_filename(image_filename[image_filename.index('/') + 1:])
                        truncated_filename = normalize_filename(os.path.basename(image_filename))
                        if truncated_filename in [normalize_filename(t.name) for t in tar_members]:
                            image_member = next((t for t in tar_members if t.name == truncated_filename), None)
                            if image_member:
                                fi = tar.extractfile(image_member)
                                if fi:
                                    media_binary = fi.read()
                                    media_binaries.extend([media_binary])

                        if current_batch is None:
                            current_batch = pyarrow_table
                        else:
                            current_batch = pa.concat_tables([current_batch, pyarrow_table])
                    except Exception as e:
                        print(f"Error with {member.name}:", e)

    if current_batch is not None:
        try:
            dataset_schema.merge(Schema.from_pyarrow(current_batch.schema, merge_keys=merge_keys))
        except Exception as e:
            print(f"Error merging schema: {e}")

    if current_batch is not None and media_binaries:
        if len(media_binaries) == current_batch.num_rows:
            try:
                image_column = pyarrow.array(media_binaries, type=pyarrow.binary())
                current_batch = current_batch.add_column(
                    len(current_batch.schema),
                    'media_binary',
                    image_column
                )
                # Edit dataset_schema to have media_binaries as a field object
                dataset_schema.add_field(Field('media_binary', Datatype.binary(image_filename[image_filename.index('.') + 1:].lower())))
            except Exception as e:
                print(f"Mismatch between media binaries and batch rows: {e}")


Collaborator


This is quite heavily nested. Not worth it now, but I'd extract this into a WebDatasetReader sort of class that manages all this in a non-nested way if we do more development.
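
As a rough illustration of the suggested refactor (the class and method names here are hypothetical, and the helper normalize_filename() is assumed to exist as in the PR), the reading logic might be flattened into something like:

import os
import tarfile

import pyarrow as pa
import pyarrow.json  # makes pa.json available


class WebDatasetReader:
    """Sketch of a non-nested reader; yields one (table, media_binaries) pair per batch."""

    def __init__(self, file_uri: str, merge_key: str, batch_size: int = 1):
        self.file_uri = file_uri
        self.merge_key = merge_key
        self.batch_size = batch_size

    def read_batches(self):
        with tarfile.open(self.file_uri, "r") as tar:
            members = [m for m in tar.getmembers() if not m.name.startswith("._")]
            json_members = [m for m in members if m.isfile() and m.name.endswith(".json")]
            for start in range(0, len(json_members), self.batch_size):
                batch = json_members[start:start + self.batch_size]
                yield self._read_batch(tar, members, batch)

    def _read_batch(self, tar, members, batch):
        tables, binaries = [], []
        for member in batch:
            table = pa.json.read_json(tar.extractfile(member))
            tables.append(table)
            binaries.append(self._read_media(tar, members, table))
        return pa.concat_tables(tables), binaries

    def _read_media(self, tar, members, table):
        # Look up the media member named by the merge-key column of the JSON record.
        media_name = normalize_filename(os.path.basename(table[self.merge_key][0].as_py()))
        media = next((m for m in members if normalize_filename(m.name) == media_name), None)
        return tar.extractfile(media).read() if media else None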

pyarrow_table = pyarrow.json.read_json(f)
image_filename = pyarrow_table[merge_key][0].as_py()

# truncated_filename = normalize_filename(image_filename[image_filename.index('/') + 1:])
Collaborator


This blows up if you have multiple merge keys. Need to fix.


We have currently handled the following cases:

  1. If there are multiple merge keys, raise a ValueError
  2. If the merge key input is a list with one merge key, proceed with the one merge key.

We can adapt this to handle multiple merge keys instead of raising an error if wanted.
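
A minimal sketch of that check, assuming it lives inside from_webdataset() (the helper name here is illustrative):

def resolve_merge_key(merge_keys):
    # Accept a single key, or a list/tuple containing exactly one key.
    if isinstance(merge_keys, str):
        return merge_keys
    keys = list(merge_keys)
    if len(keys) != 1:
        raise ValueError(f"from_webdataset currently supports exactly one merge key, got {keys!r}")
    return keys[0]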

Comment on lines 1 to 13
@dataclass(frozen=True)
class Field:
    name: str
    datatype: Datatype
    is_merge_key: bool = False

class Schema(MutableMapping[str, Field]):
    def __init__(
        self,
        fields: Iterable[Tuple[str, Datatype] | Field] = None,
        merge_keys: Optional[Iterable[str]] = None,
    ):
        self._fields: Dict[str, Field] = {}
Collaborator


Not understanding why a class called Field is in a file called schema_test?

Contributor

@025rhu 025rhu Aug 18, 2025


This file was just for our own purposes of understanding the classes and files, not for the PR, so we have removed it completely.

The Field and Schema classes were already defined in schema.py; we had copied them into this file purely for our own understanding.

We've moved the process_tar() function into test_wds.py for now, since it could be a helpful test util, but we are not sure if that is the best place to put it (or whether we even want to keep it).

Comment on lines 22 to 25
"""Test that from_webdataset correctly identifies all fields in the tar file."""
tar_path = "../../../test_utils/resources/test_wds.tar"
dataset = Dataset.from_webdataset(
name="test_webdataset",
Collaborator


Generally don't like static, pre-generated test objects like this. The class should generate the wds file prior to the test using a standard wds function, then test on the created file, then delete the file at the end. This ensures that if the wds library/standard changes in a way that impacts serialization, the tests actually fail properly. Right now, there's no confidence the .tar is actually a real wds file, and no clear understanding of what's in that file or how it was generated.

Author


We looked into dynamically generating a webdataset, but it's a bit tricky in this case since typical webdatasets include media files like .jpg, which are not straightforward to create dynamically in a lightweight way. We could use .txt files, but that wouldn't fully test the expected use case.

Also, we noticed that other parts of the codebase (like CSV and Parquet) have some static test files, so we planned to follow that pattern by adding a minimal .tar file for testing. Happy to revisit this if there's a preferred way to generate valid webdataset test data inline.

Comment on lines 95 to 105
def test_metadata_directory_creation(tmp_path):
    """Test that metadata directory is properly initialized."""
    tar_path = "../../../test_utils/resources/test_wds.tar"
    dataset = Dataset.from_webdataset(
        name="test_meta",
        file_uri=tar_path,
        metadata_uri=tmp_path,
        merge_keys="filename"
    )
    assert hasattr(dataset, "_metadata_path")
    assert dataset._metadata_path is not None
Collaborator


Don't test internal attributes like this. Remove this test and instead test whatever the _metadata_path is attempting to create (i.e. fetch some useful metadata that would require the metadata path dir to exist).


We created a test, test_dataset_persistence_and_reloading, which successfully creates, saves, and scans the dataset. We take this to mean that the metadata was successfully written and that the path exists, but we can also explore more comprehensive tests.

foo.py Outdated
Comment on lines 1 to 10
import csv
import pyarrow as pa
import pyarrow.compute as pc

animal = pa.array(["sheep", "cows", "horses", "foxes", "sheep"], type=pa.string())
count = pa.array([12, 5, 2, 1, 10], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022, 2021], type=pa.int16())

# Creating a table from arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])
Collaborator


File doesn't belong in commit?


We've removed this file.

foowds.py Outdated
Comment on lines 1 to 16
import os
import json
import tarfile
import io
import numpy as np
from PIL import Image

# Create mock data directory
if not os.path.exists('mock_data'):
    os.makedirs('mock_data')

# Sample IDs for medical papers (similar to the example)
sample_ids = [
    "",
    "PMC4129566_00003",
    "PMC4872614_00002",
Collaborator


File doesn't belong in commit? Or it needs to move to tests/utils or some equivalent so it can be used by the wds test class.


We've removed this file.

Comment on lines 1 to 11
import itertools
import pytest
import pyarrow as pa
import json
import tarfile
from deltacat.storage.rivulet import Dataset, Schema, Field, Datatype


def test_schema_field_types():
    """Test that Schema correctly stores Field objects with their types."""
    fields = [
Collaborator


Seems to be missing an actual data read/write; all the tests are about the schema.

Contributor


All our tests now use Pytest fixtures to create tar files at the beginning of each test. They each run from_webdataset(), so that should include verifying that reading from the tar file worked. Then each test checks the fields, types, and some values in the Dataset produced by from_webdataset(), which I believe should handle checking that writing worked.

We put all test cases in a class called TestFromWebDataset to ensure proper setup and teardown, leaving no directories lying around. This meant we only used one temp directory from Pytest's tmp_path, so we named datasets, tar files, and JSON files uniquely to their respective entities instead.

The one thing left over is testing with image files (instead of .txt files), for which we left a TODO and commented-out code, and which we can address once we discuss the specifics further!
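
For illustration, a fixture along these lines (names and contents are hypothetical; the real fixtures live in test_wds.py) can build a small WebDataset-style tar from JSON records and .txt payloads:

import json
import tarfile

import pytest


@pytest.fixture
def wds_tar(tmp_path):
    """Build a tiny WebDataset-style .tar with one JSON record and one .txt
    payload per sample; illustrative only."""
    tar_path = tmp_path / "test_wds.tar"
    with tarfile.open(tar_path, "w") as tar:
        for i in range(3):
            txt_path = tmp_path / f"sample_{i}.txt"
            txt_path.write_text(f"payload {i}")
            json_path = tmp_path / f"sample_{i}.json"
            json_path.write_text(json.dumps({"filename": txt_path.name, "label": i}))
            tar.add(json_path, arcname=json_path.name)
            tar.add(txt_path, arcname=txt_path.name)
    return tar_path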

@025rhu
Contributor

025rhu commented Aug 17, 2025

To your comment @pdames (Hmm... it seems like this PR includes a lot of superfluous/untouched files in the diff that are creating unnecessary conflicts. Maybe the changes can be squashed and rebased on top of the latest from https://github.com/ray-project/deltacat/tree/2.0 then resubmitted to clean this up?):

It looks like we accidentally changed the executable mode of ~400 files in one commit (most likely by running chmod +x at some point).

We have merged the latest 2.0 into the PR (which unfortunately did not resolve the executable mode issue), and changed the executable modes back for all relevant files.

@025rhu
Contributor

025rhu commented Aug 18, 2025

Overall changes:

  • merged in the latest 2.0
  • resolved issue with the extra ~400 files modified in this PR
  • error out if there are multiple merge keys (for now)
  • new, working test suite for from_webdataset() in test_wds.py; deleted old test suite
  • ran and passed linter
