RewardGuard gives you two levels of monitoring. The free package detects reward imbalances from a snapshot analysis — ideal for catching problems after the fact. The premium package runs statistical detection at every step and can automatically adjust reward weights while training is still running, keeping a timestamped log of every correction it makes.
Free — rewardguard
- Rolling-window balance analysis
- Per-component imbalance report
- Suggested weight multipliers
- Call check() whenever you want a snapshot
Premium — rewardguard-premium
- Per-step alignment score & z-scores
- Automatic weight correction
- Full timestamped correction log
- CSV / JSON export, save & resume
Installation
Free (open source, MIT)
```shell
pip install rewardguard
```
Premium (requires license key from your dashboard)
```shell
pip install rewardguard-premium
```
The premium package imports the free package internally — you only need to install one. If you have both installed, use from rewardguard_premium import AutoMonitor for premium features.
Free tier — Monitor
The free Monitor is a zero-dependency, in-loop monitor. You tell it your expected reward distribution, feed it steps, and call check() whenever you want an analysis.
1. Initialize
Pass expected as a dict of component → weight. The weights are relative — they don't need to sum to 100. Monitor normalizes them automatically.
```python
import rewardguard as rg

monitor = rg.Monitor(
    expected={"task": 0.7, "safety": 0.3},
    tolerance=5.0,        # ± percentage points before flagging (default 5.0)
    window=200,           # rolling window size in steps (default 200)
    max_history=100_000,  # hard cap on stored steps
)
```
2. Feed steps
Call monitor.step(rewards) once per environment step with a dict of component values. All values must be finite numbers.
```python
for episode in range(num_episodes):
    state = env.reset()
    done = False
    while not done:
        action = policy.act(state)
        next_state, _, done, info = env.step(action)
        r_task = info["task_reward"]
        r_safety = info["safety_reward"]
        # One extra call per step — that's all
        monitor.step({"task": r_task, "safety": r_safety})
        state = next_state
```
3. Check balance
check() returns an AnalysisResult computed over the last window steps. Call it as often or as rarely as you like — it does not modify state.
```python
# Check after every episode, or every N steps — your choice
result = monitor.check()

# Overall severity: "ok", "warning", or "critical"
print(result.severity)

# Per-component breakdown
for comp, info in result.imbalance_report.items():
    print(f"{comp}: real={info['real']:.1f}% expected={info['expected']:.1f}% "
          f"→ {info['recommendation']}")

# Suggested multipliers to rebalance
print(result.suggested_reward_weights)

# Or print a full formatted report to stdout
monitor.print_report()
```
Example output from print_report()
```
============================================================
             REWARDGUARD ANALYSIS REPORT
           *** OVERALL SEVERITY: WARNING ***
============================================================
Episodes analyzed : 847
Sources found     : safety, task

Source              Real %   Expected %       Diff   Severity
---------------  ---------  -----------  ---------  ---------
task                  52.3         70.0      -17.7    WARNING
safety                47.7         30.0      +17.7    WARNING

Suggested weight multipliers:
  safety: 0.84x  <-- ADJUST
  task:   1.05x  <-- ADJUST

Actions needed:
  • task: Increase weight by ~17.7%
  • safety: Decrease weight by ~17.7%
============================================================
```
Resetting the history
Call monitor.reset() to clear all accumulated steps without changing the configuration. Useful between training phases.
```python
monitor.reset()
print(monitor.step_count)  # → 0
```
What the free tier cannot do
The free Monitor does not log individual step-level changes over time — check() always reflects the current window, not history. To get a per-step audit trail of how reward balance shifted during training, you need the premium AutoMonitor below.
Premium tier — AutoMonitor
AutoMonitor is a drop-in superset of Monitor. Every free-tier method works unchanged. The key difference is that step() now returns an AlignmentSnapshot after the baseline warm-up completes — a timestamped record of the alignment state at that exact step.
Initialize
```python
from rewardguard_premium import AutoMonitor

monitor = AutoMonitor(
    expected={"task": 0.7, "safety": 0.3},
    # Warm-up: steps collected before detection activates
    baseline_steps=300,
    # Automatically adjust weights when a component is flagged
    auto_correct=True,
    # How aggressively to correct (0.2 = 20% of needed correction per call)
    correction_rate=0.2,
    # σ threshold for flagging (default 2.5)
    z_threshold=2.5,
    # Minimum post-baseline steps before first auto-correction
    min_confidence_steps=50,
)
```
The step loop
During the first baseline_steps steps, step() returns None — the monitor is learning your environment's baseline distribution. After that it returns an AlignmentSnapshot every call.
```python
state = env.reset()
for step_idx in range(total_steps):
    action = policy.act(state)
    next_state, _, done, info = env.step(action)

    snapshot = monitor.step({
        "task": info["task_reward"],
        "safety": info["safety_reward"],
    })

    # snapshot is None during baseline warm-up
    if snapshot is not None:
        if snapshot.flag == "critical":
            # Apply corrected weights to the environment immediately
            env.set_reward_weights(monitor.weights)
            print(f"Step {snapshot.step}: CRITICAL — weights updated: {monitor.weights}")

    if done:
        state = env.reset()
    else:
        state = next_state
```
Logging reward weight changes
Every AlignmentSnapshot has a corrections_applied field — a dict of {component: new_weight} for any weights the monitor adjusted at that step. If no correction was made it's an empty dict.
```python
# Inspect corrections as they happen
snapshot = monitor.step(rewards)
if snapshot and snapshot.corrections_applied:
    print(f"[step {snapshot.step}] weights changed → {snapshot.corrections_applied}")

# After training: iterate the full correction history
for snap in monitor.snapshots:
    if snap.corrections_applied:
        print(
            f"step={snap.step:6d} "
            f"score={snap.alignment_score:.3f} "
            f"flag={snap.flag:<8s} "
            f"corrections={snap.corrections_applied}"
        )
```
Example output
```
step=   412 score=0.431 flag=warning  corrections={'safety': 1.0460}
step=   530 score=0.318 flag=critical corrections={'safety': 1.0955, 'task': 0.9740}
step=   648 score=0.402 flag=critical corrections={'safety': 1.1243}
step=   780 score=0.511 flag=warning  corrections={'safety': 1.0887}
step=   940 score=0.693 flag=warning  corrections={'safety': 1.0412}
step=  1104 score=0.812 flag=ok       corrections={}
```
Each snapshot also carries the full alignment state at that moment:
```python
latest = monitor.snapshots[-1]

latest.step                 # global step index
latest.alignment_score      # 0.0 (misaligned) → 1.0 (aligned)
latest.component_ratios     # rolling-window % share per component
latest.z_scores             # deviation from baseline in σ per component
latest.drift_velocity       # slope of alignment score — negative = worsening
latest.flag                 # "ok" / "warning" / "critical"
latest.corrections_applied  # weights changed at this step
latest.starvation_alerts    # components near-zero for starvation_window steps
```
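For intuition about the z_scores field: a per-component z-score of this kind is conventionally the deviation of the current rolling share from its baseline mean, measured in baseline standard deviations. A stdlib sketch of that formula (an illustration, not AutoMonitor's internal code):

```python
import statistics

def z_score(current_ratio, baseline_ratios):
    """Deviation of the current component share from baseline, in σ.

    Conventional formula only — not the package's internal implementation.
    """
    mean = statistics.mean(baseline_ratios)
    std = statistics.stdev(baseline_ratios)
    return (current_ratio - mean) / std

# Baseline window where "safety" hovered around a 30% share:
baseline = [29.0, 30.0, 31.0, 30.0, 29.5, 30.5]
print(z_score(33.0, baseline))  # ~4σ above baseline → flagged at z_threshold=2.5
```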
Current weights at any time
Use monitor.weights to read the current multipliers. These start at 1.0 and drift as auto-correction runs. Always apply monitor.weights — not a stale copy — when updating your environment.
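Since monitor.weights is a plain dict of multipliers, applying it yourself at reward-assembly time can be as simple as the following sketch (apply_weights is a hypothetical helper, not part of the package):

```python
def apply_weights(raw_rewards, weights):
    """Scale each raw component by its current multiplier and sum."""
    return sum(value * weights.get(name, 1.0)
               for name, value in raw_rewards.items())

# Example: weights drifted away from 1.0 after some auto-corrections
weights = {"task": 0.974, "safety": 1.0955}
raw = {"task": 2.0, "safety": 1.0}
print(apply_weights(raw, weights))  # ≈ 3.04
```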
Exporting the full history
Three export methods give you the complete per-step record for downstream analysis.
CSV — one row per detection-phase step
Columns: step, alignment_score, flag, drift_velocity, starvation_alerts, then ratio_<comp> and z_<comp> for every component.
```python
# Returns the CSV string and optionally writes it to a file
csv_str = monitor.to_csv("training_run_42.csv")
print(csv_str[:400])
```
CSV preview
```
step,alignment_score,flag,drift_velocity,starvation_alerts,ratio_safety,z_safety,ratio_task,z_task
301,0.983241,ok,0.000000,,29.84,-0.1234,70.16,0.1234
302,0.981002,ok,0.000000,,30.12,-0.0981,69.88,0.0981
...
412,0.431008,warning,-0.002341,,47.23,2.6801,52.77,-2.6801
```
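Because the export is plain CSV, any downstream tooling works. A minimal stdlib sketch, assuming the column layout shown above (the inline string stands in for reading the exported file):

```python
import csv
import io

# Stand-in for open("training_run_42.csv") — two rows in the layout above
csv_str = (
    "step,alignment_score,flag,drift_velocity,starvation_alerts,"
    "ratio_safety,z_safety,ratio_task,z_task\n"
    "301,0.983241,ok,0.000000,,29.84,-0.1234,70.16,0.1234\n"
    "412,0.431008,warning,-0.002341,,47.23,2.6801,52.77,-2.6801\n"
)

# Pull out every step that was not flagged "ok"
flagged = [row for row in csv.DictReader(io.StringIO(csv_str))
           if row["flag"] != "ok"]
for row in flagged:
    print(row["step"], row["flag"], row["z_safety"])  # → 412 warning 2.6801
```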
JSON — full state including weights and baseline
```python
json_str = monitor.to_json("training_run_42.json")
```
Print a premium report to stdout
```python
monitor.print_report()
# Prints the free-tier balance table PLUS:
# alignment score, drift velocity, current weights, z-scores
```
Framework callbacks
Pass a list of callables to the callbacks constructor argument. Each callback receives the AlignmentSnapshot after every post-baseline step. Three built-in factories are provided.
Weights & Biases
```python
import wandb
from rewardguard_premium import AutoMonitor, make_wandb_callback

wandb.init(project="my-rl-run")

monitor = AutoMonitor(
    expected={"task": 0.7, "safety": 0.3},
    callbacks=[make_wandb_callback()],
)
```
This logs rewardguard/alignment_score, rewardguard/drift_velocity, rewardguard/ratio/<comp>, and rewardguard/z_score/<comp> to your W&B run at each step.
TensorBoard
```python
from torch.utils.tensorboard import SummaryWriter
from rewardguard_premium import AutoMonitor, make_tensorboard_callback

writer = SummaryWriter("runs/my_run")

monitor = AutoMonitor(
    expected={"task": 0.7, "safety": 0.3},
    callbacks=[make_tensorboard_callback(writer)],
)
```
Stable-Baselines3
The SB3 callback reads info["reward_components"] from each environment step. Your environment must include this key.
```python
from stable_baselines3 import PPO
from rewardguard_premium import AutoMonitor, make_sb3_callback

monitor = AutoMonitor(expected={"task": 0.7, "safety": 0.3})
cb = make_sb3_callback(monitor)

model = PPO("MlpPolicy", env)  # env must expose info["reward_components"]
model.learn(total_timesteps=500_000, callback=cb)

# After training, inspect or export the history
monitor.to_csv("sb3_run.csv")
```
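If your environment does not already expose per-component rewards, a thin wrapper can add the required key. A plain-Python sketch (with Gymnasium you would subclass gymnasium.Wrapper instead; the task_reward/safety_reward keys are assumptions about your base environment):

```python
class RewardComponentsWrapper:
    """Copies per-component rewards into info["reward_components"].

    Sketch only — adapt the component keys to your own environment.
    """

    def __init__(self, env):
        self.env = env

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        info["reward_components"] = {
            "task": info["task_reward"],
            "safety": info["safety_reward"],
        }
        return obs, reward, done, info

    def __getattr__(self, name):
        # Delegate everything else (reset, render, ...) to the base env
        return getattr(self.env, name)
```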
Custom callback
Any callable that accepts an AlignmentSnapshot works as a callback.
```python
import logging

my_logger = logging.getLogger("rewardguard")

def my_callback(snapshot):
    if snapshot.corrections_applied:
        my_logger.info(
            "step=%d corrections=%s",
            snapshot.step, snapshot.corrections_applied,
        )

monitor = AutoMonitor(
    expected={"task": 0.7, "safety": 0.3},
    callbacks=[my_callback],
)
```
Saving and resuming a run
Long runs can be checkpointed and resumed. The saved state includes the baseline statistics, all snapshots, and the current weights — so a resumed run continues seamlessly.
```python
# Save at the end of training (or any checkpoint)
monitor.save("run_42_state.json")

# Resume in a new process
monitor = AutoMonitor.load("run_42_state.json")
print(monitor.step_count)      # picks up from where it left off
print(monitor.weights)         # previously learned weights
print(len(monitor.snapshots))  # all historical snapshots intact
```
Tip
When loading a saved state you can override any constructor parameter via kwargs — for example AutoMonitor.load("state.json", auto_correct=False) to replay the saved history in read-only mode.
That covers the full reward-change logging workflow. The free Monitor gives you on-demand balance snapshots with zero overhead. The premium AutoMonitor turns every training step into a structured record — with timestamped corrections, exportable history, and direct integrations with the tooling you already use.