Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ The system uses weighted reward components:
- `--price-weight` (default 0.2): Weight for electricity price optimization
- `--idle-weight` (default 0.1): Penalty weight for idle nodes
- `--job-age-weight` (default 0.0): Penalty weight for job waiting time
- `--drop-weight` (default 0.0): Penalty weight for dropped jobs (exceeding MAX_JOB_AGE)
- `--drop-weight` (default 0.0): Penalty weight for lost jobs (age expiry or queue-full rejection)

**Workload generator options** (pass `--workload-gen` to enable; replaces historical log samplers):
- `--workload-gen`: Arrival mode — `flat`, `poisson`, or `uniform` (default: disabled)
Expand Down Expand Up @@ -299,4 +299,4 @@ Use `--session` parameter to create named training runs for organization and com
The cumulative savings plot (generated during `--evaluate-savings`) is saved to the session's plots directory and shows:
- Agent costs vs baseline costs over time
- Two baseline comparisons: with idle nodes (baseline) and without idle nodes (baseline_off)
- Visual representation of cost reduction achieved by the trained agent
- Visual representation of cost reduction achieved by the trained agent
25 changes: 17 additions & 8 deletions src/baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
assign_jobs_to_available_nodes,
fill_queue_from_backlog,
age_backlog_queue,
age_job_queue,
)
from src.metrics_tracker import MetricsTracker
from src.reward_calculation import power_consumption_mwh
Expand Down Expand Up @@ -69,11 +70,12 @@ def baseline_step(
is_baseline=True,
)

# Age jobs already waiting before admitting this step's new arrivals.
baseline_next_empty_slot, queue_aged_dropped = age_job_queue(job_queue_2d, baseline_next_empty_slot)

# Age helper queue and fill real queue before new arrivals
age_backlog_queue(baseline_backlog_queue, metrics, _is_baseline=True)
baseline_next_empty_slot, _ = fill_queue_from_backlog(
job_queue_2d, baseline_backlog_queue, baseline_next_empty_slot
)
backlog_aged_dropped = age_backlog_queue(baseline_backlog_queue, metrics, _is_baseline=True)
baseline_next_empty_slot, _ = fill_queue_from_backlog(job_queue_2d, baseline_backlog_queue, baseline_next_empty_slot)

_new_baseline_jobs, baseline_next_empty_slot, baseline_backlog_dropped = add_new_jobs(
job_queue_2d, new_jobs_count, new_jobs_durations,
Expand All @@ -82,23 +84,30 @@ def baseline_step(
metrics.baseline_jobs_submitted += new_jobs_count
metrics.episode_baseline_jobs_submitted += new_jobs_count
if baseline_backlog_dropped > 0:
metrics.baseline_jobs_dropped += baseline_backlog_dropped
metrics.episode_baseline_jobs_dropped += baseline_backlog_dropped
metrics.baseline_jobs_rejected_queue_full += baseline_backlog_dropped
metrics.episode_baseline_jobs_rejected_queue_full += baseline_backlog_dropped

num_launched, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes(
total_dropped = queue_aged_dropped + backlog_aged_dropped + baseline_backlog_dropped
num_launched, baseline_next_empty_slot, queue_dropped, next_job_id = assign_jobs_to_available_nodes(
job_queue_2d, baseline_state['nodes'], baseline_cores_available,
baseline_running_jobs, baseline_next_empty_slot, next_job_id, metrics, is_baseline=True
)
total_dropped += queue_dropped

# Greedy loop: keep refilling from backlog and assigning until no more progress
while len(baseline_backlog_queue) > 0 and num_launched > 0:
baseline_next_empty_slot, moved = fill_queue_from_backlog(job_queue_2d, baseline_backlog_queue, baseline_next_empty_slot)
if moved == 0:
break
num_launched, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes(
num_launched, baseline_next_empty_slot, queue_dropped, next_job_id = assign_jobs_to_available_nodes(
job_queue_2d, baseline_state['nodes'], baseline_cores_available,
baseline_running_jobs, baseline_next_empty_slot, next_job_id, metrics, is_baseline=True
)
total_dropped += queue_dropped

if total_dropped > 0:
metrics.baseline_jobs_dropped += total_dropped
metrics.episode_baseline_jobs_dropped += total_dropped

num_used_nodes = np.sum(baseline_state['nodes'] > 0)
num_on_nodes = np.sum(baseline_state['nodes'] > -1)
Expand Down
6 changes: 6 additions & 0 deletions src/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ def _on_step(self) -> bool:
self.logger.record("metrics/max_queue_size", env.metrics.episode_max_queue_size_reached)
self.logger.record("metrics/max_backlog_size", env.metrics.episode_max_backlog_size_reached)
self.logger.record("metrics/jobs_dropped", env.metrics.episode_jobs_dropped)
self.logger.record("metrics/jobs_lost_total", env.metrics.episode_jobs_dropped)
loss_rate = (env.metrics.episode_jobs_dropped / env.metrics.episode_jobs_submitted * 100 if env.metrics.episode_jobs_submitted > 0 else 0.0)
self.logger.record("metrics/loss_rate", loss_rate)
self.logger.record("metrics/jobs_rejected_queue_full", env.metrics.episode_jobs_rejected_queue_full)

# Job metrics (baseline)
Expand All @@ -56,6 +59,9 @@ def _on_step(self) -> bool:
self.logger.record("metrics/baseline_max_queue_size", env.metrics.episode_baseline_max_queue_size_reached)
self.logger.record("metrics/baseline_max_backlog_size", env.metrics.episode_baseline_max_backlog_size_reached)
self.logger.record("metrics/baseline_jobs_dropped", env.metrics.episode_baseline_jobs_dropped)
self.logger.record("metrics/baseline_jobs_lost_total", env.metrics.episode_baseline_jobs_dropped)
baseline_loss_rate = (env.metrics.episode_baseline_jobs_dropped / env.metrics.episode_baseline_jobs_submitted * 100 if env.metrics.episode_baseline_jobs_submitted > 0 else 0.0)
self.logger.record("metrics/baseline_loss_rate", baseline_loss_rate)
self.logger.record("metrics/baseline_jobs_rejected_queue_full", env.metrics.episode_baseline_jobs_rejected_queue_full)

# Proportional (per-core) power metrics
Expand Down
6 changes: 3 additions & 3 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
MAX_BACKLOG_SIZE: int = 50000 # Maximum number of jobs in the backlog (overflow) queue
MAX_CHANGE: int = MAX_NODES
MAX_JOB_DURATION: int = 170 # maximum job runtime in hours
# Use a very high cap; age-based dropping is temporarily disabled in code.
# MAX_JOB_AGE = WEEK_HOURS * 52 * 10 # ~10 years in hours
# Drop jobs that have waited longer than this in queue/backlog.
MAX_JOB_AGE = WEEK_HOURS * 2
MAX_NEW_JOBS_PER_HOUR: int = 1500

COST_IDLE: int = 150 # Watts
Expand All @@ -26,7 +26,7 @@
EPISODE_HOURS: int = WEEK_HOURS * 2
MAX_JOB_AGE_OBS: int = EPISODE_HOURS * 13 # maximum job age observable in the state, here set to ~6 Months

PENALTY_DROPPED_JOB: float = -5.0 # explicit penalty for each job dropped due to exceeding MAX_JOB_AGE
PENALTY_DROPPED_JOB: float = -5.0 # legacy per-job lost-job penalty constant

# Reward/penalty constants
PENALTY_IDLE_NODE: float = -0.1 # Penalty for idling nodes
Expand Down
19 changes: 14 additions & 5 deletions src/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
)
from src.job_management import (
process_ongoing_jobs, add_new_jobs,
assign_jobs_to_available_nodes, fill_queue_from_backlog, age_backlog_queue
assign_jobs_to_available_nodes, fill_queue_from_backlog, age_backlog_queue,
age_job_queue,
)
from src.node_management import adjust_nodes
from src.reward_calculation import RewardCalculator, power_consumption_mwh
Expand Down Expand Up @@ -331,6 +332,11 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool,
)
self.env_print(f"{len(completed_jobs)} jobs completed: [{' '.join(['#' + str(job_id) for job_id in completed_jobs]) if len(completed_jobs) > 0 else ''}]")

# Age jobs already waiting before admitting this step's new arrivals.
self.next_empty_slot, queue_aged_dropped = age_job_queue(job_queue_2d, self.next_empty_slot)
if queue_aged_dropped > 0:
queue_backlog_mutated = True

# Age helper queues (jobs waiting outside the fixed queue)
backlog_aged_dropped = age_backlog_queue(self.backlog_queue, self.metrics, _is_baseline=False)
if backlog_aged_dropped > 0:
Expand Down Expand Up @@ -362,8 +368,8 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool,
if len(new_jobs) > 0:
queue_backlog_mutated = True
if backlog_dropped > 0:
self.metrics.jobs_dropped += backlog_dropped
self.metrics.episode_jobs_dropped += backlog_dropped
self.metrics.jobs_rejected_queue_full += backlog_dropped
self.metrics.episode_jobs_rejected_queue_full += backlog_dropped
Comment on lines +371 to +372
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Queue-full rejections are now feeding the reward-side drop counter.

Lines 365-366 already track backlog_dropped separately as jobs_rejected_queue_full, but Line 385 also folds it into num_dropped_this_step, and Lines 504-505 then roll it into jobs_dropped. Because RewardCalculator.calculate() consumes num_dropped_this_step, queue-capacity rejections now get the same penalty as MAX_JOB_AGE expirations. If that split is intentional, rename the metric/penalty to “total rejections”; otherwise keep backlog_dropped out of the reward-facing counter.

Possible split between penalized drops and queue-full rejections
-        num_dropped_this_step = backlog_aged_dropped + backlog_dropped
+        num_dropped_this_step = backlog_aged_dropped
+        num_rejected_queue_full_this_step = backlog_dropped
@@
-        self.metrics.jobs_dropped += num_dropped_this_step
-        self.metrics.episode_jobs_dropped += num_dropped_this_step
+        self.metrics.jobs_dropped += num_dropped_this_step + num_rejected_queue_full_this_step
+        self.metrics.episode_jobs_dropped += num_dropped_this_step + num_rejected_queue_full_this_step

Also applies to: 385-385, 504-505

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/environment.py` around lines 365 - 366, The code currently folds
queue-full rejections (backlog_dropped) into the reward-facing counters
(num_dropped_this_step and ultimately jobs_dropped) causing
RewardCalculator.calculate to penalize them like MAX_JOB_AGE expirations;
instead, keep backlog_dropped as a separate metric-only drop: stop adding
backlog_dropped into num_dropped_this_step and into the aggregate jobs_dropped
rollup, leaving increments to metrics.jobs_rejected_queue_full and
metrics.episode_jobs_rejected_queue_full only; if you intended both to be
penalized, alternatively rename those metrics and the reward variable to
"total_rejections" and update RewardCalculator.calculate to reference the
renamed total so intent is clear.

self.metrics.jobs_submitted += new_jobs_count
self.metrics.episode_jobs_submitted += new_jobs_count

Expand All @@ -382,7 +388,7 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool,
# Assign jobs to available nodes
self.env_print(f"[4] Assigning jobs to available nodes...")

num_dropped_this_step = backlog_dropped
num_dropped_this_step = queue_aged_dropped + backlog_aged_dropped + backlog_dropped
num_launched_jobs, self.next_empty_slot, queue_dropped, self.next_job_id = assign_jobs_to_available_nodes(
job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs,
self.next_empty_slot, self.next_job_id, self.metrics, is_baseline=False
Expand Down Expand Up @@ -501,7 +507,9 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool,
self.metrics.episode_job_age_penalties.append(job_age_penalty_norm * 100)
self.metrics.episode_idle_penalties.append(idle_penalty_norm * 100)
self.metrics.episode_rewards.append(step_reward)

self.metrics.jobs_dropped += num_dropped_this_step
self.metrics.episode_jobs_dropped += num_dropped_this_step

# print stats
self.env_print(f"[6] End of step stats...")
self.env_print("job queue: ", ' '.join(['[{} {} {} {}]'.format(d, a, n, c) for d, a, n, c in job_queue_2d if d > 0]))
Expand Down Expand Up @@ -549,6 +557,7 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool,
"num_unprocessed_jobs": num_unprocessed_jobs,
"num_on_nodes": num_on_nodes,
"episode_jobs_dropped": self.metrics.episode_jobs_dropped,
"episode_jobs_lost_total": self.metrics.episode_jobs_dropped,
}

return self.state, step_reward, terminated, truncated, info
7 changes: 5 additions & 2 deletions src/evaluation_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def build_episode_summary_line(
f"({float(episode_data['completion_rate']):.0f}%), "
f"AvgWait={float(episode_data['avg_wait_time']):.1f}h, "
f"EpisodeMaxQueue={int(episode_data['max_queue_size'])}, "
f"Dropped={int(episode_data['jobs_dropped'])}, "
f"Lost={int(episode_data.get('jobs_lost_total', episode_data['jobs_dropped']))}, "
f"TimelineMaxQueue={timeline_max_queue}, "
f"Agent Occupancy (Cores)={agent_occupancy_cores_pct:.2f}%, "
f"Baseline Occupancy (Cores)={baseline_occupancy_cores_pct:.2f}%, "
Expand All @@ -67,5 +67,8 @@ def build_episode_summary_line(
f"PropPower={float(episode_data['agent_prop_power_mwh']):.1f}/"
f"{float(episode_data['baseline_prop_power_mwh']):.1f}/"
f"{float(episode_data['baseline_off_prop_power_mwh']):.1f} MWh "
f"(agent/base/base_off)"
f"(agent/base/base_off), "
f"PropSavings=€{float(episode_data['savings_prop_cost_vs_baseline']):.0f}/"
f"€{float(episode_data['savings_prop_cost_vs_baseline_off']):.0f} "
f"(vs base/base_off)"
)
77 changes: 35 additions & 42 deletions src/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@

import numpy as np
from src.config import (
MAX_NODES, CORES_PER_NODE, MAX_BACKLOG_SIZE
MAX_NODES, CORES_PER_NODE, MAX_BACKLOG_SIZE, MAX_JOB_AGE
)
from src.metrics_tracker import MetricsTracker


def age_backlog_queue(backlog_queue: deque, _metrics: MetricsTracker, _is_baseline: bool = False) -> int:
"""
Age jobs waiting in the backlog queue.
NOTE: dropping based on MAX_JOB_AGE is temporarily disabled via an `if False` hotfix,
so jobs are always kept even if job[1] > MAX_JOB_AGE.
TODO: re-enable drops by removing the `if False` guard and using `job[1] > MAX_JOB_AGE`;
metrics updates are already in the disabled branch.

Returns the number of jobs dropped for exceeding ``MAX_JOB_AGE``.
Callers are responsible for updating any drop counters exactly once.
"""
if not backlog_queue:
return 0
Expand All @@ -25,25 +24,40 @@ def age_backlog_queue(backlog_queue: deque, _metrics: MetricsTracker, _is_baseli
kept = []
for job in backlog_queue:
job[1] += 1
# TEMP HOTFIX: disable age-based dropping (keep logic for later).
# if False and job[1] > MAX_JOB_AGE:
# dropped += 1
# if _is_baseline:
# _metrics.baseline_jobs_dropped += 1
# _metrics.baseline_dropped_this_episode += 1
# _metrics.episode_baseline_jobs_dropped += 1
# else:
# _metrics.jobs_dropped += 1
# _metrics.dropped_this_episode += 1
# _metrics.episode_jobs_dropped += 1
# else:
kept.append(job)
if job[1] > MAX_JOB_AGE:
dropped += 1
else:
kept.append(job)

backlog_queue.clear()
backlog_queue.extend(kept)
return dropped


def age_job_queue(job_queue_2d: np.ndarray, next_empty_slot: int) -> tuple[int, int]:
    """
    Advance the wait-time counter of every occupied slot in the main queue.

    A slot is occupied when its duration field (column 0) is positive. Each
    occupied job's age (column 1) is incremented once per call; any job whose
    age then exceeds ``MAX_JOB_AGE`` is cleared from the queue and counted as
    dropped.

    Returns:
        Tuple of (updated ``next_empty_slot``, number of jobs dropped).
    """
    num_dropped = 0

    for slot_idx in range(len(job_queue_2d)):
        row = job_queue_2d[slot_idx]
        if row[0] <= 0:
            # Empty slot — nothing waiting here.
            continue

        row[1] += 1
        if row[1] > MAX_JOB_AGE:
            # Waited too long: clear the slot and remember the earliest hole.
            row[:] = 0
            next_empty_slot = min(next_empty_slot, slot_idx)
            num_dropped += 1

    return next_empty_slot, num_dropped


def fill_queue_from_backlog(job_queue_2d: np.ndarray, backlog_queue: deque, next_empty_slot: int) -> tuple[int, int]:
"""
Move jobs from backlog queue into the real queue (FIFO) until full.
Expand Down Expand Up @@ -221,11 +235,12 @@ def assign_jobs_to_available_nodes(
running_jobs: Dictionary of currently running jobs
next_empty_slot: Index of next empty slot in queue
next_job_id: Next available job ID
metrics: MetricsTracker object to update with job completion metrics
metrics: MetricsTracker object to update with completion/wait-time metrics
is_baseline: Whether this is baseline simulation

Returns:
Tuple of (num_processed_jobs, updated next_empty_slot, num_dropped, updated next_job_id)
Tuple of (num_processed_jobs, updated next_empty_slot, num_dropped, updated next_job_id).
Callers are responsible for recording ``num_dropped`` exactly once.
"""
num_processed_jobs = 0
num_dropped = 0
Expand Down Expand Up @@ -266,26 +281,4 @@ def assign_jobs_to_available_nodes(
num_processed_jobs += 1
continue

# Not enough resources -> job waits and ages (or gets dropped)
new_age = job_age + 1

# TEMP HOTFIX: disable age-based dropping (keep logic for later).
#if False and new_age > MAX_JOB_AGE:
# # Clear job from queue
# job_queue_2d[job_idx] = [0, 0, 0, 0]
#
# # Update next_empty_slot if we cleared a slot before it
# if job_idx < next_empty_slot:
# next_empty_slot = job_idx
# num_dropped += 1
#
# if is_baseline:
# metrics.baseline_jobs_dropped += 1
# metrics.episode_baseline_jobs_dropped += 1
# else:
# metrics.jobs_dropped += 1
# metrics.episode_jobs_dropped += 1
#else:
job_queue_2d[job_idx][1] = new_age

return num_processed_jobs, next_empty_slot, num_dropped, next_job_id
17 changes: 16 additions & 1 deletion src/metrics_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ def record_episode_completion(self, current_episode: int) -> dict[str, float | i
if self.episode_baseline_jobs_submitted
else 0.0
)
loss_rate = drop_rate
baseline_loss_rate = baseline_drop_rate
agent_mean_price: float = self._effective_mean_price(
self.episode_total_cost, self.episode_total_power_consumption_mwh
)
Expand Down Expand Up @@ -200,10 +202,15 @@ def record_episode_completion(self, current_episode: int) -> dict[str, float | i
(COST_IDLE_MW * on + _compute_delta_mw * (cores / CORES_PER_NODE)) * price
for on, cores, price in zip(self.episode_on_nodes, self.episode_used_cores, self.episode_price_stats)
)
baseline_prop_cost: float = sum(
(COST_IDLE_MW * MAX_NODES + _compute_delta_mw * (cores / CORES_PER_NODE)) * price
for cores, price in zip(self.episode_baseline_used_cores, self.episode_price_stats)
)
baseline_off_prop_cost: float = sum(
(COST_IDLE_MW * used + _compute_delta_mw * (cores / CORES_PER_NODE)) * price
for used, cores, price in zip(self.episode_baseline_used_nodes, self.episode_baseline_used_cores, self.episode_price_stats)
)
savings_prop_cost_vs_baseline: float = baseline_prop_cost - agent_prop_cost
savings_prop_cost_vs_baseline_off: float = baseline_off_prop_cost - agent_prop_cost
dropped_jobs_per_saved_euro: float = self._safe_ratio(
float(self.episode_jobs_dropped), savings_vs_baseline
Expand All @@ -223,6 +230,10 @@ def record_episode_completion(self, current_episode: int) -> dict[str, float | i
'agent_prop_power_mwh': agent_prop_power_mwh,
'baseline_prop_power_mwh': baseline_prop_power_mwh,
'baseline_off_prop_power_mwh': baseline_off_prop_power_mwh,
'agent_prop_cost': agent_prop_cost,
'baseline_prop_cost': baseline_prop_cost,
'baseline_off_prop_cost': baseline_off_prop_cost,
'savings_prop_cost_vs_baseline': savings_prop_cost_vs_baseline,
'savings_prop_cost_vs_baseline_off': savings_prop_cost_vs_baseline_off,
'agent_mean_price': agent_mean_price,
'baseline_mean_price': baseline_mean_price,
Expand Down Expand Up @@ -251,12 +262,16 @@ def record_episode_completion(self, current_episode: int) -> dict[str, float | i
'baseline_completion_rate': baseline_completion_rate,
'baseline_max_queue_size': self.episode_baseline_max_queue_size_reached,
'baseline_max_backlog_size': self.episode_baseline_max_backlog_size_reached,
# Drop metrics
# Loss metrics: includes age expirations and queue-full rejections.
"jobs_dropped": self.episode_jobs_dropped,
"jobs_lost_total": self.episode_jobs_dropped,
"drop_rate": drop_rate,
"loss_rate": loss_rate,
"jobs_rejected_queue_full": self.episode_jobs_rejected_queue_full,
"baseline_jobs_dropped": self.episode_baseline_jobs_dropped,
"baseline_jobs_lost_total": self.episode_baseline_jobs_dropped,
"baseline_drop_rate": baseline_drop_rate,
"baseline_loss_rate": baseline_loss_rate,
"baseline_jobs_rejected_queue_full": self.episode_baseline_jobs_rejected_queue_full,
}
self.episode_costs.append(episode_data)
Expand Down
Loading
Loading