Skip to content

Introduce ecoli.library.xarray_emitter#414

Draft
ntfrgl wants to merge 1 commit into
CovertLab:masterfrom
vivarium-collective:xarray-emitter-pr
Draft

Introduce ecoli.library.xarray_emitter#414
ntfrgl wants to merge 1 commit into
CovertLab:masterfrom
vivarium-collective:xarray-emitter-pr

Conversation

@ntfrgl
Copy link
Copy Markdown

@ntfrgl ntfrgl commented Apr 30, 2026

Rationale

From the main module documentation:

XarrayEmitter is an Emitter similar to ParquetEmitter, but with a design optimized towards a different flavour of downstream applications:

  • ParquetEmitter is geared towards emitting a significant fraction of the simulator state, in a format that supports flexible sparse selections, data reductions and time series visualizations, as used in analysis scripts.

  • XarrayEmitter is intended for emitting only a pre-selected subset of statically shaped tensor variables, in a format that supports numerical algorithms in the high-dimensional and large-sample regime.

The former type of computations is naturally expressed using relational query engines (e.g., DuckDB), whereas the latter type is naturally expressed using array programming libraries (e.g., Cubed). Due to the sheer size of the simulator state, both types may in general require out-of-core processing algorithms.

In order to facilitate downstream applications based on chunked array processing, XarrayEmitter writes out to any persistent storage supporting the Zarr specification, using an in-memory buffer comprised of Xarray objects. For optimized throughput, the buffer implements temporal subsampling, numerical type casting and compression codecs at emission time. Furthermore, in order to simplify the export of simulation data into external libraries, the hierarchy of the output DataTree is decoupled from the hierarchy of simulation Stores, using an output variable layout specified in the simulator configuration.

Notes for reviewers

  • This implementation is informed by the prototypes in the branches zarr-emitter and xarray-emitter.
  • Before diving into the code, please read the hyperlinked documentation of the main module:
cd doc && uv run make clean html
$BROWSER _build/html/reference/api/ecoli/ecoli.library.xarray_emitter.html

@ntfrgl ntfrgl marked this pull request as draft April 30, 2026 18:57
@thalassemia
Copy link
Copy Markdown
Contributor

Thanks for the PR! I'll do a more thorough review later, but at a glance, one thing I'd like to see is a demo for how to read/work with the Zarr output. Some ad hoc code in a Marimo notebook would suffice. I just want to get a sense for what that would look like and maybe run some benchmarks.

Copy link
Copy Markdown
Contributor

@thalassemia thalassemia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some preliminary thoughts. I really appreciate the detailed docstrings! Ultimately, I ended up focusing on those and just skimming the code, which feels way beyond my paygrade. I had to look so much stuff up and learn a bunch of unfamiliar Python syntax but still couldn't really follow along haha.

My biggest questions are:

  1. What does analyzing this output will look like?
  2. How will this perform at scale in the cloud (read/write)?

"""
assert engine.emitter is self
if emit_paths:
state = self.ecoli_experiment.state
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be engine.state.

cls, "Invalid argument", f"\"predicate\": [{config}]"))
return cls(list(map(AtomicEmitPredicate.build, config)))

@abstractmethod
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude says the @abstractmethod decorator should be removed here and for ConjunctiveEmitPredicate.__call__.


@cached_property
def _warnings_eval_effect(self) -> list[WarningFilter]:
return self.warnings_make_effect()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude also caught this minor error: should be self.warnings_eval_effect().

`conjunctive normal form`_ (CNF), where literals are atomic predicates
parametrized by the JSON configuration.

.. _conjunctive normal form: https://en.wikipedia.org/wiki/Conjunctive_normal_form
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole emit predicate system is a really cool idea.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! This will enable more complex statistical aggregations over simulation trajectories.

dtype: str
#: Unit string to store as an attribute inside the node, provided either
#: as a string, as a :py:class:`pint.Unit`, or as a :py:class:`unum.Unum`.
unit: str | None = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is cool. Units have long been a pain point for us. I know Vivarium 2.0 addresses this somehow, but I also wanted to point out that it is possible to add custom key-value metadata to Parquet files (metadata option in Polars write_parquet and DuckDB parquet_kv_metadata).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This here is merely a pragmatic user-facing solution that is configured in the emitter on a per-variable basis. The fundamental solution will be a globally consistent type system in the simulator framework that includes all variable units.


.. hint::
- The numbers of threads and concurrent connections in the transport layer
have backend-specific configuration options (see
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my limited testing, runtime write performance seems quite good locally. The output size from one generation run with configs/test_configs/test_xarray_emitter.json was also 50% smaller than that of an equivalent sim run with the Parquet emitter (20 MB vs 30 MB)!

Looking forward to testing write performance on HPC/cloud, as well as read and aggregation performance over large datasets.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stay tuned for an example aggregation script.

I encourage you to also experiment with variable-specific compression codecs on some variables for which you possess strong prior knowledge.

libraries, the hierarchy of the output `DataTree`_ is decoupled from the
hierarchy of simulation :py:class:`~vivarium.core.store.Store`\ s, using an
output :ref:`variable layout <variable_layout>` specified in the :ref:`simulator
configuration <sim_config>`.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The decoupling of simulation store hierarchy and output hierarchy is interesting. The flexibility is very nice, especially for stores very deeply nested in the sim hierarchy. I'm a bit worried that it will complicate analysis scripts, which up to now have operated under the assumption that data will be located in a fixed location (tied to the commit used to run the workflow and verified by our CI workflow via ecoli/analysis/multivariant/dummy.py).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This design really reflects a difference is intended use cases. The decoupling of external from internal hierarchies may provide additional flexibility for "in-house" uses; however, it is primarily motivated as a way:

  • to reduce the overhead for "external" downstream users of simulation outputs, who should not need to be familiar with vivarium store hierarchies they are not concerned with,
  • and to facilitate future design efforts towards standardized high-performance emission formats.

Meanwhile, there is no a priori reason to port existing analysis scripts from ParquetEmitter to XarrayEmitter, unless somebody specifically decides that the potential increase in throughput is worth their time. In that scenario, tying the analysis script to the config JSON should still ensure reproducibility.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense to me. Another thing that would help is for our eventual Xarray analyses to include in their docstring what emitter_arg has to look like to work.

- Supports *renaming and rearranging* of output variables.
- Requires the configuration of *output data types*.
- Supports the configuration of backend-specific *compression codecs*.
- Supports :ref:`log_updates`, i.e., the emission of individual
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I played around with this to remind myself why log_updates are not supported by the Parquet emitter.

One issue is that log_updates can contain Python objects not supported by Parquet. This should be solvable with an extra serialization step.

Another issue is that columns in the output of the Parquet emitter can only be included or removed at the level of stores. Each process has its own store under log_updates that holds its entire update dictionary, so, for each process, you can only choose to include every column in that update dictionary or none. I think I could adapt your system for specifying emit paths to fix this as well as the currently broken emit_paths option (mentioned in my comment on ecoli_master_sim.py).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed! You should be able to directly reuse the logic in:

  • EmitPath.emitting_path
  • ForestView.matches_emitted_prefix_path()
  • XarrayBuffer.write()

`DataArray`_\ s within an output buffer.
- Decouples the in-memory buffer size from the persistent chunk size, in order
to simplify performance tuning of large-scale simulations (see
:py:class:`.XarrayTransducer` and :py:class:`.AsyncBufferWriter` for details).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know how Zarr implements incremental writes under the hood? I'm especially curious about performance on high-latency cloud object storage.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design rationale

The architectural design of concurrency control in the AsyncBufferWriter is documented here. The overall system will perform as well as these assumptions match the actual situation of the compute environment. In particular, if we treat the behavior of the Xarray and Zarr APIs as given, then the most important factors we can control are the following:

  • How much time a buffer write cycle has available to complete concurrently, before it starts blocking the next buffer (buffer size * sampling interval / compute time per simulation step).
  • How well we can reduce the amount of latency incurred by reducing the number of file system calls required (adequate chunk sizes, caching file handles and metadata)
  • How well we can mask per-array latency by writing asynchronously to multiple arrays (number of arrays per buffer, number of async Zarr operations).
  • How well we can leverage available total bandwidth by writing with multiple threads (number of Zarr worker threads).
  • Whether we avoid congesting the available network or file system bandwidth with too many connections from many parallel simulation processes.

Internals

And here is what Xarray and Zarr do under the hood, in their current implementations.

B. async setitem()

This is the simplified call stack, at the point in time when the low-level write operation for an individual array slice is sent to the Zarr API:

  1. .emitter.XarrayEmitter.flush()
  2. .writer.AsyncBufferWriter.write()
  3. .writer.AsyncBufferWriter._write()
  4. .writer.AsyncBufferWriter.eval_effect()
  5. .writer.AsyncArrayWriter.sync()
  6. .zarr_writer.AsyncZarrArrayWriter._async()
  7. zarr.AsyncArray.setitem

A. set_variables()

Before being sent to (B.7), the low-level write operations are first generated by the Xarray API, at the following simplified call stack:

  1. (as above)
  2. (as above)
  3. (as above)
  4. .zarr_writer.AsyncZarrBufferWriter.make_effect()
  5. .zarr_writer._datatree_to_zarr()
  6. xarray.backends.writers.dump_to_store()
  7. xarray.backends.zarr.ZarrStore.store()
  8. xarray.backends.zarr.ZarrStore.set_variables()
  9. .writer.AsyncArrayWriter.add()

In particular, (A.8) contains the Xarray logic for computing the indexing range for an appending write to an existing Zarr array. For a comparison, see Zarr's own "convenience function" zarr.core.array._append().

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is all useful information. However, my question was more specifically about how objects in most cloud storage providers are immutable and do not support operations like append. Usually, you have to rewrite the entire object to update it. That could be inefficient if you are writing multiple buffers per chunk.

Comment thread ecoli/library/xarray_emitter/__init__.py
@ntfrgl
Copy link
Copy Markdown
Author

ntfrgl commented May 1, 2026

I really appreciate the detailed docstrings! Ultimately, I ended up focusing on those and just skimming the code

Thank you for your initial review! It is already prompting several important fixes, and I am glad to hear that you see value in the new design draft.

but still couldn't really follow along haha

After I address your most urgent comments, I will be happy to go over implementation details with you. Answering any questions you might have -- be it via code changes, comments, docs or a traced conversation -- is likely to reduce very real future maintenance costs.


  1. What does analyzing this output will look like?

This PR is obviously focused on the write operations from the emitter. However, I expect to attach an example analysis script for a small Nextflow workflow sometime in the next 1-2 days, and we can start iterating on the more subtle questions from there. For now, test_xarray_emitter::StoreResult.__enter__ demonstrates the most basic way to load a simulation store, using either API:

  • The Zarr API is lower-level and specific to the Zarr format. It allows fine-grained control over async operations, metadata management etc., but it doesn't provide computing capabilities.
  • The Xarray API is higher-level, wraps around the Zarr API, and could be generalized to other storage backends in future work. Its DataTree data structure provides more high-level logic, such as the "inherited coordinates", and has built-in support for distributed computing libraries such as Dask or Cubed, both for loading and for processing data.

But fundamentally, both APIs are isomorphic to a Mapping[str, Mapping[str, numpy.ndarray]] bundled with many utility functions. Therefore, at a high level, this is how downstream usage will look like:

  • choose parameters for how to open the store (resulting in file handles and metadata caches),
  • choose relevant arrays within the store hierarchy (resulting in file handles and metadata caches),
  • load relevant slices (resulting in uncompressed in-memory dense arrays),
  • and then perform array computations on them.

  1. How will this perform at scale in the cloud (read/write)?

Write operations

The most performance-critical configuration parameters that you may want to investigate for benchmarking are:

  • transducer.buffer.size -- see notes in XarrayTransducer
  • writer.buffers_per_chunk -- see notes in AsyncBufferWriter
  • writer.backend_config.{async.concurrency,threading.max_workers} -- see notes in AsyncZarrBufferWriter
  • view[].variables.{}.codecs -- see notes in AsyncZarrBufferWriter.

In general, the optimal resource parameters will depend on the workload and the compute infrastructure. The main goal of this PR is to enable declarative fine-grained control over these resource parameters, while subordinating the emitter architecture to explicit assumptions about the workflow-level communication pattern, as described in the top-level documentation. In particular, these explicit assumptions are intended to closely reflect the current practice, but in a way that can be easily generalized in subsequent work -- this concerns your questions about the current restrictions to single lineages, 1-dimensional arrays etc. Hopefully, you will come to agree that with this architecture, such generalizations will require only small code modifications.

For the purpose of reviewing this PR, I suggest the following strategy:

  1. You raise any questions or concerns you have about the assumptions and choices in the emitter architecture, going top-down from the conceptual documentation to the implementation.
  2. We agree on a small set of scientifically representative benchmark workloads, and together run a few iterations of parameter tuning, based on your experience with end users and taking one of your clusters or cloud deployments as an example. If this leads to a handful of rules of thumb for how to tune the resource parameters, I would consider that a success.
  3. We agree on a list of desirable extensions that will be documented for future work.

Read operations

In practice, with either the Zarr API or the Xarray API, achieving good read performance for large workflows will require careful resource management of the compute graphs, analogously to how achieving good performance with DuckDB requires careful SQL design for nontrivial computations. This includes:

  • loading only the arrays that are strictly necessary for a computation,
  • streaming them with chunk sizes that are well aligned with the desired logical indexing and supported by the available working memory,
  • leveraging available parallel computing capabilities.

I will try to showcase several of the relevant API parameters as a starting point for your benchmarking and for future downstream applications. But fundamentally, this is a question of application-specific performance engineering, and hence, providing a general solution for analysis workloads is not in scope for this PR.

@thalassemia
Copy link
Copy Markdown
Contributor

I finally got around to fixing emit_paths here: #418. I ended up taking a much simpler approach by intentionally not supporting log_updates. Long story short, I realized that some of the most interesting process updates are in a format that Parquet cannot store anyways. For example, bulk molecule updates are lists of tuples that can have inconsistent levels of nesting (e.g., [([1,2], 300), (10, -10), (4, [-5])] is interpreted as "add 300 counts to molecule index 1 and 2, subtract 10 counts from index 10, and subtract 5 counts from index 4"). Given that, I decided it was not worth adding complexity to support a feature that would be very hit or miss in practice.

Do you know if Zarr supports data like the above? If not, instead of using log_updates, users would likely be better off creating new listeners to explicitly log any updates of interest. This allows them to format the data in a Parquet/Zarr-friendly format inside the process before sending it to the emitter.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants