-
Notifications
You must be signed in to change notification settings - Fork 1
Explore per-core power calculation #38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,27 +73,36 @@ def validate_next_empty(job_queue_2d: np.ndarray, next_empty: int) -> None: | |
| assert np.all(job_queue_2d[:next_empty, 0] != 0), "hole before next_empty_slot" | ||
|
|
||
|
|
||
| def process_ongoing_jobs(nodes: np.ndarray, cores_available: np.ndarray, running_jobs: dict[int, dict[str, Any]]) -> list[int]: | ||
| def process_ongoing_jobs(nodes: np.ndarray, cores_available: np.ndarray, running_jobs: dict[int, dict[str, Any]], metrics: MetricsTracker, is_baseline: bool) -> list[int]: | ||
| """ | ||
| Process ongoing jobs: decrement their duration, complete finished jobs, | ||
| and release resources. | ||
| release resources, and record completion metrics. | ||
|
|
||
| Completion is counted here (when duration hits zero), not at launch time. | ||
| 'wait_time' on each job is the time spent in the queue before being launched, | ||
| which is the standard HPC metric for scheduler responsiveness. | ||
|
Comment on lines
+81
to
+83
Contributor
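There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Completion-time episode counters now mix different job cohorts. Because completion is now recorded when a job's duration reaches zero rather than at launch, a job launched in one episode but finishing in a later one is added to the later episode's `episode_*` counters.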
Also applies to: 114-125 🤖 Prompt for AI Agents |
||
|
|
||
| Args: | ||
| nodes: Array of node states | ||
| cores_available: Array of available cores per node | ||
| running_jobs: Dictionary of currently running jobs | ||
| metrics: Optional metrics tracker; when provided, completed job counts | ||
| and queue wait times are recorded here | ||
| is_baseline: Whether finished jobs belong to the baseline simulation | ||
|
|
||
| Returns: | ||
| List of completed job IDs | ||
| """ | ||
| completed_jobs = [] | ||
| completed_wait_time = 0 | ||
|
|
||
| for job_id, job_data in running_jobs.items(): | ||
| job_data['duration'] -= 1 | ||
|
|
||
| # Check if job is completed | ||
| if job_data['duration'] <= 0: | ||
| completed_jobs.append(job_id) | ||
| completed_wait_time += int(job_data.get('wait_time', 0)) | ||
| # Release resources | ||
| for node_idx, cores_used in job_data['allocation']: | ||
| cores_available[node_idx] += cores_used | ||
|
|
@@ -102,6 +111,19 @@ def process_ongoing_jobs(nodes: np.ndarray, cores_available: np.ndarray, running | |
| for job_id in completed_jobs: | ||
| del running_jobs[job_id] | ||
|
|
||
| if completed_jobs: | ||
| completed_count = len(completed_jobs) | ||
| if is_baseline: | ||
| metrics.baseline_jobs_completed += completed_count | ||
| metrics.baseline_total_job_wait_time += completed_wait_time | ||
| metrics.episode_baseline_jobs_completed += completed_count | ||
| metrics.episode_baseline_total_job_wait_time += completed_wait_time | ||
| else: | ||
| metrics.jobs_completed += completed_count | ||
| metrics.total_job_wait_time += completed_wait_time | ||
| metrics.episode_jobs_completed += completed_count | ||
| metrics.episode_total_job_wait_time += completed_wait_time | ||
|
|
||
| # Update node times based on remaining jobs | ||
| # Reset all nodes first | ||
| for i in range(MAX_NODES): | ||
|
|
@@ -198,7 +220,7 @@ def assign_jobs_to_available_nodes( | |
| running_jobs: Dictionary of currently running jobs | ||
| next_empty_slot: Index of next empty slot in queue | ||
| next_job_id: Next available job ID | ||
| metrics: MetricsTracker object to update with job completion metrics | ||
| metrics: MetricsTracker object to update with drop/rejection counts | ||
| is_baseline: Whether this is baseline simulation | ||
|
|
||
| Returns: | ||
|
|
@@ -229,6 +251,7 @@ def assign_jobs_to_available_nodes( | |
| running_jobs[next_job_id] = { | ||
| "duration": job_duration, | ||
| "allocation": job_allocation, | ||
| "wait_time": int(job_age), # hours spent in queue; recorded at completion time | ||
| } | ||
| next_job_id += 1 | ||
|
|
||
|
|
@@ -239,18 +262,6 @@ def assign_jobs_to_available_nodes( | |
| if job_idx < next_empty_slot: | ||
| next_empty_slot = job_idx | ||
|
|
||
| # Track job completion and wait time | ||
| if is_baseline: | ||
| metrics.baseline_jobs_completed += 1 | ||
| metrics.baseline_total_job_wait_time += job_age | ||
| metrics.episode_baseline_jobs_completed += 1 | ||
| metrics.episode_baseline_total_job_wait_time += job_age | ||
| else: | ||
| metrics.jobs_completed += 1 | ||
| metrics.total_job_wait_time += job_age | ||
| metrics.episode_jobs_completed += 1 | ||
| metrics.episode_total_job_wait_time += job_age | ||
|
|
||
| num_processed_jobs += 1 | ||
| continue | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -206,6 +206,8 @@ def parse_args(): | |||||||||||||||||||||
| p.add_argument("--steps", type=int, default=500) | ||||||||||||||||||||||
| p.add_argument("--episodes", type=int, default=1) | ||||||||||||||||||||||
| p.add_argument("--check-determinism", action="store_true") | ||||||||||||||||||||||
| p.add_argument("--check-gym", action="store_true", help="Run stable-baselines3 check_env() on the environment.") | ||||||||||||||||||||||
| p.add_argument("--carry-over-state", action="store_true", help="Run carry-over continuity test (state preserved across episodes).") | ||||||||||||||||||||||
| # mirror train.py-ish knobs (mostly optional) | ||||||||||||||||||||||
| p.add_argument("--session", default="sanity") | ||||||||||||||||||||||
| p.add_argument("--render", type=str, default="none", choices=["human", "none"]) | ||||||||||||||||||||||
|
|
@@ -380,9 +382,16 @@ def cmp(name, a, b): | |||||||||||||||||||||
| determinism_test(lambda: make_env_with_carry(), seed=args.seed, n_steps=min(args.steps, 500)) | ||||||||||||||||||||||
| print("[OK] determinism test passed") | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| # 4) Carry-over continuity | ||||||||||||||||||||||
| carry_over_test(lambda: make_env_with_carry(), seed=args.seed, n_steps=min(args.steps, 10)) | ||||||||||||||||||||||
| print("[OK] carry-over continuity test passed") | ||||||||||||||||||||||
| # 4) Gym interface check (optional) | ||||||||||||||||||||||
| if args.check_gym: | ||||||||||||||||||||||
| from stable_baselines3.common.env_checker import check_env | ||||||||||||||||||||||
| check_env(make_env_with_carry(), warn=True) | ||||||||||||||||||||||
| print("[OK] gym check passed") | ||||||||||||||||||||||
|
Comment on lines
+387
to
+389
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify check_env usage and whether close() is paired in the same control block.
rg -n -C4 'if args\.check_gym|check_env\(|\.close\(' test/test_sanity_env.py
Repository: FairRootGroup/powersched
Length of output: 2611
🏁 Script executed:
#!/bin/bash
# Search for other usages of check_env in the repository
rg -n 'check_env' --type py
Repository: FairRootGroup/powersched
Length of output: 510
🌐 Web query:
💡 Result: Yes, the stable-baselines3 check_env function calls env.close on the environment during execution, specifically as part of the render check when skip_render_check=False (default is True, so it skips by default). If env.render_mode is set, it calls env.render followed by env.close. For environments wrapped in vectorized environments like DummyVecEnv (used internally in _check_nan), the VecEnv.close method is not explicitly called by check_env, but VecEnvs have a close method for cleanup that closes underlying envs (e.g., SubprocVecEnv sends "close" command and joins processes). check_env does not perform full environment cleanup beyond potential render-related close; it leaves the environment in a stepped state after calling reset and step for validation. Users should handle further cleanup if needed after calling check_env(env). To avoid render close: pass skip_render_check=True (default). Official source code confirms: in check_env, if not skipping render check: _check_render(env), which does env.render; env.close if render_mode present. Citations:
🏁 Script executed: #!/bin/bash
# Check test_checkenv.py for check_env usage and cleanup
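cat -n test/test_checkenv.py
Repository: FairRootGroup/powersched
Length of output: 1065
Close the environment instance returned by `make_env_with_carry()`. The environment created at line 388 is not explicitly closed. The `check_env` call does not close it by default (its render check, the only place it calls `env.close`, is skipped unless `skip_render_check=False`), so close the instance explicitly.
♻️ Proposed fix
if args.check_gym: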
from stable_baselines3.common.env_checker import check_env
- check_env(make_env_with_carry(), warn=True)
+ gym_env = make_env_with_carry()
+ try:
+ check_env(gym_env, warn=True)
+ finally:
+ gym_env.close()
    print("[OK] gym check passed")
📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| # 5) Carry-over continuity (optional) | ||||||||||||||||||||||
| if args.carry_over_state: | ||||||||||||||||||||||
| carry_over_test(lambda: make_env_with_carry(), seed=args.seed, n_steps=min(args.steps, 10)) | ||||||||||||||||||||||
| print("[OK] carry-over continuity test passed") | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| print("done") | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Drop the second `Power=` field.
Line 67 duplicates the same metric already emitted at Lines 40-43, so the episode line now contains two `Power=` sections. That makes the output noisy and brittle for any downstream log parsing.
✂️ Proposed fix
📝 Committable suggestion
🤖 Prompt for AI Agents