Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 6 additions & 19 deletions src/baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
age_backlog_queue,
)
from src.metrics_tracker import MetricsTracker
from src.reward_calculation import power_cost, power_consumption_mwh
from src.reward_calculation import power_consumption_mwh
from src.config import CORES_PER_NODE


Expand Down Expand Up @@ -120,26 +120,13 @@ def baseline_step(

baseline_state['job_queue'] = job_queue_2d.flatten()

baseline_cost = power_cost(baseline_state['nodes'], baseline_cores_available, current_price)
env_print(f" > baseline_cost: €{baseline_cost:.4f} | used nodes: {num_used_nodes}, idle nodes: {num_idle_nodes}")
baseline_cost_off = power_cost(
baseline_state['nodes'],
baseline_cores_available,
current_price,
include_idle_nodes=False,
)
env_print(f" > baseline_cost_off: €{baseline_cost_off:.4f} | used nodes: {num_used_nodes}, idle nodes: 0")
baseline_power_mwh = power_consumption_mwh(baseline_state['nodes'], baseline_cores_available)
baseline_power_off_mwh = power_consumption_mwh(
baseline_state['nodes'],
baseline_cores_available,
include_idle_nodes=False,
)

num_used_cores = num_on_nodes * CORES_PER_NODE - np.sum(baseline_cores_available)

num_used_cores = int(num_on_nodes * CORES_PER_NODE - np.sum(baseline_cores_available))
baseline_power_mwh = power_consumption_mwh(num_on_nodes, num_used_cores)
baseline_power_off_mwh = power_consumption_mwh(num_used_nodes, num_used_cores)
baseline_cost = baseline_power_mwh * current_price
baseline_cost_off = baseline_power_off_mwh * current_price
env_print(f" > baseline_cost: €{baseline_cost:.4f} | used nodes: {num_used_nodes}, idle nodes: {num_idle_nodes}")
env_print(f" > baseline_cost_off: €{baseline_cost_off:.4f} | used nodes: {num_used_nodes}, idle nodes: 0")

return (
baseline_cost,
Expand Down
12 changes: 4 additions & 8 deletions src/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool,
combined_queue_size = num_unprocessed_jobs + len(self.backlog_queue)
num_unprocessed_jobs = combined_queue_size
average_future_price = float(np.mean(self.state['predicted_prices']))
num_used_cores = num_on_nodes * CORES_PER_NODE - np.sum(self.cores_available)
num_used_cores = int(num_on_nodes * CORES_PER_NODE - np.sum(self.cores_available))
num_running_jobs = len(self.running_jobs)

# update stats
Expand Down Expand Up @@ -479,16 +479,12 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool,

step_reward, step_cost, eff_reward_norm, price_reward, idle_penalty_norm, job_age_penalty_norm = self.reward_calculator.calculate(
num_used_nodes, num_idle_nodes, current_price, average_future_price,
num_off_nodes, num_launched_jobs, num_node_changes, job_queue_2d,
num_unprocessed_jobs, self.weights, num_dropped_this_step, self.env_print,
nodes=self.state['nodes'], cores_available=self.cores_available,
num_off_nodes, job_queue_2d, num_unprocessed_jobs, self.weights,
num_dropped_this_step, self.env_print, num_on_nodes, num_used_cores,
)


self.metrics.episode_reward += step_reward
step_power_mwh = power_consumption_mwh(self.state['nodes'], self.cores_available)

step_cost = step_power_mwh * current_price
step_power_mwh = power_consumption_mwh(num_on_nodes, num_used_cores)
self.metrics.total_cost += step_cost
self.metrics.episode_total_cost += step_cost
self.metrics.total_power_consumption_mwh += step_power_mwh
Expand Down
159 changes: 36 additions & 123 deletions src/reward_calculation.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,103 +13,40 @@
from src.weights import Weights


def _power_consumption_from_state_mwh(
nodes: np.ndarray,
cores_available: np.ndarray,
include_idle_nodes: bool = True,
) -> float:
"""Calculate step energy from per-node core utilization."""
on_mask = nodes >= 0
if not np.any(on_mask):
return 0.0

used_cores = np.zeros_like(cores_available, dtype=float)
used_cores[on_mask] = CORES_PER_NODE - cores_available[on_mask]
used_cores = np.clip(used_cores, 0.0, float(CORES_PER_NODE))

utilization = np.zeros_like(used_cores, dtype=float)
utilization[on_mask] = used_cores[on_mask] / float(CORES_PER_NODE)

powered_mask = on_mask if include_idle_nodes else (on_mask & (used_cores > 0.0))
if not np.any(powered_mask):
return 0.0

node_power_mw = np.zeros_like(utilization, dtype=float)
node_power_mw[powered_mask] = COST_IDLE_MW + (COST_USED_MW - COST_IDLE_MW) * utilization[powered_mask]
return float(np.sum(node_power_mw))


def _used_cores_from_state(nodes: np.ndarray, cores_available: np.ndarray) -> float:
"""Calculate the total number of used cores across all powered nodes."""
on_mask = nodes >= 0
if not np.any(on_mask):
return 0.0

used_cores = np.zeros_like(cores_available, dtype=float)
used_cores[on_mask] = CORES_PER_NODE - cores_available[on_mask]
used_cores = np.clip(used_cores, 0.0, float(CORES_PER_NODE))
return float(np.sum(used_cores))


def power_cost(
nodes_or_num_used: np.ndarray | int,
cores_or_num_idle: np.ndarray | int,
current_price: float,
include_idle_nodes: bool = True,
) -> float:
def power_consumption_mwh(num_powered_nodes: int, total_used_cores: int) -> float:
"""
Calculate power cost for one environment step.
Calculate energy consumption for one environment step.

One environment step equals one hour, so this is both average MW and MWh/step.
All powered-on nodes draw an idle baseline; the compute delta scales linearly
with core utilization: COST_IDLE_MW * num_powered + (COST_USED_MW - COST_IDLE_MW) * total_used_cores / CORES_PER_NODE.

Args:
nodes_or_num_used: Either node-state array or number of used nodes
cores_or_num_idle: Either cores-available array or number of idle nodes
current_price: Current electricity price
include_idle_nodes: Whether idle powered-on nodes should contribute energy
num_powered_nodes: Number of powered-on nodes (include idle for baseline, exclude for baseline_off)
total_used_cores: Total cores in use across all powered nodes

Returns:
Total power cost
"""
return power_consumption_mwh(
nodes_or_num_used,
cores_or_num_idle,
include_idle_nodes=include_idle_nodes,
) * current_price


def power_consumption_mwh(
nodes_or_num_used: np.ndarray | int,
cores_or_num_idle: np.ndarray | int,
include_idle_nodes: bool = True,
) -> float:
Energy consumption in MWh for this step
"""
Calculate energy consumption for one environment step.
return num_powered_nodes * COST_IDLE_MW + (COST_USED_MW - COST_IDLE_MW) * total_used_cores / CORES_PER_NODE

One environment step equals one hour, so this is both average MW and MWh/step.

def power_cost(num_powered_nodes: int, total_used_cores: int, current_price: float) -> float:
"""
Calculate power cost for one environment step.

Args:
nodes_or_num_used: Either node-state array or number of used nodes
cores_or_num_idle: Either cores-available array or number of idle nodes
include_idle_nodes: Whether idle powered-on nodes should contribute energy
num_powered_nodes: Number of powered-on nodes
total_used_cores: Total cores in use across all powered nodes
current_price: Current electricity price

Returns:
Energy consumption in MWh for this step
Total power cost
"""
if np.isscalar(nodes_or_num_used):
num_used_nodes = int(nodes_or_num_used)
num_idle_nodes = int(cores_or_num_idle) if include_idle_nodes else 0
return COST_IDLE_MW * num_idle_nodes + COST_USED_MW * num_used_nodes

nodes = np.asarray(nodes_or_num_used)
cores_available = np.asarray(cores_or_num_idle, dtype=float)
return _power_consumption_from_state_mwh(
nodes,
cores_available,
include_idle_nodes=include_idle_nodes,
)
return power_consumption_mwh(num_powered_nodes, total_used_cores) * current_price


class RewardCalculator:

"""Calculates rewards with pre-computed normalization bounds."""
EFFICIENCY_TARGET_RATIO = 0.70
EFFICIENCY_GAIN = 5.0
Expand Down Expand Up @@ -145,18 +82,14 @@ def __init__(self, prices: Prices) -> None:
def _compute_bounds(self) -> None:
"""Compute min/max bounds for reward normalization."""
# Efficiency bounds
cost_for_min_efficiency = power_cost(0, MAX_NODES, self.prices.MAX_PRICE)
cost_for_max_efficiency = power_cost(MAX_NODES, 0, self.prices.MIN_PRICE)
cost_for_min_efficiency = power_cost(MAX_NODES, 0, self.prices.MAX_PRICE) # all nodes idle
cost_for_max_efficiency = power_cost(MAX_NODES, MAX_NODES * CORES_PER_NODE, self.prices.MIN_PRICE) # all nodes fully used

self._min_efficiency_reward = self._reward_efficiency(0, cost_for_min_efficiency)
self._max_efficiency_reward = max(1.0, self._reward_efficiency(MAX_NODES, cost_for_max_efficiency))

# Price bounds (legacy behavior kept for debugging/ablation).
self._max_price_reward_legacy = self._reward_price_legacy(
self.prices.MIN_PRICE,
self.prices.MAX_PRICE,
MAX_NEW_JOBS_PER_HOUR,
)
self._max_price_reward_legacy = self._reward_price_legacy(self.prices.MIN_PRICE, self.prices.MAX_PRICE, MAX_NEW_JOBS_PER_HOUR)
self._min_price_reward_legacy = -self._max_price_reward_legacy

# Idle penalty bounds
Expand Down Expand Up @@ -217,7 +150,7 @@ def _reward_price(self, current_price: float, average_future_price: float, num_u
- Saturates quickly with better-than-context prices and used nodes.
- Always applies overdrive when current price is negative.
"""

if num_used_nodes <= 0:
return 0.0

Expand Down Expand Up @@ -305,37 +238,27 @@ def _reward_energy_efficiency_normalized(self, num_used_nodes: int, num_idle_nod
return 0.0 # nothing on => no "efficiency" signal
return 2*(float(np.clip((num_used_nodes * COST_USED_MW) / total_work, 0.0, 1.0))) - 1.0 # scale to [-1, 1] so that it can be weighted in either direction without exceeding bounds.

def _reward_energy_efficiency_utilization_normalized(
self,
nodes: np.ndarray,
cores_available: np.ndarray,
) -> float:
def _reward_energy_efficiency_utilization_normalized(self, num_on_nodes: int, total_used_cores: int) -> float:
"""
Utilization-aware efficiency reward based on delivered core-hours per MWh.

Theoretical optimum under the current affine power model is achieved when every
powered node is fully utilized, i.e. 96 cores at 450 W.
"""
step_power_mwh = power_consumption_mwh(nodes, cores_available)
step_power_mwh = power_consumption_mwh(num_on_nodes, total_used_cores)
if step_power_mwh <= 0.0:
return 0.0

used_cores = _used_cores_from_state(nodes, cores_available)
if used_cores <= 0.0:
if total_used_cores <= 0.0:
efficiency_ratio = 0.0
else:
efficiency_raw = used_cores / step_power_mwh # core-hours per MWh for this 1h step
efficiency_raw = total_used_cores / step_power_mwh # core-hours per MWh for this 1h step
efficiency_max = float(CORES_PER_NODE) / COST_USED_MW
efficiency_ratio = float(np.clip(efficiency_raw / efficiency_max, 0.0, 1.0))

return float(np.tanh(self.EFFICIENCY_GAIN * (efficiency_ratio - self.EFFICIENCY_TARGET_RATIO)))

def _reward_price_utilization(
self,
current_price: float,
average_future_price: float,
used_cores: float,
) -> float:
def _reward_price_utilization(self, current_price: float, average_future_price: float, used_cores: int) -> float:
"""
Price-timing reward scaled by useful work volume, measured as equivalent fully used nodes.

Expand Down Expand Up @@ -393,10 +316,10 @@ def _blackout_term(self, num_used_nodes: int, num_idle_nodes: int, num_unprocess
return float(np.clip(penalty, -1.0, 0.0))

def calculate(self, num_used_nodes: int, num_idle_nodes: int, current_price: float, average_future_price: float,
num_off_nodes: int, _num_processed_jobs: int, num_node_changes: int, job_queue_2d: np.ndarray, # noqa: ARG002 - _num_processed_jobs legacy; num_node_changes reserved for future node-change penalty
num_off_nodes: int, job_queue_2d: np.ndarray,
num_unprocessed_jobs: int, weights: Weights, num_dropped_this_step: int,
env_print: Callable[..., None], nodes: np.ndarray | None = None,
cores_available: np.ndarray | None = None) -> tuple[float, float, float, float, float, float]:
env_print: Callable[..., None], num_on_nodes: int,
total_used_cores: int) -> tuple[float, float, float, float, float, float]:
"""
Calculate total reward by aggregating weighted components.

Expand All @@ -406,32 +329,22 @@ def calculate(self, num_used_nodes: int, num_idle_nodes: int, current_price: flo
current_price: Current electricity price
average_future_price: Average predicted future price
num_off_nodes: Number of offline nodes
_num_processed_jobs: Number of jobs launched this step (legacy param, unused by active price reward)
num_node_changes: Number of node state changes
job_queue_2d: 2D job queue array
num_unprocessed_jobs: Number of jobs waiting in queue
weights: Weights object with weight values
num_dropped_this_step: Number of jobs dropped this step
env_print: Print function for logging
nodes: Optional node-state array for utilization-based power accounting
cores_available: Optional per-node free-core array for utilization-based power accounting
num_on_nodes: Number of powered-on nodes
total_used_cores: Total cores in use across all powered nodes

Returns:
Tuple of (total reward, total cost, eff_reward_norm, price_reward,
idle_penalty_norm, job_age_penalty_norm)
"""
# 0. Energy efficiency. Reward calculation based on Workload (used nodes) (W) / Cost (C)
if nodes is not None and cores_available is not None:
total_cost = power_cost(nodes, cores_available, current_price)
used_cores = _used_cores_from_state(nodes, cores_available)
efficiency_reward_norm = self._reward_energy_efficiency_utilization_normalized(nodes, cores_available)
price_reward = self._reward_price_utilization(current_price, average_future_price, used_cores)
else:
total_cost = power_cost(num_used_nodes, num_idle_nodes, current_price)
efficiency_reward_norm = self._reward_energy_efficiency_normalized(num_used_nodes, num_idle_nodes)
price_reward = self._reward_price(
current_price, average_future_price, num_used_nodes
)
total_cost = power_cost(num_on_nodes, total_used_cores, current_price)
efficiency_reward_norm = self._reward_energy_efficiency_utilization_normalized(num_on_nodes, total_used_cores)
price_reward = self._reward_price_utilization(current_price, average_future_price, total_used_cores)

efficiency_reward_norm += self._blackout_term(num_used_nodes, num_idle_nodes, num_unprocessed_jobs)
efficiency_reward_weighted = weights.efficiency_weight * efficiency_reward_norm
Expand Down
2 changes: 2 additions & 0 deletions test/test_price_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def test_get_price_context_uses_window():


if __name__ == "__main__":
import sys
all_pass = True
all_pass &= test_price_history_bounded()
all_pass &= test_rolling_window_behavior()
Expand All @@ -130,3 +131,4 @@ def test_get_price_context_uses_window():
print("✓ All tests passed!")
else:
print("✗ Some tests failed")
sys.exit(1)
22 changes: 6 additions & 16 deletions test/test_prices_cycling.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,12 @@ def test_cycling_behavior():
print(f"Episode 3 prices: {episode_3_prices}")

# Check that episodes use different price sequences (unless wrapped)
if episode_1_prices != episode_2_prices:
print("✓ Episodes 1 and 2 use different price sequences (good!)")
else:
print("✗ Episodes 1 and 2 use same prices (bad - not cycling)")
assert episode_1_prices != episode_2_prices, "Episodes 1 and 2 use same prices (not cycling)"
print("✓ Episodes 1 and 2 use different price sequences (good!)")

# Episode 3 should equal Episode 1 (wrapped back around)
if episode_1_prices == episode_3_prices:
print("✓ Episode 3 wrapped around to match Episode 1 (good!)")
else:
print(f"✗ Episode 3 doesn't match Episode 1 (unexpected)")
print(f" Expected: {episode_1_prices}")
print(f" Got: {episode_3_prices}")
assert episode_1_prices == episode_3_prices, f"Episode 3 doesn't match Episode 1: expected {episode_1_prices}, got {episode_3_prices}"
print("✓ Episode 3 wrapped around to match Episode 1 (good!)")


def test_determinism():
Expand Down Expand Up @@ -117,12 +111,8 @@ def test_determinism():
predicted = prices2.advance_and_get_predicted_prices()
run2_prices.append(float(predicted[0]))

if run1_prices == run2_prices:
print("✓ Determinism verified: same start_index gives same sequence")
else:
print("✗ Non-deterministic: different sequences")
print(f"Run 1 (first 5): {run1_prices[:5]}")
print(f"Run 2 (first 5): {run2_prices[:5]}")
assert run1_prices == run2_prices, f"Non-deterministic: run1={run1_prices[:5]} run2={run2_prices[:5]}"
print("✓ Determinism verified: same start_index gives same sequence")


def test_environment_simulation():
Expand Down
Loading