Skip to content

Commit fa21d6b

Browse files
authored
Merge pull request #156 from ihmeuw-msca/feature/CCYELLOW-1927_task_and_template_prefix_split
Splitting task and template prefixes
2 parents 67a3039 + 650922c commit fa21d6b

7 files changed

Lines changed: 201 additions & 70 deletions

File tree

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
77

88
## [Unreleased]
99

10+
## [1.2.0] - 2025-04-14
11+
12+
### Added
13+
14+
- Split Jobmon-related argument `task_and_template_prefix` into `task_prefix` and `template_prefix`.
15+
16+
### Changed
17+
18+
- Move `config` from task args to node args so that multiple models in a single workflow, each with its own config, can share the same task template.
19+
1020
## [1.1.1] - 2025-04-09
1121

1222
### Added

docs/meta.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
versions = [
2-
"1.1.1",
2+
"1.2.0",
33
]

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "onemod"
7-
version = "1.1.1"
7+
version = "1.2.0"
88
description = "An orchestration package for statistical modeling pipelines."
99
readme = "README.md"
1010
requires-python = ">=3.10, <3.13"

src/onemod/backend/jobmon_backend.py

Lines changed: 89 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
# TODO: Could dependencies be method specific?
4343
# TODO: should we check resources format, minimum resources, cluster?
4444

45+
import itertools
4546
import sys
4647
from pathlib import Path
4748
from typing import Any, Literal
@@ -73,7 +74,8 @@ def evaluate_with_jobmon(
7374
subsets: dict[str, Any | list[Any]] | None = None,
7475
paramsets: dict[str, Any | list[Any]] | None = None,
7576
collect: bool | None = None,
76-
task_and_template_prefix: str | None = None,
77+
task_prefix: str | None = None,
78+
template_prefix: str | None = None,
7779
max_attempts: int = 1,
7880
**kwargs,
7981
) -> None:
@@ -117,8 +119,11 @@ def evaluate_with_jobmon(
117119
118120
Jobmon Parameters
119121
-----------------
120-
task_and_template_prefix : str, optional
121-
Optional prefix to append to task/template name. Default is None,
122+
task_prefix : str, optional
123+
Optional prefix to prepend to task names. Default is None, no
124+
prefix.
125+
template_prefix : str, optional
126+
Optional prefix to prepend to task template name. Default is None,
122127
no prefix.
123128
max_attempts : int
124129
Maximum number of attempts for a task. Default is 1.
@@ -141,7 +146,8 @@ def evaluate_with_jobmon(
141146
subsets=subsets,
142147
paramsets=paramsets,
143148
collect=collect,
144-
task_and_template_prefix=task_and_template_prefix,
149+
task_prefix=task_prefix,
150+
template_prefix=template_prefix,
145151
max_attempts=max_attempts,
146152
**kwargs,
147153
)
@@ -159,7 +165,8 @@ def add_tasks_to_workflow(
159165
subsets: dict[str, Any | list[Any]] | None = None,
160166
paramsets: dict[str, Any | list[Any]] | None = None,
161167
collect: bool | None = None,
162-
task_and_template_prefix: str | None = None,
168+
task_prefix: str | None = None,
169+
template_prefix: str | None = None,
163170
max_attempts: int = 1,
164171
external_upstream_tasks: list[Task] | None = None,
165172
**kwargs,
@@ -209,8 +216,11 @@ def add_tasks_to_workflow(
209216
210217
Jobmon Parameters
211218
-----------------
212-
task_and_template_prefix : str, optional
213-
Optional prefix to append to task/template name. Default is None,
219+
task_prefix : str, optional
220+
Optional prefix to prepend to task names. Default is None, no
221+
prefix.
222+
template_prefix : str, optional
223+
Optional prefix to prepend to task template name. Default is None,
214224
no prefix.
215225
max_attempts : int
216226
Maximum number of attempts for a task. Default is 1.
@@ -236,7 +246,8 @@ def add_tasks_to_workflow(
236246
subsets=subsets,
237247
paramsets=paramsets,
238248
collect=collect,
239-
task_and_template_prefix=task_and_template_prefix,
249+
task_prefix=task_prefix,
250+
template_prefix=template_prefix,
240251
max_attempts=max_attempts,
241252
external_upstream_tasks=external_upstream_tasks,
242253
**kwargs,
@@ -334,7 +345,8 @@ def get_tasks(
334345
subsets: dict[str, Any | list[Any]] | None,
335346
paramsets: dict[str, Any | list[Any]] | None,
336347
collect: bool | None,
337-
task_and_template_prefix: str | None,
348+
task_prefix: str | None,
349+
template_prefix: str | None,
338350
max_attempts: int,
339351
external_upstream_tasks: list[Task] | None = None,
340352
**kwargs,
@@ -376,8 +388,10 @@ def get_tasks(
376388
377389
Jobmon Parameters
378390
-----------------
379-
task_and_template_prefix : str, optional
380-
Optional prefix to append to task/template name.
391+
task_prefix : str, optional
392+
Optional prefix to prepend to task names.
393+
template_prefix : str, optional
394+
Optional prefix to prepend to task template name.
381395
max_attempts : int
382396
Maximum number of attempts for a task.
383397
external_upstream_tasks : list, optional
@@ -400,7 +414,8 @@ def get_tasks(
400414
python=python,
401415
stages=stages,
402416
external_upstream_tasks=external_upstream_tasks,
403-
task_and_template_prefix=task_and_template_prefix,
417+
task_prefix=task_prefix,
418+
template_prefix=template_prefix,
404419
max_attempts=max_attempts,
405420
**kwargs,
406421
)
@@ -410,7 +425,8 @@ def get_tasks(
410425
tool=tool,
411426
resources=resources,
412427
python=python,
413-
task_and_template_prefix=task_and_template_prefix,
428+
task_prefix=task_prefix,
429+
template_prefix=template_prefix,
414430
max_attempts=max_attempts,
415431
subsets=subsets,
416432
paramsets=paramsets,
@@ -428,7 +444,8 @@ def get_pipeline_tasks(
428444
python: Path | str,
429445
stages: list[str] | None,
430446
external_upstream_tasks: list[Task] | None,
431-
task_and_template_prefix: str | None,
447+
task_prefix: str | None,
448+
template_prefix: str | None,
432449
max_attempts: int,
433450
**kwargs,
434451
) -> list[Task]:
@@ -452,8 +469,10 @@ def get_pipeline_tasks(
452469
external_upstream_tasks : list, optional
453470
List of Jobmon tasks external to the OneMod Stages or Pipeline that
454471
should be treated as upstream dependencies of the new tasks.
455-
task_and_template_prefix : str, optional
456-
Optional prefix to append to task/template name.
472+
task_prefix : str, optional
473+
Optional prefix to prepend to task names.
474+
template_prefix : str, optional
475+
Optional prefix to prepend to task template name.
457476
max_attempts : int
458477
Maximum number of attempts for a task.
459478
**kwargs
@@ -481,7 +500,8 @@ def get_pipeline_tasks(
481500
tool=tool,
482501
resources=resources,
483502
python=python,
484-
task_and_template_prefix=task_and_template_prefix,
503+
task_prefix=task_prefix,
504+
template_prefix=template_prefix,
485505
max_attempts=max_attempts,
486506
upstream_tasks=upstream_tasks,
487507
**kwargs,
@@ -560,7 +580,8 @@ def get_stage_tasks(
560580
tool: Tool,
561581
resources: dict[str, Any],
562582
python: Path | str,
563-
task_and_template_prefix: str | None,
583+
task_prefix: str | None,
584+
template_prefix: str | None,
564585
max_attempts: int,
565586
subsets: dict[str, Any | list[Any]] | None = None,
566587
paramsets: dict[str, Any | list[Any]] | None = None,
@@ -594,8 +615,10 @@ def get_stage_tasks(
594615
True, otherwise default is False.
595616
upstream_tasks : list of Task or None, optional
596617
List of upstream stage tasks. Default is None.
597-
task_and_template_prefix : str, optional
598-
Optional prefix to append to task/template name.
618+
task_prefix : str, optional
619+
Optional prefix to prepend to task names.
620+
template_prefix : str, optional
621+
Optional prefix to prepend to task template name.
599622
max_attempts : int
600623
Maximum number of attempts for a task.
601624
**kwargs
@@ -620,26 +643,41 @@ def get_stage_tasks(
620643
tool,
621644
resources,
622645
list(submodel_args.keys()),
623-
task_and_template_prefix=task_and_template_prefix,
646+
template_prefix=template_prefix,
624647
**kwargs,
625648
)
626649

627650
task_name = (
628-
f"{task_and_template_prefix}_{stage.name}_{method}"
629-
if task_and_template_prefix
651+
f"{task_prefix}_{stage.name}_{method}"
652+
if task_prefix
630653
else f"{stage.name}_{method}"
631654
)
632655
if submodel_args:
633-
tasks = task_template.create_tasks(
634-
name=task_name,
635-
upstream_tasks=upstream_tasks,
636-
max_attempts=max_attempts,
637-
entrypoint=entrypoint,
638-
config=config_path,
639-
method=method,
640-
stages=stage.name,
641-
**{**submodel_args, **kwargs},
642-
)
656+
# NOTE: TaskTemplate.create_tasks can only be called once per
657+
# instantiated TaskTemplate, but workflows that contain multiple
658+
# OneMod Pipelines with overlapping TaskTemplates will need to
659+
# add tasks to the same template multiple times. To get around
660+
# this, we need to use TaskTemplate.create_task (not create_tasks)
661+
# instead. This means we need to generate all combinations of
662+
# submodel args and loop over them in task creation.
663+
submodel_keys, submodel_values = zip(*submodel_args.items())
664+
submodel_arg_combinations = [
665+
dict(zip(submodel_keys, submodel_valueset))
666+
for submodel_valueset in itertools.product(*submodel_values)
667+
]
668+
tasks = [
669+
task_template.create_task(
670+
name=task_name,
671+
upstream_tasks=upstream_tasks,
672+
max_attempts=max_attempts,
673+
entrypoint=entrypoint,
674+
config=config_path,
675+
method=method,
676+
stages=stage.name,
677+
**{**submodel_arg_combination, **kwargs},
678+
)
679+
for submodel_arg_combination in submodel_arg_combinations
680+
]
643681
else:
644682
tasks = [
645683
task_template.create_task(
@@ -662,7 +700,8 @@ def get_stage_tasks(
662700
tool=tool,
663701
resources=resources,
664702
python=python,
665-
task_and_template_prefix=task_and_template_prefix,
703+
task_prefix=task_prefix,
704+
template_prefix=template_prefix,
666705
max_attempts=max_attempts,
667706
upstream_tasks=tasks,
668707
)
@@ -733,7 +772,7 @@ def get_task_template(
733772
tool: Tool,
734773
resources: dict[str, Any],
735774
submodel_args: list[str],
736-
task_and_template_prefix: str | None,
775+
template_prefix: str | None,
737776
**kwargs,
738777
) -> TaskTemplate:
739778
"""Get stage task template.
@@ -753,8 +792,8 @@ def get_task_template(
753792
Dictionary of compute resources.
754793
submodel_args : list of str
755794
List including 'subsets' and/or 'paramsets'.
756-
task_and_template_prefix : str, optional
757-
Optional prefix to append to task/template name.
795+
template_prefix : str, optional
796+
Optional prefix to prepend to task template name.
758797
**kwargs
759798
Additional keyword arguments passed to stage method.
760799
@@ -765,17 +804,25 @@ def get_task_template(
765804
766805
"""
767806
template_name = (
768-
f"{task_and_template_prefix}_{stage_name}_{method}"
769-
if task_and_template_prefix
807+
f"{template_prefix}_{stage_name}_{method}"
808+
if template_prefix
770809
else f"{stage_name}_{method}"
771810
)
772811

812+
command_template = get_command_template(method, submodel_args, **kwargs)
813+
task_args = ["method", "stages"] + list(kwargs.keys())
814+
# NOTE: Config is a node arg for the purposes of running multiple
815+
# OneMod models in a single workflow; since each model will have
816+
# a separate config, this allows a user to keep the same task
817+
# template name across all models in a given workflow.
818+
node_args = ["config"] + submodel_args
819+
773820
task_template = tool.get_task_template(
774821
template_name=template_name,
775-
command_template=get_command_template(method, submodel_args, **kwargs),
822+
command_template=command_template,
776823
op_args=["entrypoint"],
777-
task_args=["config", "method", "stages"] + list(kwargs.keys()),
778-
node_args=submodel_args,
824+
task_args=task_args,
825+
node_args=node_args,
779826
)
780827

781828
task_resources = get_task_resources(

tests/conftest.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,27 @@ def simple_pipeline(tmp_path_factory):
7373
return pipeline
7474

7575

76+
@pytest.fixture(scope="function")
77+
def second_simple_pipeline(tmp_path_factory):
78+
directory = tmp_path_factory.mktemp("second_jobmon_test_dir")
79+
pipeline = setup_simple_pipeline(directory)
80+
return pipeline
81+
82+
7683
@pytest.fixture(scope="function")
7784
def parallel_pipeline(tmp_path_factory):
7885
directory = tmp_path_factory.mktemp("jobmon_test_dir")
7986
pipeline = setup_parallel_pipeline(directory)
8087
return pipeline
8188

8289

90+
@pytest.fixture(scope="function")
91+
def second_parallel_pipeline(tmp_path_factory):
92+
directory = tmp_path_factory.mktemp("second_jobmon_test_dir")
93+
pipeline = setup_parallel_pipeline(directory)
94+
return pipeline
95+
96+
8397
@pytest.fixture(scope="module")
8498
def resource_dir(tmp_path_factory):
8599
directory = tmp_path_factory.mktemp("resources_test_dir")

0 commit comments

Comments
 (0)