Skip to content

feat: add literature pyspark steps#113

Draft
vivienho wants to merge 78 commits into
mainfrom
vh-add-literature
Draft

feat: add literature pyspark steps#113
vivienho wants to merge 78 commits into
mainfrom
vh-add-literature

Conversation

@vivienho
Copy link
Copy Markdown
Contributor

No description provided.

d0choa and others added 27 commits April 30, 2026 14:12
PEP 701 (reusing the outer quote inside an f-string expression) is only supported on Python 3.12+. The project pins requires-python = '>=3.11,<3.14', so on 3.11 these expressions raise SyntaxError at import time. Switch the inner quote style to double quotes to keep the literals valid across the supported range.
match_mapped.df is consumed by match_disambiguated (which feeds match_valid and one branch of match_failed) and again directly by the isMapped==False branch of match_failed. Without persist, the load -> extract_matches -> map_labels lineage is recomputed for each branch and disambiguate is recomputed twice. Persist match_mapped.df before fan-out and unpersist after both writes complete.
match_failed is the union of two filters: rows with isMapped==False from match_mapped and rows with isValid==False from match_disambiguated. If disambiguate() does not strictly drop rows where isMapped is False, a row that failed mapping can satisfy both filters and end up duplicated in the failed output. Add a defensive isMapped==True filter on the disambiguation branch so the union is disjoint by construction, regardless of disambiguate()'s contract.
Replace explicit '== True' / '== False' comparisons with the boolean column directly and its negation. The original form trips ruff E712, and pulls in a NULL-handling pitfall (NULL == True is NULL, not False, so the row is silently dropped from both branches of the valid/failed split). Direct truthiness on a boolean Column has the same evaluation semantics for non-NULL values and is the idiomatic PySpark form.
The new literature_* pyspark steps import from the literature package, but no such dependency was declared. Add it as a git source so uv can resolve the import. Also map the ot-literature distribution name to the literature module name for deptry. The branch pin is temporary (see PR description); flip to vh-restructure-datasets once ot-literature#7 lands.
Per review preference, pin against ot-literature dev rather than the temporary do/bump-pyspark-3.5.7 branch. uv.lock is intentionally not regenerated here: dev currently has pyspark==3.3.4, which conflicts with pts pyspark==3.5.7, so resolution fails. Once the pyspark bump (ot-literature#7) and the match_mapped restructuring land on dev, re-run uv lock to refresh the lockfile against the dev SHA.
d0choa added 25 commits May 15, 2026 15:38
…packages check

Two changes for the next literature run:

* Force BroadcastHashJoin on small-LUT joins.

  The OnToma map_entities left join (matches × label LUT) was almost
  certainly running as SortMergeJoin in run-008: the LUT artifact is
  ~16MB compressed parquet, decoded ~60-100MB in memory, well above
  Spark's default 10MB autoBroadcastJoinThreshold. SMJ on a
  billions-row matches df is shuffle-heavy.

  Bump cluster-level spark.sql.autoBroadcastJoinThreshold to 512MB so
  Catalyst picks BHJ for the LUT and any other small-LUT joins
  downstream (cooccurrence_evidence).

* Satisfy OnToma's spark.jars.packages check via step properties.

  ontoma/ontoma.py:55 runs spark.conf.get("spark.jars.packages") and
  raises ValueError if the value doesn't contain "spark-nlp". With
  spark-nlp loaded from pre-staged JARs via spark.jars, that string
  is empty. Set spark.jars.packages on the per-step SparkConf — it
  goes in post-SparkSubmit, so Ivy resolution is NOT re-triggered;
  the value is purely informational for OnToma's assertion.

Applies to literature_ontoma_lut_generation and literature_publication_match
(the two steps that instantiate OnToma).

Also flips DATE_PREFIX to "2025" for the next eval run; full-EPMC sweep is
deferred until performance characteristics are validated at smaller scale.
Pin opentargets/OnToma to perf/avoid-distinct-collect-in-mapping-compatibility
(PR #51) to validate the _check_mapping_compatibility performance fix at
scale before it lands in a release. Revert to PyPI once the PR is merged
and released.
PR opentargets/OnToma#51 was force-pushed to replace the head(1) perf
tweak with a full removal of _check_mapping_compatibility. Refresh the
lockfile so the next Dataproc install picks up the new commit.
…y_lut

`_compute_relevance` collapses matches to one row per (pmid, section,
keywordId) before aggregating across sections. The previous step 1
computed `collect_list(weight)` via a window function partitioned on
those three keys, replicating the result to every row in the group,
then deduplicated via `dropDuplicates`. The window form materialises
the aggregate N times for an N-row group only to throw away N-1 copies.

Replace it with a direct `groupBy(pmid, section, keywordId).agg(...)`.
Semantics are identical: title sections still emit the fixed `[1.0]`
vector via a post-agg `withColumn`, other sections still collect all
mention weights into an array. `first()` over the publication metadata
columns (`pmcid`, `date`, `year`, `month`, `day`, `keywordType`) matches
the previous `dropDuplicates` non-determinism — those columns are
constant within a group in practice.

Also drops the dead `.orderBy('rank')` on the broadcast section-rank
table (broadcast joins don't preserve order; the downstream
`sort_array` ranks the structs anyway) and the now-unused `Window`
import.

Output schema and column order unchanged.
… commit

* Persist the publication dataframe at the end of _read_publications.

  run-010 audit showed Spark re-scanning the EPMC source jsonl files
  multiple times: every independent action in the downstream pipeline
  (write match_valid, write match_failed) walks the lineage back to
  the source read. Cumulative re-read cost on the 2025-prefix run was
  several minutes; it scales linearly with the EPMC source size.

  Persist after the final repartition (.repartition('pmid').persist()).
  Spark caches the materialised dataset on the first action and the
  subsequent writes reuse it. Caller unpersists at the end of the
  step.

* Bump ontoma pin from e5851b6 to 0671191 — picks up the label-dedup
  before NLP pipeline commit on opentargets/OnToma#51 (drops both the
  _check_mapping_compatibility scan AND the per-row NLP work, the
  latter being the biggest single win on map_entities for the
  publication_match step at scale).
Eval runs that only need to measure the first N steps no longer need a
one-shot edit to the launcher. Set STOP_AFTER=<step-name> in the
environment to stop the launcher after that step's wait_job returns;
the cluster is left running so Spark/YARN UIs stay reachable.
Picks up the third commit on opentargets/OnToma#51 — salting the
label-dedup distinct shuffle so hot labels (cancer, common gene
symbols) spread across SALT_BUCKETS=50 partitions instead of
funnelling into a single skewed task.

run-011 stage 91 measured 14× task-duration skew (one task at 158s
on 1.4 GB of shuffle while the p99 task handled 309 MB). At
full-corpus scale that skewed task is the dominant bottleneck of
publication_match.
Run-011 confirmed the 2025-prefix sizing at REPARTITION=8000,
SHUFFLE_PUBMATCH=5000 holds per-task data around 100-300 MB during
publication_match's heaviest shuffles. Full EPMC adds ~25× more
publications and ~10-15× more matches; without bumping the partition
counts, per-task shuffle size grows proportionally and pushes
executors into memory pressure.

Bump the tunables to a sensible full-EPMC starting point. Per-task
shuffle stays in the same 100-300 MB band as run-011:

  REPARTITION       8000  → 25000
  SHUFFLE_PUBMATCH  5000  → 15000
  SHUFFLE_EMBEDDING 5000  → 15000
  SHUFFLE_COOC      5000  → 15000
  SHUFFLE_ENTITY    2500  → 10000

Also flips DATE_PREFIX to '' so the launcher reads every EPMC day
folder.
…reshold

Two diagnostics from run-012 (full-EPMC, killed early due to severe
slowdown):

* Per-file task skew on EPMC reads.

  Default spark.sql.files.maxPartitionBytes (128MB) lets multi-GB
  EPMC jsonl files run as a single task. On the full corpus the per-
  task duration distribution on stages 5/6/10/12/14 was p50≈2s, p99
  in the 500-1000s range, max up to 17min — driven entirely by a
  handful of oversized day-folder files. Most tasks finished in
  seconds while a few stragglers blocked stage completion.

  Set spark.sql.files.maxPartitionBytes=32MB on publication_match so
  Spark subdivides large files; the downstream repartition(25000)
  handles any remaining imbalance.

* Drop the cluster-level autoBroadcastJoinThreshold from 512MB to
  128MB.

  512MB was sized so the ontoma LUT (~80MB compressed, ~80MB in
  memory) would qualify for BHJ. At full-EPMC scale, additional
  dataframes became broadcast candidates — most importantly
  pub_id_lut (~500MB uncompressed, right at the threshold). The
  ensuing 512MB broadcasts caused executor-side TaskMemoryManager
  page-allocation failures (`Failed to allocate a page (536870912
  bytes)`) and cascading RPC timeouts that killed 6 fresh executors.
  128MB still covers the ontoma LUT but excludes pub_id_lut and
  other accidentally-eligible dataframes.
…_valid write

ot-literature pin
  Point [tool.uv.sources].ot-literature at the perf/salt-disambig-join
  branch (opentargets/ot-literature PR #10). Salts the disambig left join
  in _resolve_ambiguous_mappings to spread hot (pmid, mappedId) keys,
  addressing the 56x task-duration skew observed on stage 142 in
  run-013 (max=452s, p50=8s).

publication_match AQE coalesce
  spark.sql.adaptive.advisoryPartitionSizeInBytes = 256MB
  spark.sql.adaptive.coalescePartitions.parallelismFirst = false

  Targets the small-file output at match_valid.write: run-013 wrote
  ~15000 files at ~7MB each because the disambig shuffle's skewed
  distribution prevented AQE from coalescing around hot partitions
  (AQE coalesces adjacent small partitions only -- a hot partition
  next to a small one blocks the coalesce). With the salt fix
  flattening the distribution, AQE can now consolidate post-shuffle
  partitions toward the new 256MB target. parallelismFirst=false
  is required so AQE coalesces below default parallelism for the
  write tail where extra parallelism isn't useful.

  Expected outcome: ~1500 files at ~256MB each for match_valid.
AQE coalesce did not consolidate the publication_match writes in run-014
despite the new advisoryPartitionSizeInBytes/parallelismFirst settings
taking effect on upstream stages (observed: 15000 -> 1496 and 15000 -> 124
task drops in mid-pipeline stages). The most likely cause is the explicit
match_disambiguated.df.persist() between the disambig shuffle and the
filter+write, which freezes the post-shuffle partitioning before AQE has
a chance to coalesce the consuming stage.

Run-014 wrote 15000 match_valid files at ~10 MB each (148 GB total) and
30000 match_failed files at ~4 MB each (130 GB total) -- both far below
any reasonable per-file target.

Adds a `_maybe_coalesce` helper paralleling the existing
`_maybe_repartition`, applied at each parquet write boundary and driven
by two new settings:

  match_valid_coalesce: target partition count for the valid write
  match_failed_coalesce: target partition count for the failed write

Falsy values leave the dataframe unchanged. Default values set in
launch_literature.sh target ~250 MB per parquet file at the observed
run-014 output volumes:

  COALESCE_MATCH_VALID="600"   # 148 GB / 600 ~= 250 MB / file
  COALESCE_MATCH_FAILED="530"  # 130 GB / 530 ~= 250 MB / file

The AQE coalesce properties stay in place: they help downstream
intermediate stages even though they don't reach the final write.
Run-015 failed at 10:08:38 with a wave of BlockNotFoundException for
broadcast pieces broadcast_51_piece0..4 / broadcast_53_piece4 in stage 36.
Tracing back, at 10:08:08 the Dataproc autoscaler decommissioned ~40
executors in a single decision ("Executor decommission finished: spark
scale down (31.0s) - Migration: NNNN/NNNN blocks, 0 deleted"). The
block migration covers shuffle blocks but NOT broadcast pieces, so 30
seconds later the next stage's broadcast fetches hit the now-dead
executors and the stage was aborted.

The downscale itself was the autoscaler reacting to Spark dynamic
allocation releasing idle executors between stage boundaries. With
default settings (executorIdleTimeout=60s) Spark releases executors
quickly, YARN sees idle vCores, and the autoscaler downsizes.

Three Spark settings on publication_match keep all executors alive for
the duration of the step:
  - spark.dynamicAllocation.executorIdleTimeout: 7200s
  - spark.dynamicAllocation.shuffleTracking.enabled: true
  - spark.dynamicAllocation.shuffleTracking.timeout: 7200s

YARN sees the executors as allocated, autoscaler does not trigger
scale-down within this step. Lighter downstream steps keep the defaults
so the cluster shrinks between them, preserving the per-step autoscale
behaviour wanted for the rest of the literature pipeline.

This is a per-step Spark fix rather than a cluster-policy change. The
cluster autoscaling policy (gracefulDecommissionTimeout=600s,
scaleDownFactor=1.0) remains unchanged and continues to govern
between-step transitions and the lighter downstream steps.
…hare maybe_coalesce helper

Cooccurrence_evidence exhibits the same small-file pattern that motivated
the publication_match coalesce work. At run-016 scale:

  cooccurrence: 9.82 GiB / 3752 files = 2.7 MB / file
  evidence    : 7.84 GiB /  360 files = 22 MB / file

Same root cause: high spark.sql.shuffle.partitions to handle the heavy
upstream join, no rebalance before the write, AQE coalesce doesn't reach
the write boundary.

Adds two settings to literature_cooccurrence_evidence mirroring the
match_valid_coalesce / match_failed_coalesce pattern:

  cooccurrence_coalesce: target partition count for the cooccurrence write
  evidence_coalesce: target partition count for the evidence write

launch_literature.sh defaults (~250 MB per parquet file):

  COALESCE_COOCCURRENCE=40   # 9.82 GB / 40 ~= 245 MB / file
  COALESCE_EVIDENCE=32       # 7.84 GB / 32 ~= 245 MB / file

The helper is now in pts/pyspark/common/utils.py as maybe_coalesce (no
underscore; same convention as safe_array_union, parse_spark_schema, etc).
literature_publication_match drops its local copy and imports from the
common module. The local _maybe_repartition in publication_match stays
where it is for now since it's still single-use.
Run-016 publication_match recorded 438 FetchFailed events across 3 stage
244 retries (244.0 -> 244.1 -> 244.2). Source hosts: sw-5tcs (104),
sw-51fp (32), sw-pj0v (8). All "Connection from sw-XXXX:7337 closed" -- a
transient External Shuffle Service drop, not executor or node death (no
ExecutorLost or decommission events in the driver log). The dynamic
allocation pin from a591223 holds unhealthy hosts in the pool instead
of letting Dataproc replace them, so when one ESS gets wobbly we keep
hitting it.

With Spark defaults (shuffle.io.maxRetries=3, retryWait=5s, network
timeout=120s) a 30s ESS blip blew through retries and triggered stage
recomputes, each of which had to re-run upstream map outputs. Plausibly
2-4 min of the +6 min wall-clock vs run-014.

  spark.shuffle.io.maxRetries: 3 -> 10
  spark.shuffle.io.retryWait: 5s -> 15s
  spark.network.timeout: 120s -> 300s
  spark.reducer.maxBlocksInFlightPerAddress: unbounded -> 128

10 x 15s = 150s of silent retry per fetch absorbs the transient drops we
observed. Capping in-flight blocks per source avoids overwhelming a
wobbly ESS with concurrent requests.

Refactored the cluster --properties argument to a bash array joined with
the existing `^#^` delimiter so future additions stay readable.

This is the priority #4 from the run-013 audit that we deferred at 5
events; at 438 it pays for itself.
…modules

Hand-off cleanup ahead of orchestration takeover. Removes the local
Dataproc launcher and its planning docs (scripts/literature/,
docs/superpowers/), the pre-collapse PySpark step modules superseded
by literature_publication_match.py / literature_cooccurrence_evidence.py,
and the stale step keys in config.yaml that referenced them.

Kept:
  - src/pts/pyspark/literature_publication_match.py (collapsed step)
  - src/pts/pyspark/literature_cooccurrence_evidence.py (collapsed step)
  - src/pts/pyspark/literature_entity_lut.py
  - src/pts/pyspark/literature_embedding.py
  - src/pts/pyspark/literature_vector.py
  - src/pts/pyspark/literature_ontoma_lut_generation.py
  - src/pts/pyspark/common/utils.py (shared maybe_coalesce helper)
  - test/test_literature_*.py (cover the collapsed step code paths)
  - pyproject.toml temp pins for OnToma PR #51 and ot-literature PR #10
    (to be reverted to released versions once those PRs ship)

Production runs are driven by the orchestration repo from now on; the
launcher served as an iteration tool while we tuned the literature
pipeline at full-EPMC scale (runs 001-016).
Lifts the EPMC-only `_maybe_repartition` helper out of
literature_publication_match.py into pts/pyspark/common/utils.py as the
public `maybe_repartition`, mirroring the earlier `maybe_coalesce` move.
The helper is shape-identical to `maybe_coalesce` modulo the
narrow-vs-wide transformation difference; co-locating both makes the
write/read boundary helpers discoverable from a single module.

Call site in literature_publication_match.py imports from the new
location. Tests in test_literature_publication_match.py update their
import path; the test class stays put for now (can be migrated to
test_common_utils.py in a follow-up if desired).
Moves the `TestMaybeRepartition` class out of
test_literature_publication_match.py (where it lived for historical
reasons before the helper was lifted into common.utils) and into
test_common_utils.py, where it sits next to the helper it covers.

Adds a parallel set of three tests for `maybe_coalesce`, which was
previously untested. Converts both helpers' tests from class-based to
module-level functions to match the existing style of
test_common_utils.py.

Net test count goes from 330 to 333 (3 new maybe_coalesce tests; the
3 maybe_repartition tests moved without changing count).
Comment thread test/test_literature_entity_lut.py Outdated
d0choa added 4 commits May 22, 2026 11:21
The _MATCH_SCHEMA fixture named the entity label column `label`, but
the real match_mapped dataset's normalised label column is
`entityLabelNormalised` (the raw `label` is a separate column). The
column is a passthrough not consumed by _compute_relevance, so this is
a fixture-fidelity fix with no behaviour change.

Addresses review feedback from @vivienho on PR #113.
OnToma 2.4.1 is now released with the large-scale entity-mapping perf
fix (previously consumed via the perf/avoid-distinct-collect-in-mapping-compatibility
branch override). Bumps the pin from ==2.4.0 to >=2.4.1 and drops the
temporary git source so the dependency resolves from PyPI.

The ot-literature git override stays until its PR ships.
Brings config.yaml's literature step settings/properties in line with
the orchestration runner (pts.yaml), so a standalone PTS run matches
production tuning validated on runs 014-016:

  - literature_publication_match: add settings (date_prefix, repartition,
    match_valid_coalesce, match_failed_coalesce) + per-step Spark props
    (shuffle.partitions, AQE skew/coalesce, files.maxPartitionBytes,
    dynamicAllocation pin).
  - literature_entity_lut: add shuffle.partitions=10000.
  - literature_embedding: shuffle.partitions 800 -> 15000.
  - literature_cooccurrence_evidence: add settings (cooccurrence_coalesce,
    evidence_coalesce) + AQE skew props.

Only the per-step layer is propagated; cluster-wide properties stay in
the launcher/cluster definition, not config.yaml.
…re step

literature_cooccurrence_evidence now produces intermediate/evidence/literature_epmc
itself (it generates cooccurrences and computes evidence in one step,
reusing evidence_epmc._compute_evidence internally). The standalone
evidence_epmc sub-task in the literature: step is therefore redundant.

It was also dead: the sub-task feeds the cooccurrence parquet straight
into evidence_epmc._compute_evidence without the column adapter the
collapsed step applies, so it would fail on the Cooccurrence schema
(mappedId1/2, evidenceScore) which lacks the keywordId1/2 /
evidence_score columns _compute_evidence expects. Its old compatible
producer (literature_cooccurrence.py) was already removed.

The evidence_epmc.py module stays — literature_cooccurrence_evidence
imports _compute_evidence from it.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants