diff --git a/CLAUDE.md b/CLAUDE.md index fb7dda7..a0de158 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -164,7 +164,7 @@ The system uses weighted reward components: - `--price-weight` (default 0.2): Weight for electricity price optimization - `--idle-weight` (default 0.1): Penalty weight for idle nodes - `--job-age-weight` (default 0.0): Penalty weight for job waiting time -- `--drop-weight` (default 0.0): Penalty weight for dropped jobs (exceeding MAX_JOB_AGE) +- `--drop-weight` (default 0.0): Penalty weight for lost jobs (age expiry or queue-full rejection) **Workload generator options** (pass `--workload-gen` to enable; replaces historical log samplers): - `--workload-gen`: Arrival mode — `flat`, `poisson`, or `uniform` (default: disabled) @@ -299,4 +299,4 @@ Use `--session` parameter to create named training runs for organization and com The cumulative savings plot (generated during `--evaluate-savings`) is saved to the session's plots directory and shows: - Agent costs vs baseline costs over time - Two baseline comparisons: with idle nodes (baseline) and without idle nodes (baseline_off) -- Visual representation of cost reduction achieved by the trained agent \ No newline at end of file +- Visual representation of cost reduction achieved by the trained agent diff --git a/src/baseline.py b/src/baseline.py index 191a733..e88d201 100644 --- a/src/baseline.py +++ b/src/baseline.py @@ -10,6 +10,7 @@ assign_jobs_to_available_nodes, fill_queue_from_backlog, age_backlog_queue, + age_job_queue, ) from src.metrics_tracker import MetricsTracker from src.reward_calculation import power_consumption_mwh @@ -69,11 +70,12 @@ def baseline_step( is_baseline=True, ) + # Age jobs already waiting before admitting this step's new arrivals. + baseline_next_empty_slot, queue_aged_dropped = age_job_queue(job_queue_2d, baseline_next_empty_slot) + # Age helper queue and fill real queue before new arrivals - age_backlog_queue(baseline_backlog_queue, metrics, _is_baseline=True) - baseline_next_empty_slot, _ = fill_queue_from_backlog( - job_queue_2d, baseline_backlog_queue, baseline_next_empty_slot - ) + backlog_aged_dropped = age_backlog_queue(baseline_backlog_queue, metrics, _is_baseline=True) + baseline_next_empty_slot, _ = fill_queue_from_backlog(job_queue_2d, baseline_backlog_queue, baseline_next_empty_slot) _new_baseline_jobs, baseline_next_empty_slot, baseline_backlog_dropped = add_new_jobs( job_queue_2d, new_jobs_count, new_jobs_durations, @@ -82,23 +84,30 @@ def baseline_step( metrics.baseline_jobs_submitted += new_jobs_count metrics.episode_baseline_jobs_submitted += new_jobs_count if baseline_backlog_dropped > 0: - metrics.baseline_jobs_dropped += baseline_backlog_dropped - metrics.episode_baseline_jobs_dropped += baseline_backlog_dropped + metrics.baseline_jobs_rejected_queue_full += baseline_backlog_dropped + metrics.episode_baseline_jobs_rejected_queue_full += baseline_backlog_dropped - num_launched, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes( + total_dropped = queue_aged_dropped + backlog_aged_dropped + baseline_backlog_dropped + num_launched, baseline_next_empty_slot, queue_dropped, next_job_id = assign_jobs_to_available_nodes( job_queue_2d, baseline_state['nodes'], baseline_cores_available, baseline_running_jobs, baseline_next_empty_slot, next_job_id, metrics, is_baseline=True ) + total_dropped += queue_dropped # Greedy loop: keep refilling from backlog and assigning until no more progress while len(baseline_backlog_queue) > 0 and num_launched > 0: baseline_next_empty_slot, moved = fill_queue_from_backlog(job_queue_2d, baseline_backlog_queue, baseline_next_empty_slot) if moved == 0: break - num_launched, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes( + num_launched, baseline_next_empty_slot, queue_dropped, next_job_id = assign_jobs_to_available_nodes( job_queue_2d, baseline_state['nodes'], baseline_cores_available, baseline_running_jobs, baseline_next_empty_slot, next_job_id, metrics, is_baseline=True ) + total_dropped += queue_dropped + + if total_dropped > 0: + metrics.baseline_jobs_dropped += total_dropped + metrics.episode_baseline_jobs_dropped += total_dropped num_used_nodes = np.sum(baseline_state['nodes'] > 0) num_on_nodes = np.sum(baseline_state['nodes'] > -1) diff --git a/src/callbacks.py b/src/callbacks.py index 2e58a74..75eec06 100644 --- a/src/callbacks.py +++ b/src/callbacks.py @@ -44,6 +44,9 @@ def _on_step(self) -> bool: self.logger.record("metrics/max_queue_size", env.metrics.episode_max_queue_size_reached) self.logger.record("metrics/max_backlog_size", env.metrics.episode_max_backlog_size_reached) self.logger.record("metrics/jobs_dropped", env.metrics.episode_jobs_dropped) + self.logger.record("metrics/jobs_lost_total", env.metrics.episode_jobs_dropped) + loss_rate = (env.metrics.episode_jobs_dropped / env.metrics.episode_jobs_submitted * 100 if env.metrics.episode_jobs_submitted > 0 else 0.0) + self.logger.record("metrics/loss_rate", loss_rate) self.logger.record("metrics/jobs_rejected_queue_full", env.metrics.episode_jobs_rejected_queue_full) # Job metrics (baseline) @@ -56,6 +59,9 @@ def _on_step(self) -> bool: self.logger.record("metrics/baseline_max_queue_size", env.metrics.episode_baseline_max_queue_size_reached) self.logger.record("metrics/baseline_max_backlog_size", env.metrics.episode_baseline_max_backlog_size_reached) self.logger.record("metrics/baseline_jobs_dropped", env.metrics.episode_baseline_jobs_dropped) + self.logger.record("metrics/baseline_jobs_lost_total", env.metrics.episode_baseline_jobs_dropped) + baseline_loss_rate = (env.metrics.episode_baseline_jobs_dropped / env.metrics.episode_baseline_jobs_submitted * 100 if env.metrics.episode_baseline_jobs_submitted > 0 else 0.0) + self.logger.record("metrics/baseline_loss_rate", baseline_loss_rate) self.logger.record("metrics/baseline_jobs_rejected_queue_full", env.metrics.episode_baseline_jobs_rejected_queue_full) # Proportional (per-core) power metrics diff --git a/src/config.py b/src/config.py index 174e6c6..15c8868 100644 --- a/src/config.py +++ b/src/config.py @@ -7,8 +7,8 @@ MAX_BACKLOG_SIZE: int = 50000 # Maximum number of jobs in the backlog (overflow) queue MAX_CHANGE: int = MAX_NODES MAX_JOB_DURATION: int = 170 # maximum job runtime in hours -# Use a very high cap; age-based dropping is temporarily disabled in code. -# MAX_JOB_AGE = WEEK_HOURS * 52 * 10 # ~10 years in hours +# Drop jobs that have waited longer than this in queue/backlog. +MAX_JOB_AGE = WEEK_HOURS * 2 MAX_NEW_JOBS_PER_HOUR: int = 1500 COST_IDLE: int = 150 # Watts @@ -26,7 +26,7 @@ EPISODE_HOURS: int = WEEK_HOURS * 2 MAX_JOB_AGE_OBS: int = EPISODE_HOURS * 13 # maximum job age observable in the state, here set to ~6 Months -PENALTY_DROPPED_JOB: float = -5.0 # explicit penalty for each job dropped due to exceeding MAX_JOB_AGE +PENALTY_DROPPED_JOB: float = -5.0 # legacy per-job lost-job penalty constant # Reward/penalty constants PENALTY_IDLE_NODE: float = -0.1 # Penalty for idling nodes diff --git a/src/environment.py b/src/environment.py index 268d7a0..41f75d0 100644 --- a/src/environment.py +++ b/src/environment.py @@ -24,7 +24,8 @@ ) from src.job_management import ( process_ongoing_jobs, add_new_jobs, - assign_jobs_to_available_nodes, fill_queue_from_backlog, age_backlog_queue + assign_jobs_to_available_nodes, fill_queue_from_backlog, age_backlog_queue, + age_job_queue, ) from src.node_management import adjust_nodes from src.reward_calculation import RewardCalculator, power_consumption_mwh @@ -331,6 +332,11 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool, ) self.env_print(f"{len(completed_jobs)} jobs completed: [{' '.join(['#' + str(job_id) for job_id in completed_jobs]) if len(completed_jobs) > 0 else ''}]") + # Age jobs already waiting before admitting this step's new arrivals. + self.next_empty_slot, queue_aged_dropped = age_job_queue(job_queue_2d, self.next_empty_slot) + if queue_aged_dropped > 0: + queue_backlog_mutated = True + # Age helper queues (jobs waiting outside the fixed queue) backlog_aged_dropped = age_backlog_queue(self.backlog_queue, self.metrics, _is_baseline=False) if backlog_aged_dropped > 0: @@ -362,8 +368,8 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool, if len(new_jobs) > 0: queue_backlog_mutated = True if backlog_dropped > 0: - self.metrics.jobs_dropped += backlog_dropped - self.metrics.episode_jobs_dropped += backlog_dropped + self.metrics.jobs_rejected_queue_full += backlog_dropped + self.metrics.episode_jobs_rejected_queue_full += backlog_dropped self.metrics.jobs_submitted += new_jobs_count self.metrics.episode_jobs_submitted += new_jobs_count @@ -382,7 +388,7 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool, # Assign jobs to available nodes self.env_print(f"[4] Assigning jobs to available nodes...") - num_dropped_this_step = backlog_dropped + num_dropped_this_step = queue_aged_dropped + backlog_aged_dropped + backlog_dropped num_launched_jobs, self.next_empty_slot, queue_dropped, self.next_job_id = assign_jobs_to_available_nodes( job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs, self.next_empty_slot, self.next_job_id, self.metrics, is_baseline=False @@ -501,7 +507,9 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool, self.metrics.episode_job_age_penalties.append(job_age_penalty_norm * 100) self.metrics.episode_idle_penalties.append(idle_penalty_norm * 100) self.metrics.episode_rewards.append(step_reward) - + self.metrics.jobs_dropped += num_dropped_this_step + self.metrics.episode_jobs_dropped += num_dropped_this_step + # print stats self.env_print(f"[6] End of step stats...") self.env_print("job queue: ", ' '.join(['[{} {} {} {}]'.format(d, a, n, c) for d, a, n, c in job_queue_2d if d > 0])) @@ -549,6 +557,7 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool, "num_unprocessed_jobs": num_unprocessed_jobs, "num_on_nodes": num_on_nodes, "episode_jobs_dropped": self.metrics.episode_jobs_dropped, + "episode_jobs_lost_total": self.metrics.episode_jobs_dropped, } return self.state, step_reward, terminated, truncated, info diff --git a/src/evaluation_summary.py b/src/evaluation_summary.py index 0b6fc2f..ebca5a0 100644 --- a/src/evaluation_summary.py +++ b/src/evaluation_summary.py @@ -58,7 +58,7 @@ def build_episode_summary_line( f"({float(episode_data['completion_rate']):.0f}%), " f"AvgWait={float(episode_data['avg_wait_time']):.1f}h, " f"EpisodeMaxQueue={int(episode_data['max_queue_size'])}, " - f"Dropped={int(episode_data['jobs_dropped'])}, " + f"Lost={int(episode_data.get('jobs_lost_total', episode_data['jobs_dropped']))}, " f"TimelineMaxQueue={timeline_max_queue}, " f"Agent Occupancy (Cores)={agent_occupancy_cores_pct:.2f}%, " f"Baseline Occupancy (Cores)={baseline_occupancy_cores_pct:.2f}%, " @@ -67,5 +67,8 @@ def build_episode_summary_line( f"PropPower={float(episode_data['agent_prop_power_mwh']):.1f}/" f"{float(episode_data['baseline_prop_power_mwh']):.1f}/" f"{float(episode_data['baseline_off_prop_power_mwh']):.1f} MWh " - f"(agent/base/base_off)" + f"(agent/base/base_off), " + f"PropSavings=€{float(episode_data['savings_prop_cost_vs_baseline']):.0f}/" + f"€{float(episode_data['savings_prop_cost_vs_baseline_off']):.0f} " + f"(vs base/base_off)" ) diff --git a/src/job_management.py b/src/job_management.py index 3184e79..994de55 100644 --- a/src/job_management.py +++ b/src/job_management.py @@ -5,7 +5,7 @@ import numpy as np from src.config import ( - MAX_NODES, CORES_PER_NODE, MAX_BACKLOG_SIZE + MAX_NODES, CORES_PER_NODE, MAX_BACKLOG_SIZE, MAX_JOB_AGE ) from src.metrics_tracker import MetricsTracker @@ -13,10 +13,9 @@ def age_backlog_queue(backlog_queue: deque, _metrics: MetricsTracker, _is_baseline: bool = False) -> int: """ Age jobs waiting in the backlog queue. - NOTE: dropping based on MAX_JOB_AGE is temporarily disabled via an `if False` hotfix, - so jobs are always kept even if job[1] > MAX_JOB_AGE. - TODO: re-enable drops by removing the `if False` guard and using `job[1] > MAX_JOB_AGE`; - metrics updates are already in the disabled branch. + + Returns the number of jobs dropped for exceeding ``MAX_JOB_AGE``. + Callers are responsible for updating any drop counters exactly once. """ if not backlog_queue: return 0 @@ -25,25 +24,40 @@ def age_backlog_queue(backlog_queue: deque, _metrics: MetricsTracker, _is_baseli kept = [] for job in backlog_queue: job[1] += 1 - # TEMP HOTFIX: disable age-based dropping (keep logic for later). - # if False and job[1] > MAX_JOB_AGE: - # dropped += 1 - # if _is_baseline: - # _metrics.baseline_jobs_dropped += 1 - # _metrics.baseline_dropped_this_episode += 1 - # _metrics.episode_baseline_jobs_dropped += 1 - # else: - # _metrics.jobs_dropped += 1 - # _metrics.dropped_this_episode += 1 - # _metrics.episode_jobs_dropped += 1 - # else: - kept.append(job) + if job[1] > MAX_JOB_AGE: + dropped += 1 + else: + kept.append(job) backlog_queue.clear() backlog_queue.extend(kept) return dropped +def age_job_queue(job_queue_2d: np.ndarray, next_empty_slot: int) -> tuple[int, int]: + """ + Age jobs already waiting in the main queue once for the current step. + + Returns the updated ``next_empty_slot`` together with the number of jobs + dropped for exceeding ``MAX_JOB_AGE``. + """ + dropped = 0 + + for job_idx, job in enumerate(job_queue_2d): + job_duration = job[0] + if job_duration <= 0: + continue + + job_queue_2d[job_idx][1] += 1 + if job_queue_2d[job_idx][1] > MAX_JOB_AGE: + job_queue_2d[job_idx] = [0, 0, 0, 0] + if job_idx < next_empty_slot: + next_empty_slot = job_idx + dropped += 1 + + return next_empty_slot, dropped + + def fill_queue_from_backlog(job_queue_2d: np.ndarray, backlog_queue: deque, next_empty_slot: int) -> tuple[int, int]: """ Move jobs from backlog queue into the real queue (FIFO) until full. @@ -221,11 +235,12 @@ def assign_jobs_to_available_nodes( running_jobs: Dictionary of currently running jobs next_empty_slot: Index of next empty slot in queue next_job_id: Next available job ID - metrics: MetricsTracker object to update with job completion metrics + metrics: MetricsTracker object to update with completion/wait-time metrics is_baseline: Whether this is baseline simulation Returns: - Tuple of (num_processed_jobs, updated next_empty_slot, num_dropped, updated next_job_id) + Tuple of (num_processed_jobs, updated next_empty_slot, num_dropped, updated next_job_id). + Callers are responsible for recording ``num_dropped`` exactly once. """ num_processed_jobs = 0 num_dropped = 0 @@ -266,26 +281,4 @@ def assign_jobs_to_available_nodes( num_processed_jobs += 1 continue - # Not enough resources -> job waits and ages (or gets dropped) - new_age = job_age + 1 - - # TEMP HOTFIX: disable age-based dropping (keep logic for later). - #if False and new_age > MAX_JOB_AGE: - # # Clear job from queue - # job_queue_2d[job_idx] = [0, 0, 0, 0] - # - # # Update next_empty_slot if we cleared a slot before it - # if job_idx < next_empty_slot: - # next_empty_slot = job_idx - # num_dropped += 1 - # - # if is_baseline: - # metrics.baseline_jobs_dropped += 1 - # metrics.episode_baseline_jobs_dropped += 1 - # else: - # metrics.jobs_dropped += 1 - # metrics.episode_jobs_dropped += 1 - #else: - job_queue_2d[job_idx][1] = new_age - return num_processed_jobs, next_empty_slot, num_dropped, next_job_id diff --git a/src/metrics_tracker.py b/src/metrics_tracker.py index 56f72d3..e2895a2 100644 --- a/src/metrics_tracker.py +++ b/src/metrics_tracker.py @@ -156,6 +156,8 @@ def record_episode_completion(self, current_episode: int) -> dict[str, float | i if self.episode_baseline_jobs_submitted else 0.0 ) + loss_rate = drop_rate + baseline_loss_rate = baseline_drop_rate agent_mean_price: float = self._effective_mean_price( self.episode_total_cost, self.episode_total_power_consumption_mwh ) @@ -200,10 +202,15 @@ def record_episode_completion(self, current_episode: int) -> dict[str, float | i (COST_IDLE_MW * on + _compute_delta_mw * (cores / CORES_PER_NODE)) * price for on, cores, price in zip(self.episode_on_nodes, self.episode_used_cores, self.episode_price_stats) ) + baseline_prop_cost: float = sum( + (COST_IDLE_MW * MAX_NODES + _compute_delta_mw * (cores / CORES_PER_NODE)) * price + for cores, price in zip(self.episode_baseline_used_cores, self.episode_price_stats) + ) baseline_off_prop_cost: float = sum( (COST_IDLE_MW * used + _compute_delta_mw * (cores / CORES_PER_NODE)) * price for used, cores, price in zip(self.episode_baseline_used_nodes, self.episode_baseline_used_cores, self.episode_price_stats) ) + savings_prop_cost_vs_baseline: float = baseline_prop_cost - agent_prop_cost savings_prop_cost_vs_baseline_off: float = baseline_off_prop_cost - agent_prop_cost dropped_jobs_per_saved_euro: float = self._safe_ratio( float(self.episode_jobs_dropped), savings_vs_baseline @@ -223,6 +230,10 @@ def record_episode_completion(self, current_episode: int) -> dict[str, float | i 'agent_prop_power_mwh': agent_prop_power_mwh, 'baseline_prop_power_mwh': baseline_prop_power_mwh, 'baseline_off_prop_power_mwh': baseline_off_prop_power_mwh, + 'agent_prop_cost': agent_prop_cost, + 'baseline_prop_cost': baseline_prop_cost, + 'baseline_off_prop_cost': baseline_off_prop_cost, + 'savings_prop_cost_vs_baseline': savings_prop_cost_vs_baseline, 'savings_prop_cost_vs_baseline_off': savings_prop_cost_vs_baseline_off, 'agent_mean_price': agent_mean_price, 'baseline_mean_price': baseline_mean_price, @@ -251,12 +262,16 @@ def record_episode_completion(self, current_episode: int) -> dict[str, float | i 'baseline_completion_rate': baseline_completion_rate, 'baseline_max_queue_size': self.episode_baseline_max_queue_size_reached, 'baseline_max_backlog_size': self.episode_baseline_max_backlog_size_reached, - # Drop metrics + # Loss metrics: includes age expirations and queue-full rejections. "jobs_dropped": self.episode_jobs_dropped, + "jobs_lost_total": self.episode_jobs_dropped, "drop_rate": drop_rate, + "loss_rate": loss_rate, "jobs_rejected_queue_full": self.episode_jobs_rejected_queue_full, "baseline_jobs_dropped": self.episode_baseline_jobs_dropped, + "baseline_jobs_lost_total": self.episode_baseline_jobs_dropped, "baseline_drop_rate": baseline_drop_rate, + "baseline_loss_rate": baseline_loss_rate, "baseline_jobs_rejected_queue_full": self.episode_baseline_jobs_rejected_queue_full, } self.episode_costs.append(episode_data) diff --git a/src/plotter.py b/src/plotter.py index 52cb375..db4980a 100644 --- a/src/plotter.py +++ b/src/plotter.py @@ -446,7 +446,7 @@ def plot_episode_summary(env: ComputeClusterEnv, episode_costs: list[dict[str, f avg_wait = np.array([ep.get("avg_wait_time", 0.0) for ep in episode_costs], dtype=float) completion = np.array([ep.get("completion_rate", 0.0) for ep in episode_costs], dtype=float) max_queue = np.array([ep.get("max_queue_size", 0.0) for ep in episode_costs], dtype=float) - dropped = np.array([ep.get("jobs_dropped", 0.0) for ep in episode_costs], dtype=float) + dropped = np.array([ep.get("jobs_lost_total", ep.get("jobs_dropped", 0.0)) for ep in episode_costs], dtype=float) fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(20, 6)) @@ -473,15 +473,15 @@ def plot_episode_summary(env: ComputeClusterEnv, episode_costs: list[dict[str, f labels = [line.get_label() for line in lines] ax2.legend(lines, labels, loc="upper left", fontsize=9) - # Max queue + dropped jobs + # Max queue + lost jobs ax3.plot(eps, max_queue, label="Max queue (jobs)", linewidth=2) ax3.set_xlabel("Episode") ax3.set_ylabel("Max queue (jobs)") ax3.grid(True, alpha=0.3) ax3b = ax3.twinx() - ax3b.plot(eps, dropped, label="Dropped jobs", linewidth=2, linestyle="--") - ax3b.set_ylabel("Dropped jobs") + ax3b.plot(eps, dropped, label="Lost jobs", linewidth=2, linestyle="--") + ax3b.set_ylabel("Lost jobs") lines = ax3.get_lines() + ax3b.get_lines() labels = [line.get_label() for line in lines] diff --git a/src/reward_calculation.py b/src/reward_calculation.py index 8262cfc..2e4e06f 100644 --- a/src/reward_calculation.py +++ b/src/reward_calculation.py @@ -332,7 +332,8 @@ def calculate(self, num_used_nodes: int, num_idle_nodes: int, current_price: flo job_queue_2d: 2D job queue array num_unprocessed_jobs: Number of jobs waiting in queue weights: Weights object with weight values - num_dropped_this_step: Number of jobs dropped this step + num_dropped_this_step: Number of jobs lost this step + (aged out in queue/backlog or rejected because queue/backlog was full) env_print: Print function for logging num_on_nodes: Number of powered-on nodes total_used_cores: Total cores in use across all powered nodes @@ -360,11 +361,10 @@ def calculate(self, num_used_nodes: int, num_idle_nodes: int, current_price: flo idle_penalty_norm = self._penalty_idle_normalized(num_idle_nodes) idle_penalty_weighted = weights.idle_weight * idle_penalty_norm - # 6. penalty for dropped jobs (WIP - unnormalized, weighted) + # 6. penalty for lost jobs (aged out or rejected because queue/backlog was full) drop_penalty = min(0, PENALTY_DROPPED_JOB * num_dropped_this_step) drop_penalty_weighted = weights.drop_weight * drop_penalty - reward = ( efficiency_reward_weighted + price_reward_weighted diff --git a/train.py b/train.py index ce6d993..eeaf337 100644 --- a/train.py +++ b/train.py @@ -68,7 +68,7 @@ def main(): parser.add_argument("--price-weight", type=float, default=0.2, help="Weight for price reward") parser.add_argument("--idle-weight", type=float, default=0.1, help="Weight for idle penalty") parser.add_argument("--job-age-weight", type=float, default=0.0, help="Weight for job age penalty") - parser.add_argument("--drop-weight", type=float, default=0.0, help="Weight for dropped jobs penalty (WIP - default 0.0)") + parser.add_argument("--drop-weight", type=float, default=0.0, help="Weight for lost jobs penalty (age expiry or queue-full rejection) (WIP - default 0.0)") parser.add_argument("--iter-limit", type=int, default=0, help=f"Max number of training iterations (1 iteration = {STEPS_PER_ITERATION} steps)") parser.add_argument("--session", default="default", help="Session ID") parser.add_argument("--evaluate-savings", action='store_true', help="Load latest model and evaluate long-term savings (no training)") @@ -329,14 +329,27 @@ def main(): total_agent_cost = sum(float(ep['agent_cost']) for ep in env.metrics.episode_costs) total_baseline_cost = sum(float(ep['baseline_cost']) for ep in env.metrics.episode_costs) total_baseline_off_cost = sum(float(ep['baseline_cost_off']) for ep in env.metrics.episode_costs) - total_jobs_dropped = sum(int(ep.get('jobs_dropped', 0)) for ep in env.metrics.episode_costs) - total_baseline_jobs_dropped = sum(int(ep.get('baseline_jobs_dropped', 0)) for ep in env.metrics.episode_costs) + total_jobs_dropped = sum(int(ep.get('jobs_lost_total', ep.get('jobs_dropped', 0))) for ep in env.metrics.episode_costs) + total_baseline_jobs_dropped = sum(int(ep.get('baseline_jobs_lost_total', ep.get('baseline_jobs_dropped', 0))) for ep in env.metrics.episode_costs) total_agent_power_mwh = sum(float(ep.get('agent_power_consumption_mwh', 0.0)) for ep in env.metrics.episode_costs) total_baseline_power_mwh = sum(float(ep.get('baseline_power_consumption_mwh', 0.0)) for ep in env.metrics.episode_costs) total_baseline_off_power_mwh = sum(float(ep.get('baseline_power_consumption_off_mwh', 0.0)) for ep in env.metrics.episode_costs) + total_agent_prop_power_mwh = sum(float(ep.get('agent_prop_power_mwh', 0.0)) for ep in env.metrics.episode_costs) + total_baseline_prop_power_mwh = sum(float(ep.get('baseline_prop_power_mwh', 0.0)) for ep in env.metrics.episode_costs) + total_baseline_off_prop_power_mwh = sum(float(ep.get('baseline_off_prop_power_mwh', 0.0)) for ep in env.metrics.episode_costs) + total_agent_prop_cost = sum(float(ep.get('agent_prop_cost', 0.0)) for ep in env.metrics.episode_costs) + total_baseline_prop_cost = sum(float(ep.get('baseline_prop_cost', 0.0)) for ep in env.metrics.episode_costs) + total_baseline_off_prop_cost = sum(float(ep.get('baseline_off_prop_cost', 0.0)) for ep in env.metrics.episode_costs) + total_savings_prop_cost_vs_baseline = total_baseline_prop_cost - total_agent_prop_cost + total_savings_prop_cost_vs_baseline_off = total_baseline_off_prop_cost - total_agent_prop_cost total_agent_mean_price = (total_agent_cost / total_agent_power_mwh) if total_agent_power_mwh > 0 else 0.0 total_baseline_mean_price = (total_baseline_cost / total_baseline_power_mwh) if total_baseline_power_mwh > 0 else 0.0 total_baseline_off_mean_price = (total_baseline_off_cost / total_baseline_off_power_mwh) if total_baseline_off_power_mwh > 0 else 0.0 + total_agent_prop_mean_price = (total_agent_prop_cost / total_agent_prop_power_mwh) if total_agent_prop_power_mwh > 0 else 0.0 + total_baseline_prop_mean_price = (total_baseline_prop_cost / total_baseline_prop_power_mwh) if total_baseline_prop_power_mwh > 0 else 0.0 + total_baseline_off_prop_mean_price = (total_baseline_off_prop_cost / total_baseline_off_prop_power_mwh) if total_baseline_off_prop_power_mwh > 0 else 0.0 + prop_savings_pct_vs_baseline = safe_ratio(total_savings_prop_cost_vs_baseline * 100.0, total_baseline_prop_cost) + prop_savings_pct_vs_baseline_off = safe_ratio(total_savings_prop_cost_vs_baseline_off * 100.0, total_baseline_off_prop_cost) total_agent_completion_rate = (total_jobs_completed / total_jobs_submitted * 100) if total_jobs_submitted > 0 else 0.0 total_baseline_completion_rate = (total_baseline_completed / total_baseline_submitted * 100) if total_baseline_submitted > 0 else 0.0 total_savings_vs_baseline = total_baseline_cost - total_agent_cost @@ -374,17 +387,20 @@ def main(): print(f" Baseline: {fmt_optional(total_baseline_cost_per_1000_completed, 2, thousands=True)} €/1k jobs") print(f" Baseline_off: {fmt_optional(total_baseline_off_cost_per_1000_completed, 2, thousands=True)} €/1k jobs") - print(f"\n=== AGENT DROPPED JOBS PER SAVED EURO ===") - print(f" Total Dropped Jobs (Agent): {total_jobs_dropped:,}") - print(f" Total Dropped Jobs (Baseline): {total_baseline_jobs_dropped:,}") + print(f"\n=== AGENT LOST JOBS PER SAVED EURO ===") + print(f" Total Lost Jobs (Agent): {total_jobs_dropped:,}") + print(f" Total Lost Jobs (Baseline): {total_baseline_jobs_dropped:,}") print(f" Vs Baseline: {fmt_optional(total_dropped_jobs_per_saved_euro, 6)} jobs/€") print(f" Vs Baseline_off: {fmt_optional(total_dropped_jobs_per_saved_euro_off, 6)} jobs/€") print(f"\n=== POWER & PRICE METRICS (TOTAL OVER EVALUATION) ===") - print(f" Agent: Power={total_agent_power_mwh:,.1f} MWh, Mean Price={total_agent_mean_price:.2f} €/MWh") - print(f" Baseline: Power={total_baseline_power_mwh:,.1f} MWh, Mean Price={total_baseline_mean_price:.2f} €/MWh") - print(f" Baseline_off: Power={total_baseline_off_power_mwh:,.1f} MWh, Mean Price={total_baseline_off_mean_price:.2f} €/MWh") + print(f" Agent: Power={total_agent_prop_power_mwh:,.1f} MWh, Mean Price={total_agent_prop_mean_price:.2f} €/MWh") + print(f" Baseline: Power={total_baseline_prop_power_mwh:,.1f} MWh, Mean Price={total_baseline_prop_mean_price:.2f} €/MWh") + print(f" Baseline_off: Power={total_baseline_off_prop_power_mwh:,.1f} MWh, Mean Price={total_baseline_off_prop_mean_price:.2f} €/MWh") + print(f"\n=== PROPORTIONAL COST SAVINGS (TOTAL OVER EVALUATION) ===") + print(f" Vs Baseline: €{total_savings_prop_cost_vs_baseline:,.0f}, {fmt_optional(prop_savings_pct_vs_baseline, 1)}%") + print(f" Vs Baseline_off: €{total_savings_prop_cost_vs_baseline_off:,.0f}, {fmt_optional(prop_savings_pct_vs_baseline_off, 1)}%") except Exception as e: print(f"Could not generate cumulative savings plot: {e}")