Memory System Integration: Concrete Code Examples¶
Overview¶
This document provides concrete Python implementation examples for integrating a memory system into Claude Code agents. All examples follow the "minimal integration" principle: maximum value with minimum changes.
Part 1: Core Memory System Components¶
1.1 Memory Store (Simple JSON-Based)¶
File: .claude/memory/system/memory_store.py
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional
class MemoryStore:
"""Simple JSON-based memory storage system."""
def __init__(self, base_path: str = ".claude/runtime/memory"):
"""Initialize memory store with base directory."""
self.base_path = Path(base_path)
self.base_path.mkdir(parents=True, exist_ok=True)
# Create subdirectories
(self.base_path / "agents").mkdir(exist_ok=True)
(self.base_path / "workflows").mkdir(exist_ok=True)
(self.base_path / "errors").mkdir(exist_ok=True)
(self.base_path / "users").mkdir(exist_ok=True)
(self.base_path / "domains").mkdir(exist_ok=True)
def _get_file_path(self, category: str, name: str) -> Path:
"""Get full file path for a memory item."""
if category not in ["agents", "workflows", "errors", "users", "domains"]:
raise ValueError(f"Invalid category: {category}")
return self.base_path / category / f"{name}.json"
def store(self, category: str, name: str, data: Dict[str, Any]) -> None:
"""Store a memory item."""
file_path = self._get_file_path(category, name)
# Load existing data if file exists
existing_data = []
if file_path.exists():
try:
existing_data = json.loads(file_path.read_text())
if not isinstance(existing_data, list):
existing_data = [existing_data]
except json.JSONDecodeError:
existing_data = []
# Add timestamp if not present
if "timestamp" not in data:
data["timestamp"] = datetime.now().isoformat()
# Append new data (store history)
existing_data.append(data)
# Write back
file_path.write_text(json.dumps(existing_data, indent=2))
def retrieve(self, category: str, name: str) -> Optional[List[Dict[str, Any]]]:
"""Retrieve all memory items for a category/name."""
file_path = self._get_file_path(category, name)
if not file_path.exists():
return None
try:
data = json.loads(file_path.read_text())
return data if isinstance(data, list) else [data]
except json.JSONDecodeError:
return None
def update(self, category: str, name: str, data: Dict[str, Any]) -> None:
"""Update/replace all memory items for a category/name."""
file_path = self._get_file_path(category, name)
if "timestamp" not in data:
data["timestamp"] = datetime.now().isoformat()
file_path.write_text(json.dumps([data], indent=2))
def get_all(self, category: str) -> Dict[str, Any]:
"""Get all memory items in a category."""
category_path = self.base_path / category
if not category_path.exists():
return {}
result = {}
for file_path in category_path.glob("*.json"):
name = file_path.stem
try:
result[name] = json.loads(file_path.read_text())
except json.JSONDecodeError:
pass
return result
def clear(self, category: str, name: str) -> None:
"""Clear memory items."""
file_path = self._get_file_path(category, name)
if file_path.exists():
file_path.unlink()
1.2 Memory Retrieval Interface¶
File: .claude/memory/system/memory_retrieval.py
from typing import Dict, List, Any, Optional
from .memory_store import MemoryStore
class MemoryRetrieval:
"""Query interface for memory system."""
def __init__(self, store: MemoryStore):
"""Initialize with memory store."""
self.store = store
def query_pre_execution(
self,
agent_name: str,
task_category: Optional[str] = None,
user_domain: Optional[str] = None
) -> str:
"""
Query memory for context to inject before agent execution.
Returns formatted markdown string for prompt injection.
"""
context_parts = []
# 1. Similar past agent decisions
agent_history = self.store.retrieve("agents", agent_name)
if agent_history:
similar_tasks = [
item for item in agent_history
if task_category is None or item.get("task_category") == task_category
]
if similar_tasks:
# Get top 3 most recent successful tasks
successful = [t for t in similar_tasks if t.get("success", False)]
top_tasks = sorted(
successful,
key=lambda x: x.get("timestamp", ""),
reverse=True
)[:3]
if top_tasks:
context_parts.append("## Memory: Similar Past Tasks")
context_parts.append(f"Found {len(top_tasks)} successful similar tasks:")
for task in top_tasks:
context_parts.append(
f"- **{task.get('decision', 'Unknown')}**: "
f"{task.get('reasoning', '')} "
f"(Quality: {task.get('outcome_quality', 0)}/10)"
)
context_parts.append("")
# 2. Domain-specific context
if user_domain:
domain_context = self.store.retrieve("domains", user_domain)
if domain_context:
context_parts.append("## Memory: Domain Context")
for item in domain_context:
if "patterns" in item:
context_parts.append("**Recommended Patterns:**")
for pattern in item["patterns"]:
context_parts.append(f"- {pattern}")
if "common_issues" in item:
context_parts.append("**Watch Out For:**")
for issue in item["common_issues"]:
context_parts.append(f"- {issue}")
context_parts.append("")
# 3. Common errors for this agent
error_history = self.store.retrieve("errors", agent_name)
if error_history and len(error_history) > 3:
context_parts.append("## Memory: Common Error Patterns")
errors = sorted(
error_history,
key=lambda x: x.get("frequency", 0),
reverse=True
)[:3]
for error in errors:
context_parts.append(
f"- **{error.get('error_type', 'Unknown')}**: "
f"{error.get('prevention_tip', '')}"
)
context_parts.append("")
return "\n".join(context_parts) if context_parts else ""
def get_agent_stats(self, agent_name: str) -> Dict[str, Any]:
"""Get statistics for an agent."""
history = self.store.retrieve("agents", agent_name)
if not history:
return {}
successful = [h for h in history if h.get("success", False)]
return {
"total_executions": len(history),
"successful": len(successful),
"success_rate": len(successful) / len(history) if history else 0,
"avg_execution_time": (
sum(h.get("execution_time", 0) for h in history) / len(history)
if history else 0
),
"avg_quality": (
sum(h.get("outcome_quality", 0) for h in history) / len(history)
if history else 0
)
}
def get_workflow_stats(self, workflow_name: str, step_number: int) -> Dict[str, Any]:
"""Get statistics for a workflow step."""
history = self.store.retrieve("workflows", f"{workflow_name}_step_{step_number}")
if not history:
return {}
successful = [h for h in history if h.get("success", False)]
return {
"total_executions": len(history),
"successful": len(successful),
"success_rate": len(successful) / len(history) if history else 0,
"avg_duration": (
sum(h.get("duration", 0) for h in history) / len(history)
if history else 0
),
"common_blockers": self._extract_blockers(history)
}
def query_error_pattern(self, error_type: str, context: Optional[str] = None) -> Dict[str, Any]:
"""Query solutions for an error type."""
history = self.store.retrieve("errors", error_type)
if not history:
return {}
# Get solutions that worked
solutions = [
h for h in history
if h.get("solution_worked", False)
]
return {
"previous_occurrences": len(history),
"solutions_that_worked": len(solutions),
"success_rate": len(solutions) / len(history) if history else 0,
"top_solutions": solutions[-3:], # Most recent successful solutions
"prevention_tips": list(set(
h.get("prevention_tip", "")
for h in history
if h.get("prevention_tip")
))
}
@staticmethod
def _extract_blockers(history: List[Dict[str, Any]]) -> List[str]:
"""Extract common blockers from history."""
blockers = {}
for item in history:
if not item.get("success", False):
blocker = item.get("blocker", "Unknown")
blockers[blocker] = blockers.get(blocker, 0) + 1
# Return top 3 blockers
return [
blocker for blocker, count in
sorted(blockers.items(), key=lambda x: x[1], reverse=True)[:3]
]
Part 2: Integration Hooks¶
2.1 Pre-Execution Hook (Agent Invocation Enhancement)¶
Location: Agent orchestration layer (where agents are invoked)
from .claude.memory.system.memory_store import MemoryStore
from .claude.memory.system.memory_retrieval import MemoryRetrieval
class AgentOrchestrator:
"""Enhanced orchestrator with memory support."""
def __init__(self):
self.memory_store = MemoryStore()
self.memory_retrieval = MemoryRetrieval(self.memory_store)
def invoke_agent(
self,
agent_name: str,
task_prompt: str,
task_category: str = "general",
user_domain: str = "general"
) -> str:
"""
Invoke agent with memory enhancement.
MINIMAL CHANGE: Only 3 new lines for memory injection.
"""
# Load agent definition (existing code)
agent_def = self.load_agent_definition(agent_name)
# NEW: Query memory for context (3 lines)
memory_context = self.memory_retrieval.query_pre_execution(
agent_name=agent_name,
task_category=task_category,
user_domain=user_domain
)
# NEW: Augment prompt with memory context
augmented_prompt = f"""{memory_context}
## Task
{task_prompt}"""
# Execute agent (existing code)
response = self.send_to_claude(agent_def, augmented_prompt)
# NEW: Record decision (existing code path enhanced)
self.record_decision(agent_name, response, task_category)
return response
def record_decision(
self,
agent_name: str,
response: str,
task_category: str,
quality_score: float = 0.0,
execution_time: float = 0.0
) -> None:
"""
Record decision to memory.
MINIMAL CHANGE: Extract metadata and store.
"""
# NEW: Extract decision metadata
decision_data = {
"agent": agent_name,
"task_category": task_category,
"decision": response[:500], # First 500 chars as summary
"outcome_quality": quality_score,
"execution_time": execution_time,
"success": quality_score > 0.5, # Simple success metric
}
# NEW: Store to memory
self.memory_store.store("agents", agent_name, decision_data)
def load_agent_definition(self, agent_name: str) -> str:
"""Existing code - no changes."""
# Implementation exists
pass
def send_to_claude(self, agent_def: str, prompt: str) -> str:
"""Existing code - no changes."""
# Implementation exists
pass
2.2 Post-Execution Hook (Decision Recording)¶
Location: Decision logging (after DECISIONS.md is written)
from pathlib import Path
from .claude.memory.system.memory_store import MemoryStore
class DecisionRecorder:
"""Records decisions to memory after DECISIONS.md creation."""
def __init__(self):
self.memory_store = MemoryStore()
def record_workflow_step(
self,
workflow_name: str,
step_number: int,
agents_used: List[str],
success: bool,
duration: float,
blockers: Optional[List[str]] = None
) -> None:
"""
Record workflow step execution to memory.
Called after workflow step completes and DECISIONS.md updated.
"""
step_key = f"{workflow_name}_step_{step_number}"
step_data = {
"workflow": workflow_name,
"step": step_number,
"agents": agents_used,
"success": success,
"duration": duration,
"blockers": blockers or []
}
self.memory_store.store("workflows", step_key, step_data)
def record_error_fix(
self,
error_type: str,
error_message: str,
solution: str,
solution_worked: bool,
prevention_tip: str = ""
) -> None:
"""
Record error fix to memory.
Called when an error is encountered and fixed.
"""
error_data = {
"error_type": error_type,
"error_message": error_message,
"solution": solution,
"solution_worked": solution_worked,
"prevention_tip": prevention_tip
}
self.memory_store.store("errors", error_type, error_data)
2.3 Workflow Orchestration Hook (UltraThink)¶
Location: Workflow execution loop
class UltraThinkOrchestrator:
"""Enhanced UltraThink with memory-aware workflow execution."""
def __init__(self):
self.memory_retrieval = MemoryRetrieval(MemoryStore())
def execute_workflow_step(
self,
workflow_name: str,
step_number: int,
agents_to_use: List[str]
) -> Dict[str, Any]:
"""
Execute workflow step with memory-enhanced decision making.
NEW: Query memory to potentially adapt agent usage.
"""
# NEW: Query workflow history
step_stats = self.memory_retrieval.get_workflow_stats(
workflow_name=workflow_name,
step_number=step_number
)
# NEW: Adapt execution based on history
if step_stats.get("success_rate", 1.0) < 0.7:
# Low success rate - add extra validation
print(f"⚠️ Step {step_number} has {step_stats['success_rate']:.0%} success rate")
print(f"⚠️ Common blockers: {', '.join(step_stats.get('common_blockers', []))}")
# Optionally add extra agent
if "security" not in agents_to_use:
print("ℹ️ Adding security review for extra validation")
agents_to_use = agents_to_use + ["security"]
# Execute with agents (existing code)
results = {}
for agent in agents_to_use:
results[agent] = self.execute_agent(agent, step_number)
return results
def execute_agent(self, agent_name: str, step_number: int) -> Dict[str, Any]:
"""Existing code - no changes."""
# Implementation exists
pass
2.4 Error Pattern Hook¶
Location: Error handler / fix-agent invocation
class ErrorHandler:
"""Enhanced error handling with memory patterns."""
def __init__(self):
self.memory_retrieval = MemoryRetrieval(MemoryStore())
self.memory_store = MemoryStore()
def handle_error(self, error: Exception) -> Optional[str]:
"""
Handle error with memory-enhanced solutions.
NEW: Query error patterns before invoking fix-agent.
"""
error_type = type(error).__name__
error_message = str(error)
# NEW: Query memory for similar errors
error_record = self.memory_retrieval.query_error_pattern(
error_type=error_type,
context=error_message
)
if error_record and error_record.get("success_rate", 0) > 0.7:
# We've fixed this before - use memory solutions
print(f"✓ Found {error_record['previous_occurrences']} previous occurrences")
print(f"✓ Success rate: {error_record['success_rate']:.0%}")
top_solution = error_record.get("top_solutions", [{}])[0]
print(f"✓ Recommended solution: {top_solution.get('solution', 'Unknown')}")
if error_record.get("prevention_tips"):
print(f"✓ Prevention tips:")
for tip in error_record["prevention_tips"][:2]:
print(f" - {tip}")
# Provide memory context to fix-agent
fix_context = self._format_error_memory(error_record)
return fix_context
# No memory - proceed with standard fix
return None
def _format_error_memory(self, error_record: Dict[str, Any]) -> str:
"""Format error record for fix-agent prompt injection."""
lines = [
"## Error Pattern Memory",
f"- This error has occurred {error_record['previous_occurrences']} times",
f"- Success rate: {error_record['success_rate']:.0%}",
""
]
solutions = error_record.get("top_solutions", [])
if solutions:
lines.append("### Previous Solutions:")
for i, sol in enumerate(solutions, 1):
lines.append(f"{i}. {sol.get('solution', '')}")
lines.append("")
tips = error_record.get("prevention_tips", [])
if tips:
lines.append("### Prevention Tips:")
for tip in tips:
lines.append(f"- {tip}")
return "\n".join(lines)
Part 3: Memory System Usage Examples¶
3.1 Example: Architect Agent with Memory¶
def example_architect_with_memory():
"""Example of architect agent receiving memory context."""
orchestrator = AgentOrchestrator()
# Memory system automatically provides context
architect_response = orchestrator.invoke_agent(
agent_name="architect",
task_prompt="Design authentication system for microservices",
task_category="system_design",
user_domain="web_services"
)
# Memory context is injected before agent sees the prompt:
# - Similar past auth system designs (3 examples)
# - Domain-specific patterns for web services
# - Common authentication errors from past (race conditions, etc.)
# All of this is added to the prompt automatically
return architect_response
3.2 Example: Error Fixing with Memory¶
def example_error_with_memory():
"""Example of error handling with memory."""
handler = ErrorHandler()
try:
# Some operation that fails
import_module("non_existent_module")
except ModuleNotFoundError as e:
# Query memory for solutions
fix_context = handler.handle_error(e)
# Output might be:
# ✓ Found 7 previous occurrences
# ✓ Success rate: 100%
# ✓ Recommended solution: Add to requirements.txt
# ✓ Prevention tips:
# - Always check imports before running
# - Run in virtual environment
if fix_context:
print(fix_context)
3.3 Example: Workflow with Memory¶
def example_workflow_with_memory():
"""Example of workflow execution with memory."""
orchestrator = UltraThinkOrchestrator()
# Execute workflow step with memory enhancement
step_results = orchestrator.execute_workflow_step(
workflow_name="DEFAULT_WORKFLOW",
step_number=4, # Architecture step
agents_to_use=["architect"]
)
# Memory system checks:
# - Step 4 success rate: 85% (good)
# - No extra agents needed
# vs.
# - Step 4 success rate: 40% (poor)
# - Common blockers: missing API specs
# → Automatically add api-designer agent
Part 4: Data Structures¶
Agent Decision Record¶
{
"agent": "architect",
"task_category": "system_design",
"decision": "Use token-based authentication with JWT...",
"reasoning": "Better scalability for distributed systems",
"outcome_quality": 9.5,
"execution_time": 180.5,
"success": true,
"timestamp": "2025-11-02T10:30:00Z"
}
Workflow Step Record¶
{
"workflow": "DEFAULT_WORKFLOW",
"step": 4,
"agents": ["architect", "api-designer"],
"success": true,
"duration": 520,
"blockers": [],
"timestamp": "2025-11-02T10:30:00Z"
}
Error Pattern Record¶
{
"error_type": "ModuleNotFoundError",
"error_message": "No module named 'xyz'",
"solution": "Add 'xyz' to requirements.txt",
"solution_worked": true,
"prevention_tip": "Always verify imports in virtual environment",
"timestamp": "2025-11-02T10:30:00Z"
}
Part 5: Testing the Integration¶
Unit Test Example¶
def test_memory_pre_execution():
"""Test that memory context is injected correctly."""
retrieval = MemoryRetrieval(MemoryStore())
store = retrieval.store
# Store a past decision
store.store("agents", "architect", {
"decision": "Use token-based auth",
"reasoning": "Scalable",
"outcome_quality": 9.5,
"task_category": "authentication",
"success": True
})
# Query for context
context = retrieval.query_pre_execution(
agent_name="architect",
task_category="authentication"
)
# Verify context is returned
assert "Similar Past Tasks" in context
assert "Use token-based auth" in context
assert "9.5/10" in context
def test_memory_post_execution():
"""Test that decisions are stored correctly."""
store = MemoryStore()
# Record a decision
store.store("agents", "architect", {
"decision": "Test decision",
"outcome_quality": 8.0,
"success": True
})
# Retrieve decision
decisions = store.retrieve("agents", "architect")
assert decisions is not None
assert len(decisions) == 1
assert decisions[0]["decision"] == "Test decision"
def test_error_pattern_query():
"""Test error pattern retrieval."""
retrieval = MemoryRetrieval(MemoryStore())
store = retrieval.store
# Store error records
store.store("errors", "ModuleNotFoundError", {
"error_type": "ModuleNotFoundError",
"solution": "Add to requirements.txt",
"solution_worked": True
})
# Query error pattern
pattern = retrieval.query_error_pattern("ModuleNotFoundError")
assert pattern["previous_occurrences"] >= 1
assert pattern["success_rate"] > 0
Summary: What These Examples Show¶
- MemoryStore: Simple JSON-based persistence
- MemoryRetrieval: Query interface for agents
- Integration Hooks: Where to add 3-5 lines of code
- Minimal Changes: No modifications to agent definitions
- Real Usage: Concrete examples of memory enhancing agents
- Data Structures: What gets stored and how
Key Insight: Total implementation is ~500 lines of code (including comments), integrated with only 5-10 line changes in existing code.