# Power Steering Technical Reference
Ahoy, developer! This be the technical reference fer power steering's internals, particularly the infinite loop fix implemented in v0.9.1.
## State Management Architecture

### State Structure
Power steering maintains persistent state in JSON format:
```jsonc
{
  "consecutive_blocks": 0,                  // Guidance counter
  "session_id": "20251217_193000",          // Session identifier
  "last_check_timestamp": "2025-12-17T19:30:00Z",
  "check_results": {                        // Last check results
    "files_modified": ["file1.py", "file2.py"],
    "workflow_compliant": true,
    "quality_score": 85
  }
}
```
State file location: `.claude/runtime/power-steering/{session_id}/state.json`
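The `_get_state_file()` helper referenced throughout the code below is not reproduced in this reference; a minimal sketch of what it resolves to, assuming the manager stores its `session_id` as an attribute:

```python
from pathlib import Path

STATE_ROOT = Path(".claude/runtime/power-steering")

def _get_state_file(self) -> Path:
    """Resolve the per-session state file path, creating the directory if needed."""
    session_dir = STATE_ROOT / self.session_id
    session_dir.mkdir(parents=True, exist_ok=True)
    return session_dir / "state.json"
```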
### State Lifecycle
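In terms of the functions documented below, a single hook invocation walks the state through roughly this cycle (the orchestration shown here is a sketch, not the shipped code):

```python
def process_hook_event(self, check_results: dict) -> None:
    """One state lifecycle pass: load -> validate -> update -> atomic write."""
    state = self._load_state_with_validation()   # recovers to defaults on corruption
    state["consecutive_blocks"] += 1             # example mutation
    state["check_results"] = check_results
    self._write_with_retry(state)                # fsync'd, verified, exponential backoff
```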
## Atomic Write Implementation

### Core Write Function
```python
def _atomic_write_state(self, state_data: Dict[str, Any]) -> bool:
    """
    Atomically write state with verification.

    Returns True if write AND verification succeed.
    Logs to diagnostics on failure.
    """
    state_file = self._get_state_file()

    # Phase 1: Write with fsync
    try:
        with open(state_file, 'w') as f:
            json.dump(state_data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # CRITICAL: Force disk write
    except OSError as e:
        self._log_diagnostic({
            "operation": "state_save",
            "save_success": False,
            "error": str(e)
        })
        return False

    # Phase 2: Verification read
    try:
        with open(state_file, 'r') as f:
            verified_data = json.load(f)
        if verified_data != state_data:
            self._log_diagnostic({
                "operation": "verification_read",
                "verification_success": False,
                "reason": "data_mismatch"
            })
            return False
    except (OSError, json.JSONDecodeError) as e:
        self._log_diagnostic({
            "operation": "verification_read",
            "verification_success": False,
            "error": str(e)
        })
        return False

    # Success
    self._log_diagnostic({
        "operation": "state_save",
        "save_success": True,
        "verification_success": True,
        "retry_count": 0
    })
    return True
```
### Retry Logic
```python
def _write_with_retry(self, state_data: Dict[str, Any],
                      max_retries: int = 3) -> bool:
    """Write with exponential backoff for cloud sync resilience."""
    # Derive delays from max_retries: 0.1, 0.2, 0.4 seconds by default
    retry_delays = [0.1 * (2 ** i) for i in range(max_retries)]

    for attempt, delay in enumerate(retry_delays):
        if self._atomic_write_state(state_data):
            if attempt > 0:
                self._log_diagnostic({
                    "operation": "state_save_retry",
                    "success": True,
                    "retry_count": attempt
                })
            return True
        if attempt < len(retry_delays) - 1:
            time.sleep(delay)

    # All retries failed - try non-atomic fallback
    return self._fallback_write(state_data)
```
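`_fallback_write()` is referenced above but not shown. A minimal sketch of a best-effort, non-atomic last resort (no fsync, no verification), under the assumption that it only logs its own outcome:

```python
def _fallback_write(self, state_data: Dict[str, Any]) -> bool:
    """Best-effort, non-atomic write used only after all atomic retries fail."""
    try:
        with open(self._get_state_file(), 'w') as f:
            json.dump(state_data, f, indent=2)
        self._log_diagnostic({"operation": "fallback_write", "save_success": True})
        return True
    except OSError as e:
        self._log_diagnostic({
            "operation": "fallback_write",
            "save_success": False,
            "error": str(e)
        })
        return False
```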
### Why fsync() is Critical
Without fsync():

```python
with open(file, 'w') as f:
    f.write(data)

# Data in OS buffer - not on disk yet!
# Power loss = data loss
# Cloud sync may read old data
```

With fsync():

```python
with open(file, 'w') as f:
    f.write(data)
    f.flush()
    os.fsync(f.fileno())

# Data GUARANTEED on disk
# Cloud sync sees latest data
```
Trade-off:
- Adds 1-2ms latency per write
- Worth it to prevent infinite loops
## Defensive Validation

### Validation Rules
```python
def _validate_state(self, state: Dict) -> Tuple[bool, str]:
    """
    Validate loaded state data.

    Returns (is_valid, reason) tuple.
    """
    # Type check
    if not isinstance(state, dict):
        return (False, "state_not_dict")

    # Counter validation
    counter = state.get("consecutive_blocks")
    if counter is None:
        return (False, "missing_counter")
    if not isinstance(counter, int):
        return (False, "counter_not_int")
    if counter < 0:
        return (False, "negative_counter")
    if counter > 1000:  # Sanity check
        return (False, "counter_too_large")

    # Session ID validation
    session_id = state.get("session_id")
    if not isinstance(session_id, str) or not session_id:
        return (False, "invalid_session_id")

    return (True, "")
```
### Recovery Strategy
```python
def _load_state_with_validation(self) -> Dict[str, Any]:
    """Load state with validation and recovery."""
    try:
        with open(self._get_state_file(), 'r') as f:
            state = json.load(f)

        is_valid, reason = self._validate_state(state)
        if not is_valid:
            self._log_diagnostic({
                "operation": "validation",
                "validation_failed": True,
                "reason": reason,
                "corrupted_state": state
            })
            # Recovery: Reset to defaults
            state = self._get_default_state()
            self._log_diagnostic({
                "operation": "state_reset",
                "counter_reset_to": 0
            })
    except (FileNotFoundError, json.JSONDecodeError):
        # Expected on first run or corrupted file
        state = self._get_default_state()

    return state
```
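`_get_default_state()` is likewise assumed rather than shown; a sketch consistent with the state structure at the top of this page:

```python
from datetime import datetime, timezone

def _get_default_state(self) -> Dict[str, Any]:
    """Return a fresh, valid state object for the current session."""
    return {
        "consecutive_blocks": 0,
        "session_id": self.session_id,
        "last_check_timestamp": datetime.now(timezone.utc).isoformat(),
        "check_results": {}
    }
```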
## Diagnostic Logging

### Log Format

Structured JSON logs written to `.claude/runtime/power-steering/{session_id}/diagnostic.jsonl`:
{"timestamp": "2025-12-17T19:30:00.123Z", "operation": "state_save", ...}
{"timestamp": "2025-12-17T19:30:00.456Z", "operation": "state_load", ...}
{"timestamp": "2025-12-17T19:30:01.789Z", "operation": "validation", ...}
### Log Entry Types
state_save:

```json
{
  "timestamp": "ISO8601",
  "operation": "state_save",
  "counter_before": 0,
  "counter_after": 1,
  "session_id": "session_id_string",
  "file_path": "path/to/state.json",
  "save_success": true,
  "verification_success": true,
  "retry_count": 0
}
```

state_load:

```json
{
  "timestamp": "ISO8601",
  "operation": "state_load",
  "session_id": "session_id_string",
  "file_path": "path/to/state.json",
  "load_success": true,
  "validation_passed": true,
  "counter_value": 1
}
```

validation:

```json
{
  "timestamp": "ISO8601",
  "operation": "validation",
  "validation_failed": true,
  "reason": "negative_counter",
  "corrupted_state": { "consecutive_blocks": -1 }
}
```

state_reset:

```json
{
  "timestamp": "ISO8601",
  "operation": "state_reset",
  "reason": "corruption_detected",
  "counter_reset_to": 0
}
```
### Log Analysis
Extract failures:

```bash
cat .claude/runtime/power-steering/*/diagnostic.jsonl | \
  grep -E '"(save_|verification_)?success": false' | \
  jq -r '[.timestamp, .operation, .reason // .error] | @tsv'
```
Average retry count:

```bash
cat .claude/runtime/power-steering/*/diagnostic.jsonl | \
  grep '"retry_count"' | \
  jq '.retry_count' | \
  awk '{sum+=$1; count++} END {print sum/count}'
```
Find corruption events:
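One way to pull them, selecting on the entry types documented above (a sketch, not the canonical command):

```bash
cat .claude/runtime/power-steering/*/diagnostic.jsonl | \
  jq -r 'select(.operation == "validation" or .operation == "state_reset")
         | [.timestamp, .operation, .reason // ""] | @tsv'
```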
## Message Customization

### Check Results Integration
```python
def _customize_message(self,
                       check_results: Dict[str, Any]) -> str:
    """
    Customize guidance message based on check results.

    Args:
        check_results: Results from power_steering_checker
            - files_modified: List of changed files
            - workflow_compliant: Boolean
            - quality_score: 0-100

    Returns:
        Customized guidance string
    """
    # Base message
    if not check_results.get("files_modified"):
        return "Ahoy! No files modified - smooth sailin'!"

    # Workflow compliance check
    if not check_results.get("workflow_compliant"):
        return (
            f"Arr! Found {len(check_results['files_modified'])} "
            "modified files, but workflow not followed. "
            "Check DEFAULT_WORKFLOW.md, matey!"
        )

    # Quality score check
    quality = check_results.get("quality_score", 0)
    if quality < 70:
        return (
            f"Files modified with quality score {quality}. "
            "Consider reviewin' yer changes fer improvement."
        )

    # All good
    return (
        f"Nice work! {len(check_results['files_modified'])} "
        f"files modified with {quality}% quality. "
        "Keep it up!"
    )
```
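For example, a compliant result with a passing quality score (hypothetical values) produces the success message:

```python
message = manager._customize_message({
    "files_modified": ["file1.py", "file2.py"],
    "workflow_compliant": True,
    "quality_score": 85
})
print(message)  # Nice work! 2 files modified with 85% quality. Keep it up!
```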
## Performance Characteristics

### Operation Latencies
| Operation | Typical | Worst Case | Notes |
|---|---|---|---|
| State load | 0.5ms | 5ms | Depends on disk speed |
| State save (no fsync) | 0.5ms | 5ms | OS buffered |
| State save (with fsync) | 1-2ms | 50ms | Disk flush |
| Verification read | 0.3ms | 3ms | Cached by OS |
| Validation | 0.1ms | 0.5ms | Pure CPU |
| Retry delay | 100ms | 400ms | Cloud sync tolerance |
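These figures depend heavily on disk and filesystem; a rough way to measure the fsync'd save path on a given machine (a sketch only, with `manager` as in the examples above):

```python
import time

samples = []
for _ in range(100):
    start = time.perf_counter()
    manager._atomic_write_state({"consecutive_blocks": 0, "session_id": "bench"})
    samples.append((time.perf_counter() - start) * 1000)  # milliseconds

samples.sort()
print(f"median={samples[50]:.2f}ms  p99={samples[98]:.2f}ms")
```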
### Memory Usage
- State object: ~500 bytes
- Diagnostic log: ~200 bytes per entry
- Session overhead: ~1KB
### Scalability
- State files isolated per session (no contention)
- Diagnostic logs auto-rotate (TODO: implement cleanup)
- No database dependencies (filesystem only)
## Error Codes
Power steering uses these return codes:
| Code | Meaning | Recovery |
|---|---|---|
| 0 | Success | Continue |
| 1 | State save failed | Retry, then fallback |
| 2 | Validation failed | Reset to defaults |
| 3 | Verification failed | Retry write |
| 4 | All retries exhausted | Log warning, continue |
## Testing Strategy

### Unit Tests
Test individual components in isolation:
```python
from unittest.mock import patch

def test_atomic_write_with_fsync():
    """Verify fsync is called during write."""
    with patch('os.fsync') as mock_fsync:
        state_manager._atomic_write_state({})
        assert mock_fsync.called

def test_validation_detects_negative_counter():
    """Validation catches corrupted counter."""
    state = {"consecutive_blocks": -1}
    is_valid, reason = validator._validate_state(state)
    assert not is_valid
    assert reason == "negative_counter"
```
### Integration Tests
Test component interactions:
```python
def test_save_load_roundtrip():
    """Data persists correctly through save/load."""
    original_state = {"consecutive_blocks": 5}
    manager.save_state(original_state)
    loaded_state = manager.load_state()
    assert loaded_state == original_state

def test_corruption_recovery():
    """System recovers from corrupted state file."""
    # Write corrupted data
    write_corrupted_state_file()
    # Load should recover
    state = manager.load_state()
    assert state["consecutive_blocks"] == 0  # Default
```
### End-to-End Tests
Simulate real-world scenarios:
```python
def test_cloud_sync_conflict():
    """Handle cloud sync delays gracefully."""
    with simulated_cloud_sync_delay(200):  # ms
        manager.save_state({"consecutive_blocks": 1})
        state = manager.load_state()
        assert state["consecutive_blocks"] == 1

def test_concurrent_session_isolation():
    """Multiple sessions don't interfere."""
    session1 = PowerSteeringState("session1")
    session2 = PowerSteeringState("session2")

    session1.save_state({"consecutive_blocks": 5})
    session2.save_state({"consecutive_blocks": 10})

    assert session1.load_state()["consecutive_blocks"] == 5
    assert session2.load_state()["consecutive_blocks"] == 10
```
## Configuration

### Environment Variables
Control power steering behavior:
```bash
# Enable debug logging
export AMPLIHACK_PS_DEBUG=1

# Increase retry count
export AMPLIHACK_PS_MAX_RETRIES=5

# Disable fsync (NOT RECOMMENDED)
export AMPLIHACK_PS_NO_FSYNC=1

# Custom state directory
export AMPLIHACK_PS_STATE_DIR=/custom/path
```
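How these variables map onto the module's defaults is an implementation detail not shown in this reference; the usual pattern would be something like:

```python
import os

# Hypothetical mapping of environment variables to module defaults
DEBUG = os.environ.get("AMPLIHACK_PS_DEBUG") == "1"
MAX_RETRIES = int(os.environ.get("AMPLIHACK_PS_MAX_RETRIES", "3"))
USE_FSYNC = os.environ.get("AMPLIHACK_PS_NO_FSYNC") != "1"
STATE_DIR = os.environ.get("AMPLIHACK_PS_STATE_DIR", ".claude/runtime/power-steering")
```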
### Runtime Configuration
Override defaults in code:
```python
from claude.tools.amplihack.hooks import power_steering_state

# Custom retry delays (seconds)
power_steering_state.RETRY_DELAYS = [0.05, 0.1, 0.2]

# Disable verification read (NOT RECOMMENDED)
power_steering_state.VERIFY_WRITES = False

# Custom diagnostic log location
power_steering_state.DIAGNOSTIC_LOG_DIR = "/custom/logs"
```
## Debugging

### Enable Debug Logging
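Export the debug flag documented under Configuration before starting a session; where the extra detail lands is not specified here, but the per-session diagnostic log is the likely place:

```bash
export AMPLIHACK_PS_DEBUG=1
```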
### Trace State Operations
```bash
# Watch state file changes in real-time
watch -n 1 cat .claude/runtime/power-steering/*/state.json

# Tail diagnostic logs
tail -f .claude/runtime/power-steering/*/diagnostic.jsonl | jq
```
### Reproduce Issues
```python
from unittest.mock import patch

# Simulate fsync failure
with patch('os.fsync', side_effect=OSError("Disk full")):
    manager.save_state(state)

# Simulate corrupted file
write_invalid_json_to_state_file()
manager.load_state()  # Should recover gracefully
```