In this tutorial
  1. Installation
  2. Free tier — Monitor
  3. Premium tier — AutoMonitor
  4. Logging reward weight changes
  5. Exporting the full history
  6. Framework callbacks (WandB, TensorBoard, SB3)
  7. Saving and resuming a run

RewardGuard gives you two levels of monitoring. The free package detects reward imbalances from a snapshot analysis — ideal for catching problems after the fact. The premium package runs statistical detection at every step and can automatically adjust reward weights while training is still running, keeping a timestamped log of every correction it makes.

Free — rewardguard
  • Rolling-window balance analysis
  • Per-component imbalance report
  • Suggested weight multipliers
  • Call check() whenever you want a snapshot
Premium — rewardguard-premium
  • Per-step alignment score & z-scores
  • Automatic weight correction
  • Full timestamped correction log
  • CSV / JSON export, save & resume

Installation

Free (open source, MIT)
pip install rewardguard
Premium (requires license key from your dashboard)
pip install rewardguard-premium

The premium package depends on the free package internally, so installing rewardguard-premium alone is enough. Premium features are always imported from the premium namespace: from rewardguard_premium import AutoMonitor.

Free tier — Monitor

The free Monitor is a zero-dependency, in-loop monitor. You tell it your expected reward distribution, feed it steps, and call check() whenever you want an analysis.

1. Initialize

Pass expected as a dict of component → weight. The weights are relative — they don't need to sum to 100. Monitor normalizes them automatically.

import rewardguard as rg

monitor = rg.Monitor(
    expected={"task": 0.7, "safety": 0.3},
    tolerance=5.0,        # ±pp before flagging (default 5.0)
    window=200,           # rolling window size in steps (default 200)
    max_history=100_000,  # hard cap on stored steps
)
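Normalization here presumably just means scaling the weights by their sum, so {"task": 0.7, "safety": 0.3} and {"task": 7, "safety": 3} describe the same 70/30 split. A quick standalone sketch of that arithmetic (plain Python, no rewardguard import needed; the helper name is illustrative, not part of the library API):

```python
def normalize(expected):
    """Scale relative weights so they sum to 100 (percent shares)."""
    total = sum(expected.values())
    return {comp: 100.0 * w / total for comp, w in expected.items()}

# Both spellings describe the same 70/30 split:
print(normalize({"task": 7, "safety": 3}))  # {'task': 70.0, 'safety': 30.0}
```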

2. Feed steps

Call monitor.step(rewards) once per environment step with a dict of component values. All values must be finite numbers.

for episode in range(num_episodes):
    state = env.reset()
    done = False
    while not done:
        action = policy.act(state)
        next_state, _, done, info = env.step(action)
        r_task = info["task_reward"]
        r_safety = info["safety_reward"]

        # One extra call per step — that's all
        monitor.step({"task": r_task, "safety": r_safety})

        state = next_state

3. Check balance

check() returns an AnalysisResult computed over the last window steps. Call it as often or as rarely as you like — it does not modify state.

# Check after every episode, or every N steps — your choice
result = monitor.check()

# Overall severity: "ok", "warning", or "critical"
print(result.severity)

# Per-component breakdown
for comp, info in result.imbalance_report.items():
    print(f"{comp}: real={info['real']:.1f}% "
          f"expected={info['expected']:.1f}% → {info['recommendation']}")

# Suggested multipliers to rebalance
print(result.suggested_reward_weights)

# Or print a full formatted report to stdout
monitor.print_report()
Example output from print_report()
============================================================
            REWARDGUARD ANALYSIS REPORT
            *** OVERALL SEVERITY: WARNING ***
============================================================
Episodes analyzed : 847
Sources found     : safety, task

Source          Real %     Expected %   Diff       Severity
--------------- ---------- ------------ ---------- --------
task            52.3       70.0         -17.7      WARNING
safety          47.7       30.0         +17.7      WARNING

Suggested weight multipliers:
  safety: 0.84x  <-- ADJUST
  task:   1.05x  <-- ADJUST

Actions needed:
  • task: Increase weight by ~17.7%
  • safety: Decrease weight by ~17.7%
============================================================
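The suggested multipliers are applied against whatever weights your environment currently uses. A minimal sketch of that step, using the multiplier values from the example report above (the `current` dict is a hypothetical environment configuration, not a rewardguard object):

```python
# Hypothetical current weights as configured in your environment
current = {"task": 0.7, "safety": 0.3}

# Multipliers in the shape returned by result.suggested_reward_weights
# (values taken from the example report above)
suggested = {"task": 1.05, "safety": 0.84}

new_weights = {comp: current[comp] * m for comp, m in suggested.items()}
print({c: round(w, 3) for c, w in new_weights.items()})
# {'task': 0.735, 'safety': 0.252}
```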

Resetting the history

Call monitor.reset() to clear all accumulated steps without changing the configuration. Useful between training phases.

monitor.reset()
print(monitor.step_count)  # → 0
What the free tier cannot do

The free Monitor does not log individual step-level changes over time — check() always reflects the current window, not history. To get a per-step audit trail of how reward balance shifted during training, you need the premium AutoMonitor below.

Premium tier — AutoMonitor

AutoMonitor is a drop-in superset of Monitor. Every free-tier method works unchanged. The key difference is that step() now returns an AlignmentSnapshot after the baseline warm-up completes — a timestamped record of the alignment state at that exact step.

Initialize

from rewardguard_premium import AutoMonitor

monitor = AutoMonitor(
    expected={"task": 0.7, "safety": 0.3},

    # Warm-up: steps collected before detection activates
    baseline_steps=300,

    # Automatically adjust weights when a component is flagged
    auto_correct=True,

    # How aggressively to correct (0.2 = 20% of needed correction per call)
    correction_rate=0.2,

    # σ threshold for flagging (default 2.5)
    z_threshold=2.5,

    # Minimum post-baseline steps before first auto-correction
    min_confidence_steps=50,
)
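To build intuition for correction_rate: applying 20% of the needed correction per call means each correction closes 20% of the remaining gap between the current weight and the target weight, so repeated corrections approach the target smoothly instead of jumping. The arithmetic below is an illustrative sketch of that behavior, not the library's internal code:

```python
def corrected(weight, target, rate=0.2):
    """Move `rate` of the remaining distance from the current weight to the target."""
    return weight + rate * (target - weight)

w = 1.0        # current multiplier
target = 1.5   # weight the detector estimates would rebalance the component
for _ in range(3):
    w = corrected(w, target)
    print(round(w, 4))
# 1.1, 1.18, 1.244 — each call closes 20% of the remaining gap
```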

The step loop

During the first baseline_steps steps, step() returns None — the monitor is learning your environment's baseline distribution. After that it returns an AlignmentSnapshot every call.

for step_idx in range(total_steps):
    action = policy.act(state)
    next_state, _, done, info = env.step(action)

    snapshot = monitor.step({
        "task": info["task_reward"],
        "safety": info["safety_reward"],
    })

    # snapshot is None during baseline warm-up
    if snapshot is not None:
        if snapshot.flag == "critical":
            # Apply corrected weights to the environment immediately
            env.set_reward_weights(monitor.weights)
            print(f"Step {snapshot.step}: CRITICAL — weights updated: {monitor.weights}")

    if done:
        state = env.reset()
    else:
        state = next_state

Logging reward weight changes

Every AlignmentSnapshot has a corrections_applied field — a dict of {component: new_weight} for any weights the monitor adjusted at that step. If no correction was made it's an empty dict.

# Inspect corrections as they happen
snapshot = monitor.step(rewards)
if snapshot and snapshot.corrections_applied:
    print(f"[step {snapshot.step}] weights changed → {snapshot.corrections_applied}")

# After training: iterate the full correction history
for snap in monitor.snapshots:
    if snap.corrections_applied:
        print(
            f"step={snap.step:6d} "
            f"score={snap.alignment_score:.3f} "
            f"flag={snap.flag:<8s} "
            f"corrections={snap.corrections_applied}"
        )
Example output
step=   412 score=0.431 flag=warning  corrections={'safety': 1.0460}
step=   530 score=0.318 flag=critical corrections={'safety': 1.0955, 'task': 0.9740}
step=   648 score=0.402 flag=critical corrections={'safety': 1.1243}
step=   780 score=0.511 flag=warning  corrections={'safety': 1.0887}
step=   940 score=0.693 flag=warning  corrections={'safety': 1.0412}
step=  1104 score=0.812 flag=ok       corrections={}

Each snapshot also carries the full alignment state at that moment:

latest = monitor.snapshots[-1]

latest.step                 # global step index
latest.alignment_score      # 0.0 (misaligned) → 1.0 (aligned)
latest.component_ratios     # rolling-window % share per component
latest.z_scores             # deviation from baseline in σ per component
latest.drift_velocity       # slope of alignment score — negative = worsening
latest.flag                 # "ok" / "warning" / "critical"
latest.corrections_applied  # weights changed at this step
latest.starvation_alerts    # components near-zero for starvation_window steps
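For reference, a z-score of a component's share is its deviation from the warm-up baseline measured in standard deviations: z = (x − μ) / σ. The sketch below computes one by hand with stdlib statistics and hypothetical baseline numbers, to show why a share far from baseline trips the default z_threshold of 2.5 (this is the standard formula, not the library's internal code):

```python
import statistics

# Baseline task shares collected during warm-up (hypothetical numbers)
baseline_task_share = [70.2, 69.8, 70.5, 69.5, 70.0]

mu = statistics.mean(baseline_task_share)     # ≈ 70.0
sigma = statistics.stdev(baseline_task_share)

current_share = 52.77  # task's share in the warning row of the CSV preview below
z = (current_share - mu) / sigma
print(round(z, 2))     # strongly negative → task is far below its baseline share
```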
Current weights at any time

Use monitor.weights to read the current multipliers. These start at 1.0 and drift as auto-correction runs. Always apply monitor.weights — not a stale copy — when updating your environment.
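Because the weights can change on any step, one convenient pattern is a tiny helper that pushes monitor.weights to the environment only when they actually differ from what the environment last received. The sketch below uses a stub environment; set_reward_weights is the same hypothetical environment method used in the step loop above, and nothing here is rewardguard API:

```python
class WeightSyncer:
    """Push monitor weights to the env only when they have changed."""

    def __init__(self, env):
        self.env = env
        self._last = None

    def sync(self, weights):
        if weights != self._last:
            self.env.set_reward_weights(weights)
            self._last = dict(weights)  # store a copy; weights may mutate in place


# Minimal stub standing in for a real environment
class StubEnv:
    def __init__(self):
        self.calls = 0

    def set_reward_weights(self, weights):
        self.calls += 1


env = StubEnv()
syncer = WeightSyncer(env)
syncer.sync({"task": 1.0, "safety": 1.0})   # changed → pushed
syncer.sync({"task": 1.0, "safety": 1.0})   # unchanged → skipped
syncer.sync({"task": 1.0, "safety": 1.05})  # changed → pushed
print(env.calls)  # 2
```

Storing a copy (dict(weights)) rather than the live dict is what makes the comparison meaningful on the next call.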

Exporting the full history

Three export methods give you the complete per-step record for downstream analysis.

CSV — one row per detection-phase step

Columns: step, alignment_score, flag, drift_velocity, starvation_alerts, then ratio_<comp> and z_<comp> for every component.

# Returns the CSV string and optionally writes it to a file
csv_str = monitor.to_csv("training_run_42.csv")
print(csv_str[:400])
CSV preview
step,alignment_score,flag,drift_velocity,starvation_alerts,ratio_safety,z_safety,ratio_task,z_task
301,0.983241,ok,0.000000,,29.84,-0.1234,70.16,0.1234
302,0.981002,ok,0.000000,,30.12,-0.0981,69.88,0.0981
...
412,0.431008,warning,-0.002341,,47.23,2.6801,52.77,-2.6801
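The export is plain CSV, so it loads with the stdlib csv module for downstream analysis. A sketch filtering the flagged rows out of a fragment shaped like the preview above (the values are illustrative):

```python
import csv
import io

# A fragment shaped like the export above (values are illustrative)
csv_str = """step,alignment_score,flag,drift_velocity,starvation_alerts,ratio_safety,z_safety,ratio_task,z_task
301,0.983241,ok,0.000000,,29.84,-0.1234,70.16,0.1234
412,0.431008,warning,-0.002341,,47.23,2.6801,52.77,-2.6801
"""

rows = list(csv.DictReader(io.StringIO(csv_str)))
flagged = [r for r in rows if r["flag"] != "ok"]
for r in flagged:
    print(r["step"], r["flag"], float(r["z_safety"]))
# 412 warning 2.6801
```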

JSON — full state including weights and baseline

json_str = monitor.to_json("training_run_42.json")

Print a premium report to stdout

monitor.print_report()
# Prints the free-tier balance table PLUS:
#   alignment score, drift velocity, current weights, z-scores

Framework callbacks

Pass a list of callables to the callbacks constructor argument. Each callback receives the AlignmentSnapshot after every post-baseline step. Three built-in factories are provided.

Weights & Biases

import wandb
from rewardguard_premium import AutoMonitor, make_wandb_callback

wandb.init(project="my-rl-run")

monitor = AutoMonitor(
    expected={"task": 0.7, "safety": 0.3},
    callbacks=[make_wandb_callback()],
)

This logs rewardguard/alignment_score, rewardguard/drift_velocity, rewardguard/ratio/<comp>, and rewardguard/z_score/<comp> to your W&B run at each step.

TensorBoard

from torch.utils.tensorboard import SummaryWriter
from rewardguard_premium import AutoMonitor, make_tensorboard_callback

writer = SummaryWriter("runs/my_run")

monitor = AutoMonitor(
    expected={"task": 0.7, "safety": 0.3},
    callbacks=[make_tensorboard_callback(writer)],
)

Stable-Baselines3

The SB3 callback reads info["reward_components"] from each environment step. Your environment must include this key.

from stable_baselines3 import PPO
from rewardguard_premium import AutoMonitor, make_sb3_callback

monitor = AutoMonitor(expected={"task": 0.7, "safety": 0.3})
cb = make_sb3_callback(monitor)

model = PPO("MlpPolicy", env)
model.learn(total_timesteps=500_000, callback=cb)

# After training, inspect or export the history
monitor.to_csv("sb3_run.csv")
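If your environment does not already populate info["reward_components"], a thin wrapper can derive it from existing per-component info keys. The sketch below is a duck-typed wrapper around the 4-tuple step() signature used elsewhere in this tutorial (newer Gymnasium environments return a 5-tuple, so adjust accordingly); StubEnv and the key names are hypothetical:

```python
class RewardComponentsWrapper:
    """Add info["reward_components"] built from existing per-component info keys."""

    def __init__(self, env, component_keys):
        self.env = env
        self.component_keys = component_keys  # e.g. {"task": "task_reward"}

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        info["reward_components"] = {
            name: info[key] for name, key in self.component_keys.items()
        }
        return obs, reward, done, info

    def __getattr__(self, name):
        return getattr(self.env, name)  # delegate everything else to the inner env


# Stub env showing the expected shape of the output
class StubEnv:
    def step(self, action):
        return None, 1.0, False, {"task_reward": 0.8, "safety_reward": 0.2}


env = RewardComponentsWrapper(
    StubEnv(), {"task": "task_reward", "safety": "safety_reward"}
)
_, _, _, info = env.step(0)
print(info["reward_components"])  # {'task': 0.8, 'safety': 0.2}
```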

Custom callback

Any callable that accepts an AlignmentSnapshot works as a callback.

def my_callback(snapshot):
    if snapshot.corrections_applied:
        my_logger.info(
            "step=%d corrections=%s",
            snapshot.step,
            snapshot.corrections_applied,
        )

monitor = AutoMonitor(
    expected={"task": 0.7, "safety": 0.3},
    callbacks=[my_callback],
)

Saving and resuming a run

Long runs can be checkpointed and resumed. The saved state includes the baseline statistics, all snapshots, and the current weights — so a resumed run continues seamlessly.

# Save at the end of training (or any checkpoint)
monitor.save("run_42_state.json")

# Resume in a new process
monitor = AutoMonitor.load("run_42_state.json")
print(monitor.step_count)      # picks up from where it left off
print(monitor.weights)         # previously learned weights
print(len(monitor.snapshots))  # all historical snapshots intact
Tip

When loading a saved state you can override any constructor parameter via kwargs — for example AutoMonitor.load("state.json", auto_correct=False) to replay the saved history in read-only mode.


That covers the full reward-change logging workflow. The free Monitor gives you on-demand balance snapshots for the cost of one extra call per step. The premium AutoMonitor turns every training step into a structured record — with timestamped corrections, exportable history, and direct integrations with the tooling you already use.