# Neo4j Memory Systems Design Patterns Catalog
Research Synthesis Date: 2025-11-02
Sources: Knowledge-Archaeologist research, Memory-Manager agent, Architect agent, Integration guides
Context: Design patterns for implementing Neo4j-based memory systems in AI coding agents
## Executive Summary
This document catalogs proven design patterns for implementing Neo4j-based memory systems in AI coding agents, synthesized from research on Zep, MIRIX, blarify, and existing memory implementations. The patterns are organized by cross-cutting concerns, architectural approaches, and integration strategies.
Key Finding: Successful memory systems combine three-tier hierarchical graphs (episodic → semantic → community) with multi-modal memory types (conversation, entity, procedural, code) and hybrid retrieval (vector + graph + temporal).
## Table of Contents
- Cross-Cutting Patterns
- Architectural Patterns
- Graph Schema Patterns
- Retrieval Patterns
- Integration Patterns
- Performance Patterns
- Agent Lifecycle Patterns
- Anti-Patterns
- Decision Framework
- Pattern Relationships
## 1. Cross-Cutting Patterns

### Pattern 1.1: Three-Tier Hierarchical Graph
Problem: How to organize memory at different levels of abstraction for efficient retrieval.
Solution: Structure memory in three hierarchical layers:
- Episodic Layer: Raw events (conversations, commits, errors) - non-lossy storage
- Semantic Layer: Extracted entities and relationships - generalized knowledge
- Community Layer: High-level clusters and summaries - meta-organization
Implementation:
```cypher
// Episodic Layer (bottom)
(ep:Episode {
  timestamp: datetime(),
  type: "conversation|commit|error",
  content: "raw event data",
  actor: "user_id"
})

// Semantic Layer (middle)
(e:Entity {
  name: "function_name",
  type: "Function|Class|Concept",
  summary: "generalized knowledge"
})

// Community Layer (top)
(c:Community {
  summary: "cluster of related entities",
  entity_ids: ["e1", "e2", "e3"]
})

// Relationships connect layers
(ep:Episode)-[:MENTIONS]->(e:Entity)
(e:Entity)-[:BELONGS_TO]->(c:Community)
```
Trade-offs:
- ✅ Enables multi-resolution retrieval (detailed → general)
- ✅ Reduces query complexity (search at appropriate level)
- ✅ Natural consolidation path (episode → entity → community)
- ❌ Increased complexity (three layers to maintain)
- ❌ Consistency challenges (keeping layers synchronized)
- ❌ Requires periodic community recomputation
When to Use:
- Large memory stores (>10k episodes)
- Need for both detailed and high-level queries
- Systems requiring knowledge consolidation
- Multi-agent collaboration scenarios
Example from Research:
- Zep: Uses this exact pattern for episodic → semantic → community hierarchy
- MIRIX: Separates episodic from semantic memory (two-tier variation)
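The multi-resolution retrieval this structure enables can be illustrated by a query that answers a high-level question by drilling from the community layer down to supporting episodes. A minimal Cypher sketch, assuming the schema above and a hypothetical `$topic` parameter:

```cypher
// From community summaries down to the raw episodes that support them
MATCH (c:Community)<-[:BELONGS_TO]-(e:Entity)<-[:MENTIONS]-(ep:Episode)
WHERE c.summary CONTAINS $topic
RETURN c.summary AS community, e.name AS entity, ep.content AS evidence
ORDER BY ep.timestamp DESC
LIMIT 20
```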
### Pattern 1.2: Temporal Validity Tracking
Problem: Knowledge changes over time; old facts become invalid without being deleted.
Solution: Implement bi-temporal tracking to preserve knowledge evolution:
- Transaction time (t_created, t_expired): When we learned/forgot the fact
- Valid time (t_valid, t_invalid): When the fact was/is actually true
Implementation:
```cypher
(f:Fact {
  content: "User prefers dark mode",
  t_valid: datetime("2025-10-01T00:00:00Z"),    // When the fact became true
  t_invalid: datetime("2025-11-01T00:00:00Z"),  // When the fact became false
  t_created: datetime("2025-10-02T12:00:00Z"),  // When we learned it
  t_expired: null,                              // Still in our knowledge base
  invalidated_by: "fact_id_456"                 // Reference to superseding fact
})

// Query for currently valid facts
MATCH (f:Fact)
WHERE f.t_valid <= datetime()
  AND (f.t_invalid IS NULL OR f.t_invalid > datetime())
  AND (f.t_expired IS NULL OR f.t_expired > datetime())
RETURN f
```
Trade-offs:
- ✅ Preserves knowledge history (can answer "what did we know then?")
- ✅ Handles contradictions gracefully (no data loss)
- ✅ Supports time-travel queries
- ✅ Critical for debugging ("why did we think that?")
- ❌ Increased storage overhead (never delete)
- ❌ Query complexity (temporal predicates required)
- ❌ Requires discipline (always set temporal bounds)
When to Use:
- Debugging assistance (need history of beliefs)
- Collaborative environments (conflicting knowledge)
- Learning systems (track knowledge evolution)
- Compliance requirements (audit trail)
Example from Research:
- Zep: Uses bi-temporal model for entity validity tracking
- MIRIX: Tracks update timestamps for memory freshness
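Because both time axes are stored, "what did we know then?" becomes an ordinary query. A minimal sketch, assuming the `Fact` properties above and a hypothetical `$as_of` parameter:

```cypher
// Facts we had learned by $as_of, had not yet retracted,
// and believed to be in effect at that moment
MATCH (f:Fact)
WHERE f.t_created <= $as_of
  AND (f.t_expired IS NULL OR f.t_expired > $as_of)
  AND f.t_valid <= $as_of
  AND (f.t_invalid IS NULL OR f.t_invalid > $as_of)
RETURN f.content
```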
### Pattern 1.3: Hybrid Search (Vector + Graph + Temporal)
Problem: Single search modality (vector OR graph) misses important context.
Solution: Combine multiple search strategies with reciprocal rank fusion:
```python
from datetime import datetime, timedelta

def hybrid_search(query, kg, top_k=10):
    # Stage 1: Semantic search (vector similarity)
    query_embedding = embed(query)  # embed() assumed: text -> vector
    semantic_results = kg.vector_search(query_embedding, top_k=50)

    # Stage 2: Structural search (graph traversal)
    entities = extract_entities(query)  # assumed: text -> entity names
    structural_results = kg.graph_query("""
        MATCH (e:Entity)-[*1..2]-(related)
        WHERE e.name IN $entities
        RETURN related
    """, entities=entities)

    # Stage 3: Temporal filtering (recency boost)
    recent_threshold = datetime.now() - timedelta(days=30)
    temporal_results = kg.query("""
        MATCH (ep:Episode)-[:MENTIONS]->(e:Entity)
        WHERE ep.timestamp > $threshold
        RETURN e
    """, threshold=recent_threshold)

    # Stage 4: Reciprocal Rank Fusion (RRF)
    def rrf_score(item, rank_lists, k=60):
        score = 0
        for rank_list in rank_lists:
            if item in rank_list:
                rank = rank_list.index(item)  # 0-based rank; k dampens low ranks
                score += 1 / (k + rank)
        return score

    # Assumes result objects are hashable
    all_results = set(semantic_results + structural_results + temporal_results)
    ranked = sorted(
        all_results,
        key=lambda x: rrf_score(x, [semantic_results, structural_results, temporal_results]),
        reverse=True,
    )
    return ranked[:top_k]
```
Trade-offs:
- ✅ Best retrieval accuracy (94.8% in Zep benchmarks)
- ✅ Captures multiple relevance signals
- ✅ Robust to query variations
- ❌ Higher latency (multiple queries)
- ❌ Increased complexity (multiple indices)
- ❌ Tuning required (RRF parameter k, weights)
When to Use:
- Production systems requiring high accuracy
- Queries with diverse intents (semantic + structural)
- Large knowledge bases (disambiguation needed)
- User-facing retrieval (quality matters)
Example from Research:
- Zep: Uses hybrid approach for 94.8% accuracy
- MIRIX: Combines vector embeddings with graph relationships
### Pattern 1.4: Incremental Graph Updates
Problem: Rebuilding entire graph on file changes is too slow for interactive systems.
Solution: Update only affected nodes and relationships:
```python
class IncrementalGraphUpdater:
    def update_file(self, file_path, new_content, old_content=None):
        # Parse both versions
        new_ast = parse_file(file_path, new_content)
        old_ast = parse_file(file_path, old_content) if old_content else None

        # Extract entities from both
        new_entities = extract_entities(new_ast)
        old_entities = extract_entities(old_ast) if old_ast else []

        # Compute diff (changed() assumed: detects content-level modification)
        added = [e for e in new_entities if e not in old_entities]
        removed = [e for e in old_entities if e not in new_entities]
        modified = [e for e in new_entities if e in old_entities and changed(e)]

        # Apply updates atomically
        with self.db.transaction():
            # Remove deleted entities
            for entity in removed:
                self.db.delete_node(entity.id)
            # Add new entities
            for entity in added:
                self.db.create_node(entity.type, entity.properties)
            # Update modified entities
            for entity in modified:
                self.db.update_node(entity.id, entity.properties)

            # Recompute relationships only for affected entities
            affected = added + modified
            self.update_relationships(affected)
```
Trade-offs:
- ✅ Fast updates (< 1s per file vs minutes for full rebuild)
- ✅ Enables real-time memory (interactive coding)
- ✅ Lower resource usage
- ❌ Complex diff logic (entity matching)
- ❌ Risk of inconsistency (partial updates)
- ❌ Requires old state (caching or queries)
When to Use:
- Real-time coding assistants
- File watchers (auto-update on save)
- Large codebases (full rebuild too slow)
- Interactive systems
Example from Research:
- blarify: Supports incremental updates via SCIP indexing
- MIRIX: Updates only affected memory components
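The diff above depends on entity equality and a `changed()` check, neither of which is defined. One possible sketch, assuming entities are identified by qualified name and compared by a content hash (shown here as a two-argument variant of `changed()`, since change detection needs both versions):

```python
import hashlib
from dataclasses import dataclass, field

@dataclass(frozen=True)
class CodeEntity:
    """Hypothetical entity: identity is the qualified name; the body hash
    is excluded from equality so `in` checks match across versions."""
    qualified_name: str                    # e.g. "auth.login"
    body_hash: str = field(compare=False)  # sha256 of the entity's source

    @staticmethod
    def hash_body(source: str) -> str:
        return hashlib.sha256(source.encode()).hexdigest()

def changed(new: CodeEntity, old: CodeEntity) -> bool:
    # Same identity, different content
    return new == old and new.body_hash != old.body_hash
```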
### Pattern 1.5: Multi-Modal Memory Architecture
Problem: Different types of information require different storage and retrieval strategies.
Solution: Separate memory into specialized components with meta-manager:
```python
class MultiModalMemory:
    def __init__(self):
        # Specialized memory stores
        self.core = CoreMemory()              # Persistent facts (agent + user identity)
        self.episodic = EpisodicMemory()      # Time-stamped events
        self.semantic = SemanticMemory()      # Entity relationships
        self.procedural = ProceduralMemory()  # How-to knowledge
        self.resource = ResourceMemory()      # Documents, code files

        # Meta-manager routes events to appropriate stores
        self.meta_manager = MetaMemoryManager()

    def process_event(self, event):
        # Route to appropriate memory stores, e.g.:
        #   conversation     -> episodic + semantic
        #   code_change      -> resource + semantic + episodic
        #   error_resolution -> procedural + episodic
        routing = self.meta_manager.route(event)
        for component, instructions in routing.items():
            memory = getattr(self, component)
            memory.update(event, instructions)

    def retrieve(self, query):
        # Parallel retrieval from all components
        results = {
            "core": self.core.retrieve(query),
            "episodic": self.episodic.retrieve(query),
            "semantic": self.semantic.retrieve(query),
            "procedural": self.procedural.retrieve(query),
            "resource": self.resource.retrieve(query)
        }
        # Tag by source and format for the LLM
        return self.format_for_llm(results)
```
Memory Component Details:
| Component | Purpose | Storage Duration | Query Pattern | Example |
|---|---|---|---|---|
| Core | Persistent identity | Indefinite | Direct lookup | Agent personality, user name |
| Episodic | Event log | 30-90 days | Temporal + semantic | "What error occurred yesterday?" |
| Semantic | Entity knowledge | Until invalidated | Graph traversal | "What does this function do?" |
| Procedural | Workflows | Until obsolete | Trigger matching | "How to fix ImportError?" |
| Resource | Documents | Until deleted | Full-text search | "Find auth documentation" |
Trade-offs:
- ✅ Optimized storage per memory type
- ✅ Specialized retrieval strategies
- ✅ Clear separation of concerns
- ✅ 35% improvement over RAG (MIRIX benchmarks)
- ❌ Increased system complexity (5+ components)
- ❌ Routing logic required (meta-manager)
- ❌ Cross-component queries more complex
When to Use:
- Complex agent systems (multiple knowledge types)
- Performance-critical applications (optimize per type)
- Long-running agents (diverse information)
- Production systems (proven architecture)
Example from Research:
- MIRIX: Six-component architecture (core, episodic, semantic, procedural, resource, vault)
- Zep: Separates episodic, semantic, and community layers
- Amplihack: Three-tier system (session, working, knowledge)
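The meta-manager's `route()` is left abstract above. A minimal rule-based sketch, with the routing table taken from the comments in the code (a real system might use an LLM classifier instead):

```python
class MetaMemoryManager:
    """Routes events to memory components by event type."""

    ROUTING_TABLE = {
        "conversation":     ["episodic", "semantic"],
        "code_change":      ["resource", "semantic", "episodic"],
        "error_resolution": ["procedural", "episodic"],
    }

    def route(self, event):
        components = self.ROUTING_TABLE.get(event["type"], ["episodic"])
        # Instructions here are just the event type; a richer router might
        # attach extraction prompts or priority hints per component.
        return {name: {"event_type": event["type"]} for name in components}
```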
## 2. Architectural Patterns

### Pattern 2.1: Unified Graph Model (Zep Architecture)
Problem: How to integrate multiple memory types into a single queryable structure.
Solution: Store all memory types in one graph with typed nodes and relationships:
```text
┌─────────────────────────────────────────────────────────┐
│                   RETRIEVAL LAYER                       │
│  - Semantic search (embeddings)                         │
│  - Graph traversal (relationships)                      │
│  - Temporal queries (time-based)                        │
│  - Hybrid reranking (multiple signals)                  │
└─────────────────────────────────────────────────────────┘
                          ↑
┌─────────────────────────────────────────────────────────┐
│              COMMUNITY LAYER (High-level)               │
│  (c:Community {summary, entity_ids, created_at})        │
│  - Clusters of related entities                         │
│  - High-level summaries                                 │
└─────────────────────────────────────────────────────────┘
                          ↑
┌─────────────────────────────────────────────────────────┐
│              SEMANTIC LAYER (Entity Graph)              │
│  (e:Entity)-[r:RELATES_TO]->(e2:Entity)                 │
│  - Extracted entities                                   │
│  - Relationships between entities                       │
│  - Temporal validity tracking                           │
└─────────────────────────────────────────────────────────┘
                          ↑
┌─────────────────────────────────────────────────────────┐
│              EPISODIC LAYER (Raw Events)                │
│  (ep:Episode {timestamp, content, actor})               │
│  - Conversations                                        │
│  - Code commits                                         │
│  - Errors and resolutions                               │
│  - Non-lossy storage                                    │
└─────────────────────────────────────────────────────────┘
```
Implementation:
```python
class UnifiedMemoryGraph:
    def __init__(self, neo4j_driver):
        self.driver = neo4j_driver
        self.vector_db = VectorDatabase()

    def ingest_event(self, event):
        # 1. Store raw episode (non-lossy)
        episode = self.create_episode(event)

        # 2. Extract entities (semantic layer)
        entities = self.extract_entities(event.content)
        for entity in entities:
            existing = self.find_or_create_entity(entity)
            # Link episode to entity
            self.link(episode, "MENTIONS", existing)

        # 3. Extract relationships (semantic layer)
        relationships = self.extract_relationships(entities)
        for rel in relationships:
            self.create_relationship(rel)

        # 4. Update communities (incremental)
        affected_communities = self.find_communities(entities)
        for community in affected_communities:
            self.update_community_summary(community)

    def retrieve(self, query, top_k=10):
        # Stage 1: Semantic search (vector similarity)
        embedding = self.embed(query)
        candidate_entities = self.vector_db.search(embedding, top_k=50)

        # Stage 2: Graph traversal (structural)
        expanded = self.graph_expand(candidate_entities, depth=2)

        # Stage 3: Temporal filtering
        recent = self.filter_by_recency(expanded)

        # Stage 4: Episode retrieval
        episodes = self.get_episodes(recent)

        # Stage 5: Reranking
        reranked = self.rerank(
            entities=recent,
            episodes=episodes,
            query=query
        )
        return reranked[:top_k]
```
Trade-offs:
- ✅ Single source of truth (no synchronization issues)
- ✅ Cross-layer queries easy (graph traversal)
- ✅ Natural knowledge consolidation (bottom-up)
- ✅ Proven performance (Zep: 94.8% accuracy)
- ❌ Requires careful schema design (avoid spaghetti)
- ❌ Community computation expensive (periodic batch)
- ❌ All data in one database (scaling limits)
When to Use:
- Single-agent systems
- Medium-scale projects (10k-1M nodes)
- Need for cross-layer reasoning
- Simplicity over distribution
Example from Research:
- Zep: Production implementation with this architecture
- Achieves 94.8% retrieval accuracy
- 90% latency reduction (2.58s vs 28.9s)
### Pattern 2.2: Federated Memory System (MIRIX Architecture)
Problem: Different memory types have different access patterns and performance requirements.
Solution: Separate databases/stores optimized per memory type, with federation layer:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

class FederatedMemory:
    def __init__(self):
        # Separate stores optimized for different access patterns
        self.core = InMemoryStore()        # Fast, small, persistent
        self.episodic = TimeSeriesDB()     # Time-ordered, append-only
        self.semantic = Neo4jGraph()       # Graph queries, relationships
        self.procedural = DocumentDB()     # Full-text search, workflows
        self.resource = ObjectStore()      # Large files, S3/filesystem

        # Federation layer coordinates queries
        self.federation = FederationLayer()

    def query(self, query_text):
        # Parse query to determine relevant stores
        query_plan = self.federation.plan(query_text)

        # Parallel queries to relevant stores
        results = {}
        with ThreadPoolExecutor() as executor:
            futures = {
                executor.submit(store.query, query_text): store_name
                for store_name, store in query_plan.stores.items()
            }
            for future in as_completed(futures):
                store_name = futures[future]
                results[store_name] = future.result()

        # Merge and rank results
        return self.federation.merge(results, query_text)
```
Trade-offs:
- ✅ Optimized performance per store type
- ✅ Independent scaling (scale what needs it)
- ✅ Fault isolation (one store failure doesn't kill all)
- ✅ 99.9% storage reduction vs RAG (MIRIX)
- ❌ High complexity (multiple databases)
- ❌ Cross-store queries difficult
- ❌ Consistency challenges (distributed system)
- ❌ Operational overhead (manage multiple systems)
When to Use:
- Large-scale systems (>1M nodes)
- Diverse workloads (batch + interactive)
- Need for specialized optimizations
- Multi-agent architectures
Example from Research:
- MIRIX: Six separate components, meta-manager federation
- 35% improvement over RAG
- 93.3% storage reduction vs long-context
### Pattern 2.3: Code-Aware Memory Graph
Problem: Coding assistants need both conversation memory and code structure understanding.
Solution: Integrate code graph (AST + dependencies) into memory system:
```text
┌───────────────────────────────────────────────────────────┐
│                 MEMORY RETRIEVAL ENGINE                   │
│  - Query understanding                                    │
│  - Multi-modal retrieval                                  │
│  - Context assembly                                       │
└───────────────────────────────────────────────────────────┘
                            ↓
┌──────────────┬──────────────┬──────────────┬─────────────┐
│   Episodic   │   Semantic   │  Procedural  │  Code Graph │
│   Memory     │   Memory     │   Memory     │             │
│              │              │              │             │
│  - Convos    │  - Entities  │  - Workflows │  - Functions│
│  - Commits   │  - Relations │  - Patterns  │  - Classes  │
│  - Errors    │  - Facts     │  - Fixes     │  - Deps     │
└──────────────┴──────────────┴──────────────┴─────────────┘
```
Schema Design:
```cypher
// Code entities
(f:Function {
  name: "login",
  signature: "def login(username: str, password: str) -> User",
  file_path: "auth.py",
  line_start: 45,
  line_end: 67,
  docstring: "Authenticates user credentials",
  complexity: 8
})

(c:Class {
  name: "User",
  file_path: "models.py",
  methods: ["__init__", "save", "delete"]
})

// Code relationships
(f1:Function)-[:CALLS {line: 52}]->(f2:Function)
(f:Function)-[:DEFINED_IN]->(file:CodeFile)
(c:Class)-[:HAS_METHOD]->(f:Function)

// Memory integration
(ep:Episode {type: "commit"})-[:MODIFIED]->(f:Function)
(ep:Episode {type: "error"})-[:OCCURRED_IN]->(f:Function)
(p:Procedure {type: "fix"})-[:APPLIES_TO]->(f:Function)
```
Implementation:
```python
from datetime import datetime

class CodeMemoryIntegration:
    def __init__(self, codebase_path):
        self.episodic = EpisodicMemory()
        self.semantic = SemanticMemory()
        self.procedural = ProceduralMemory()
        # Integrate blarify for code parsing
        self.code_graph = CodeGraph(codebase_path, use_scip=True)

    def on_file_change(self, file_path, new_content):
        # 1. Update code graph (incremental)
        self.code_graph.update_file(file_path, new_content)

        # 2. Create episode
        episode = self.episodic.record({
            "type": "code_change",
            "file": file_path,
            "timestamp": datetime.now()
        })

        # 3. Link episode to affected functions
        affected_functions = self.code_graph.extract_entities(file_path)
        for func in affected_functions:
            self.link(episode, "MODIFIED", func)

    def on_error(self, error):
        # 1. Record episode
        episode = self.episodic.record(error)

        # 2. Link to code location (func stays None if location unknown)
        func = None
        if error.file and error.line:
            func = self.code_graph.find_function_at(error.file, error.line)
            self.link(episode, "OCCURRED_IN", func)

        # 3. Find applicable procedure
        procedure = self.procedural.find_by_error_type(error.type)
        return {
            "procedure": procedure,
            "similar_errors": self.episodic.find_similar(error),
            "affected_code": func
        }
```
Trade-offs:
- ✅ Deep code understanding (AST + call graph)
- ✅ Contextual memory (link errors to code)
- ✅ Pattern learning (common error locations)
- ✅ 330x faster with SCIP indexing
- ❌ Complex integration (multiple tools)
- ❌ Language-specific (parsers per language)
- ❌ Higher storage requirements
When to Use:
- AI coding assistants
- Debugging tools
- Code navigation systems
- Refactoring assistants
Example from Research:
- blarify: Code graph generation (LSP + SCIP)
- Supports Python, JavaScript, TypeScript, Ruby, Go, C#
## 3. Graph Schema Patterns

### Pattern 3.1: Labeled Property Graph with Type Hierarchy
Problem: Need flexible schema that supports multiple entity types while enabling polymorphic queries.
Solution: Use Neo4j's labeled property graph with hierarchical node labels:
```cypher
// Base entity with multiple labels (polymorphism)
CREATE (e:Entity:Function {
  id: "func_001",
  name: "login",
  type: "Function",
  signature: "def login(username: str, password: str) -> User"
})

// Query all entities
MATCH (e:Entity) RETURN e

// Query a specific type
MATCH (f:Function) RETURN f

// Query by property
MATCH (e:Entity {name: "login"}) RETURN e
```
Label Hierarchy:
```text
Entity (base)
├── CodeEntity
│   ├── Function
│   ├── Class
│   ├── Module
│   └── Variable
├── MemoryEntity
│   ├── Episode
│   ├── Decision
│   └── Pattern
└── MetaEntity
    ├── Community
    └── Topic
```
Trade-offs:
- ✅ Flexible schema (add labels without migration)
- ✅ Polymorphic queries (query base or specific type)
- ✅ Type-specific properties
- ❌ No schema enforcement (Neo4j is schema-optional)
- ❌ Can become messy without discipline
When to Use:
- Evolving schema (frequent changes)
- Multiple entity types
- Need for polymorphic queries
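Neo4j is schema-optional, but constraints can restore some discipline. A sketch using Neo4j 5 constraint syntax (note that property existence constraints require Enterprise Edition):

```cypher
// Unique ids on the base label; applies to every node carrying :Entity
CREATE CONSTRAINT entity_id_unique IF NOT EXISTS
FOR (e:Entity) REQUIRE e.id IS UNIQUE;

// Require the type discriminator (Enterprise Edition only)
CREATE CONSTRAINT entity_type_exists IF NOT EXISTS
FOR (e:Entity) REQUIRE e.type IS NOT NULL;
```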
### Pattern 3.2: Relationship Semantics with Properties
Problem: Relationships need context (when, why, confidence).
Solution: Enrich relationships with properties:
```cypher
// Rich relationship properties
(f1:Function)-[r:CALLS {
  line: 52,                       // Where in code
  timestamp: datetime(),          // When observed
  frequency: 23,                  // How often
  confidence: 0.95,               // How certain
  context: "authentication flow"  // Why
}]->(f2:Function)

// Temporal relationships
(e1:Entity)-[r:RELATES_TO {
  t_valid: datetime("2025-10-01"),
  t_invalid: datetime("2025-11-01"),
  strength: 0.85
}]->(e2:Entity)

// Query with relationship properties
MATCH (f1:Function)-[r:CALLS]->(f2:Function)
WHERE r.frequency > 10
RETURN f1, r, f2
```
Common Relationship Properties:
- Temporal: t_valid, t_invalid, timestamp
- Provenance: source, confidence, evidence
- Context: line, file, scope
- Metrics: frequency, strength, importance
Trade-offs:
- ✅ Rich context (answer "how" and "why")
- ✅ Enables filtering (find frequent calls)
- ✅ Supports temporal queries
- ❌ Increased storage
- ❌ Query complexity (more predicates)
### Pattern 3.3: Index Strategy for Performance
Problem: Graph queries can be slow without proper indexing.
Solution: Create strategic indexes on frequently filtered properties:
```python
def create_indexes(driver):
    """Create indexes for optimal query performance."""
    indexes = [
        # Node property indexes (exact match)
        "CREATE INDEX entity_name IF NOT EXISTS FOR (e:Entity) ON (e.name)",
        "CREATE INDEX episode_type IF NOT EXISTS FOR (ep:Episode) ON (ep.type)",
        "CREATE INDEX function_file IF NOT EXISTS FOR (f:Function) ON (f.file_path)",

        # Composite indexes (multiple properties)
        "CREATE INDEX entity_name_type IF NOT EXISTS FOR (e:Entity) ON (e.name, e.type)",

        # Full-text indexes (text search)
        "CREATE FULLTEXT INDEX entity_content IF NOT EXISTS FOR (e:Entity) ON EACH [e.name, e.summary, e.description]",

        # Range indexes (temporal queries)
        "CREATE INDEX episode_timestamp IF NOT EXISTS FOR (ep:Episode) ON (ep.timestamp)",
    ]
    for index in indexes:
        driver.execute_query(index)
```
Index Types:
| Index Type | Use Case | Example |
|---|---|---|
| Range (default; replaces B-tree in Neo4j 5) | Exact match, range | WHERE e.name = 'login' |
| Composite | Multiple properties | WHERE e.name = 'login' AND e.type = 'Function' |
| Full-text | Text search | CALL db.index.fulltext.queryNodes('entity_content', 'authentication') |
| Vector (Neo4j 5.11+) | Semantic search | CALL db.index.vector.queryNodes('entity_embedding', 10, $query_vec) |
Trade-offs:
- ✅ 10-100x query speedup
- ✅ Enables real-time queries
- ❌ Increased storage (index overhead)
- ❌ Slower writes (maintain indexes)
- ❌ Requires query analysis (know access patterns)
Best Practices:
- Index properties used in WHERE clauses
- Composite indexes for common combinations
- Full-text indexes for search
- Don't over-index (hurts writes)
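Note that full-text indexes are queried through a procedure rather than a WHERE clause. Using the entity_content index created above:

```cypher
// CONTAINS in a WHERE clause would scan; this uses the full-text index
CALL db.index.fulltext.queryNodes('entity_content', 'authentication')
YIELD node, score
RETURN node.name, score
ORDER BY score DESC
LIMIT 10
```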
## 4. Retrieval Patterns

### Pattern 4.1: Multi-Stage Retrieval Pipeline
Problem: Single-stage retrieval misses relevant context.
Solution: Progressive refinement through multiple stages:
```python
from datetime import datetime, timedelta

def multi_stage_retrieval(query, kg, top_k=10):
    # Stage 1: Broad semantic search (cast a wide net)
    embedding = embed(query)
    candidates = kg.vector_search(embedding, top_k=100)

    # Stage 2: Graph expansion (find related entities)
    expanded = kg.graph_query("""
        MATCH (e:Entity)-[*1..2]-(related)
        WHERE id(e) IN $candidate_ids
        RETURN DISTINCT related
    """, candidate_ids=[c.id for c in candidates])

    # Stage 3: Temporal filtering (recency boost)
    recent = [e for e in expanded
              if e.updated_at > datetime.now() - timedelta(days=30)]

    # Stage 4: Episode retrieval (get context)
    episodes = kg.query("""
        MATCH (ep:Episode)-[:MENTIONS]->(e:Entity)
        WHERE id(e) IN $entity_ids
        RETURN ep, e
        ORDER BY ep.timestamp DESC
    """, entity_ids=[e.id for e in recent])

    # Stage 5: Reranking (combine signals; weights need tuning)
    scored = []
    for entity in recent:
        score = (
            0.5 * semantic_score(entity, query) +
            0.3 * recency_score(entity) +
            0.2 * frequency_score(entity, episodes)
        )
        scored.append((entity, score))
    return sorted(scored, key=lambda x: x[1], reverse=True)[:top_k]
```
Stage Purposes:
- Semantic Search: Find conceptually similar entities
- Graph Expansion: Add structurally related entities
- Temporal Filtering: Boost recent/relevant knowledge
- Episode Retrieval: Get detailed context
- Reranking: Combine multiple relevance signals
Trade-offs:
- ✅ High accuracy (captures multiple relevance types)
- ✅ Robust to query variations
- ✅ Explainable (can show why retrieved)
- ❌ Higher latency (multiple queries)
- ❌ Complex to tune (weights, thresholds)
### Pattern 4.2: Contradiction Detection and Resolution
Problem: New information may contradict existing knowledge.
Solution: Detect contradictions and use temporal invalidation:
```python
from datetime import datetime

def handle_new_fact(new_fact, kg):
    # Find potentially contradicting facts about the same entity pair
    related_facts = kg.query("""
        MATCH (e1:Entity)<-[:ABOUT]-(f:Fact)-[:ABOUT]->(e2:Entity)
        WHERE e1.id = $entity1 AND e2.id = $entity2
          AND f.t_invalid IS NULL  // Only active facts
        RETURN f
    """, entity1=new_fact.entity1, entity2=new_fact.entity2)

    # Check for contradictions
    for old_fact in related_facts:
        if contradicts(new_fact, old_fact):
            # Temporal invalidation (don't delete)
            old_fact.t_invalid = new_fact.t_valid
            old_fact.invalidated_by = new_fact.id
            kg.update(old_fact)

            # Log contradiction
            kg.create_node("Contradiction", {
                "old_fact": old_fact.id,
                "new_fact": new_fact.id,
                "detected_at": datetime.now(),
                "resolution": "temporal_invalidation"
            })

    # Add new fact
    kg.add(new_fact)
```
Contradiction Types:
- Direct: "User prefers dark mode" vs "User prefers light mode"
- Temporal: "Function deleted" vs "Function still exists"
- Logical: "A calls B" vs "A never calls B"
Resolution Strategies:
- Temporal invalidation: Mark old fact as invalid (preserve history)
- Confidence-based: Keep higher-confidence fact
- Source-based: Trust authoritative source
- User query: Ask user to resolve
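`contradicts()` is left undefined in the code above. A minimal sketch of the direct-contradiction case, assuming facts carry `predicate` and `value` fields (both illustrative):

```python
def contradicts(new_fact, old_fact) -> bool:
    """Direct contradiction: same predicate about the same entity pair,
    but a different value (the query above already matched the pair).

    Temporal and logical contradictions from the list above need richer
    checks, e.g. comparing validity intervals or negation flags.
    """
    return (
        new_fact.predicate == old_fact.predicate  # illustrative field
        and new_fact.value != old_fact.value      # illustrative field
    )
```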
Trade-offs:
- ✅ Handles changing information gracefully
- ✅ Preserves knowledge history
- ✅ Supports debugging ("why did we think that?")
- ❌ Increased complexity (contradiction detection)
- ❌ May need user intervention
### Pattern 4.3: Multi-Hop Reasoning
Problem: Answer requires connecting multiple entities through relationships.
Solution: Iterative graph expansion with decay:
```python
def multi_hop_reasoning(query, kg, max_hops=3):
    # Extract seed entities from the query
    seed_entities = extract_entities(query)

    # Iteratively expand
    results = []
    current_entities = seed_entities
    visited = set()

    for hop in range(max_hops):
        # Find related entities not yet visited
        related = kg.query("""
            MATCH (e:Entity)-[r]-(related:Entity)
            WHERE e.id IN $entities
              AND NOT related.id IN $visited
            RETURN related, r, e, type(r) AS rel_type
        """, entities=[e.id for e in current_entities],
             visited=list(visited))

        # Score by relevance (decay by distance)
        decay_factor = 0.7 ** hop
        for rel in related:
            score = (
                rel.r.strength * decay_factor *
                relationship_relevance(rel.rel_type, query)
            )
            results.append((rel.related, score, hop))
            visited.add(rel.related.id)

        # Update the frontier for the next hop
        current_entities = [r.related for r in related]

        # Early stopping if no new entities
        if not current_entities:
            break

    # Rerank by combined score
    return sorted(results, key=lambda x: x[1], reverse=True)
```
Decay Strategies:
- Distance decay: 0.7^hop (each hop reduces score)
- Relationship-based: Strong relationships decay less
- Type-based: Some relationships more relevant
Trade-offs:
- ✅ Finds indirect connections
- ✅ Answers complex queries
- ❌ Can retrieve too much (explosion)
- ❌ Requires careful tuning (max hops, decay)
## 5. Integration Patterns

### Pattern 5.1: Context Injection vs. Query-Based Retrieval
Problem: When to inject context upfront vs. retrieve on-demand?
Two Approaches:
A. Context Injection (Eager):
```python
class ContextInjectionAgent:
    def __init__(self, memory):
        self.memory = memory
        # Pre-load context at agent initialization
        self.context = self.memory.get_recent_context(limit=100)

    def process_query(self, query):
        # Context already loaded
        response = self.llm(
            system_prompt=self.build_system_prompt(self.context),
            user_query=query
        )
        return response
```
B. Query-Based Retrieval (Lazy):
```python
class QueryBasedAgent:
    def __init__(self, memory):
        self.memory = memory

    def process_query(self, query):
        # Retrieve context only when needed
        relevant_context = self.memory.retrieve(query, top_k=10)
        response = self.llm(
            system_prompt=self.build_system_prompt(relevant_context),
            user_query=query
        )
        return response
```
Decision Matrix:
| Factor | Context Injection | Query-Based Retrieval |
|---|---|---|
| Context size | Small (< 10k tokens) | Large (> 10k tokens) |
| Query latency | Lower (pre-loaded) | Higher (retrieval cost) |
| Context relevance | May include noise | Highly targeted |
| Memory usage | Higher (always loaded) | Lower (on-demand) |
| Use case | Chat bots, small projects | RAG, large knowledge bases |
Hybrid Approach (Best of Both):
```python
class HybridAgent:
    def __init__(self, memory):
        self.memory = memory
        # Pre-load critical context (core memory)
        self.core_context = memory.core.get_all()

    def process_query(self, query):
        # Combine core + query-specific context
        query_context = self.memory.retrieve(query, top_k=10)
        full_context = self.core_context + query_context
        response = self.llm(
            system_prompt=self.build_system_prompt(full_context),
            user_query=query
        )
        return response
```
Trade-offs:
- Context Injection: ✅ Low latency, ❌ May include noise
- Query-Based: ✅ High relevance, ❌ Retrieval overhead
- Hybrid: ✅ Best of both, ❌ More complex
When to Use:
- Context Injection: Small contexts, chat-based interactions
- Query-Based: Large knowledge bases, RAG systems
- Hybrid: Production systems requiring both speed and relevance
### Pattern 5.2: Synchronous vs. Asynchronous Memory Operations
Problem: Should memory operations block agent execution or run in background?
Synchronous Pattern (Blocking):
```python
class SyncMemoryAgent:
    def process_event(self, event):
        # Memory operations block execution
        memory_id = self.memory.store(event)
        entities = self.memory.extract_entities(event)
        self.memory.update_graph(entities)
        # Continue only after memory is updated
        return self.process_with_memory(memory_id)
```
Asynchronous Pattern (Non-blocking):
```python
import asyncio

class AsyncMemoryAgent:
    async def process_event(self, event):
        # Fire-and-forget memory operations (store/extract must be coroutines)
        asyncio.create_task(self.memory.store(event))
        asyncio.create_task(self.memory.extract_entities(event))
        # Continue immediately without waiting
        return await self.process_without_blocking()
```
Best Practice - Write Async, Read Sync:
```python
import asyncio

class HybridMemoryAgent:
    async def process_event(self, event):
        # Write asynchronously (fire-and-forget)
        asyncio.create_task(self.memory.store(event))
        # Read synchronously (need the result to continue)
        context = await self.memory.retrieve(event.query)
        return await self.process(context)
```
Decision Matrix:
| Operation | Sync/Async | Reason |
|---|---|---|
| Store episode | Async | Don't block user interaction |
| Extract entities | Async | Background processing acceptable |
| Update graph | Async | Can be eventual consistency |
| Retrieve context | Sync | Need result to continue |
| Query for decision | Sync | Decision depends on result |
Trade-offs:
- Sync: ✅ Simple, ✅ Consistent, ❌ Slower
- Async: ✅ Fast, ❌ Complex, ❌ Eventual consistency
### Pattern 5.3: Agent Lifecycle Integration Points
Problem: When in the agent lifecycle should memory operations occur?
Lifecycle Stages:
```python
from datetime import datetime

class MemoryAwareAgent:
    def __init__(self, session_id):
        # 1. INITIALIZATION: Load persistent context
        self.memory = get_memory_manager(session_id)
        self.context = self.memory.restore_session_context()

    def on_user_message(self, message):
        # 2. PRE-PROCESSING: Retrieve relevant context
        relevant_memories = self.memory.retrieve(message, top_k=10)

        # 3. PROCESSING: Use memories in decision-making
        response = self.generate_response(message, relevant_memories)

        # 4. POST-PROCESSING: Store the interaction
        self.memory.store({
            "type": "conversation",
            "message": message,
            "response": response,
            "timestamp": datetime.now()
        })
        return response

    def on_error(self, error):
        # ERROR HANDLING: Learn from errors
        self.memory.record_error(error)
        procedure = self.memory.find_procedure(error)
        return procedure

    def on_success(self, task):
        # SUCCESS HANDLING: Learn patterns
        self.memory.record_success(task)
        self.memory.learn_procedure(task)

    def on_session_end(self):
        # 5. TEARDOWN: Persist session state
        self.memory.preserve_session_context(
            summary=self.summarize_session(),
            decisions=self.decisions_made,
            tasks=self.active_tasks
        )
```
Integration Points:
| Stage | Operations | Purpose |
|---|---|---|
| Initialization | Load context | Session continuity |
| Pre-processing | Retrieve context | Informed decisions |
| Processing | Use memories | Context-aware actions |
| Post-processing | Store results | Learn from interaction |
| Error handling | Find procedures | Error resolution |
| Success handling | Record patterns | Pattern learning |
| Teardown | Persist state | Future sessions |
Trade-offs:
- ✅ Comprehensive memory integration
- ✅ Learning at all stages
- ❌ Performance overhead at each stage
- ❌ Complexity (many integration points)
### Pattern 5.4: Error Pattern Learning
Problem: How to learn from debugging sessions to improve future error handling.
Solution: Record error resolutions as procedures, track success rates:
```python
from datetime import datetime

class ErrorPatternLearner:
    def __init__(self, memory):
        self.memory = memory

    def handle_error(self, error):
        # 1. Check for a known procedure
        procedure = self.memory.procedural.find_procedure(error)
        if procedure:
            return {
                "procedure": procedure,
                "confidence": procedure.success_rate,
                "times_used": procedure.times_used
            }
        # 2. Fall back to similar past errors
        similar = self.memory.episodic.find_similar({
            "type": "error",
            "error_type": error.type,
            "message": error.message
        })
        return {
            "similar_cases": similar,
            "confidence": 0.3  # Lower confidence (no exact procedure)
        }

    def record_resolution(self, error_id, steps_taken, success):
        # Update episode with resolution
        self.memory.episodic.update(error_id, {
            "resolution_steps": steps_taken,
            "success": success,
            "resolved_at": datetime.now()
        })

        # Look up the error and any existing procedure before branching,
        # so the failure branch can also update it
        error = self.memory.episodic.get(error_id)
        procedure = self.memory.procedural.find_by_trigger(error.type)
        alpha = 0.1  # EMA smoothing factor

        if success:
            if procedure:
                # Update success rate (exponential moving average)
                procedure.success_rate = alpha * 1.0 + (1 - alpha) * procedure.success_rate
                procedure.times_used += 1
                self.memory.procedural.update(procedure)
            else:
                # Create a new procedure from this resolution
                self.memory.procedural.create({
                    "name": f"Fix {error.type}",
                    "trigger_pattern": error.type,
                    "steps": steps_taken,
                    "success_rate": 1.0,
                    "times_used": 1,
                    "learned_from": error_id
                })
        elif procedure:
            # Record failure for learning
            procedure.success_rate = alpha * 0.0 + (1 - alpha) * procedure.success_rate
            self.memory.procedural.update(procedure)
```
Schema:
```cypher
// Error episodes
(ep:Episode:Error {
  error_type: "ImportError",
  message: "Module 'requests' not found",
  file: "auth.py",
  line: 10,
  resolution_steps: ["pip install requests", "verify PYTHONPATH"],
  success: true
})

// Learned procedure
(p:Procedure {
  name: "Fix ImportError",
  trigger_pattern: "ImportError|ModuleNotFoundError",
  steps: ["Check if installed", "Verify PYTHONPATH", "Check spelling"],
  success_rate: 0.87,
  times_used: 23,
  avg_resolution_time: 120  // seconds
})

// Link procedure to error type
(p:Procedure)-[:FIXES]->(e:ErrorType {type: "ImportError"})

// Link to successful resolutions
(p:Procedure)-[:LEARNED_FROM]->(ep:Episode:Error {success: true})
```
Trade-offs:
- ✅ Improves over time (learns from experience)
- ✅ Provides proven solutions (high success rate)
- ✅ Tracks effectiveness (success_rate metric)
- ❌ Requires user feedback (was fix successful?)
- ❌ May over-fit (works for one case, not general)
## 6. Performance Patterns

### Pattern 6.1: Batch Operations with UNWIND
Problem: Individual node/relationship creation is slow (network round-trips).
Solution: Use Cypher's UNWIND for batch operations:
```python
def batch_create_nodes_slow(nodes):
    # SLOW: Individual creates
    # 1000 nodes = 1000 network round-trips
    for node in nodes:
        driver.execute_query(
            "CREATE (n:Entity {id: $id, name: $name})",
            id=node.id, name=node.name
        )

def batch_create_nodes_fast(nodes):
    # FAST: Single query with UNWIND
    # 1000 nodes = 1 network round-trip
    # (nodes must be a list of property maps, i.e. dicts)
    query = """
    UNWIND $batch AS node
    CREATE (n:Entity)
    SET n = node
    """
    driver.execute_query(query, batch=nodes)
```
Performance Comparison:
- Individual creates: 10k nodes in ~100 seconds (reported with py2neo)
- Batched load: 10k nodes in ~0.17 seconds (reported with LOAD CSV; UNWIND batching performs in the same range)
- Speedup: roughly 588x
Best Practices:
- Batch size: 1000-10000 nodes per query
- Use transactions for consistency
- Create indexes before bulk load
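A sketch combining these practices, chunking within the recommended batch size using the official driver's execute_query (connection details are placeholders):

```python
from neo4j import GraphDatabase

BATCH_SIZE = 5000  # within the 1000-10000 recommendation

def bulk_load(driver, nodes):
    """Load a list of property maps in chunks; each chunk is one atomic query."""
    query = """
    UNWIND $batch AS node
    CREATE (n:Entity)
    SET n = node
    """
    for i in range(0, len(nodes), BATCH_SIZE):
        driver.execute_query(query, batch=nodes[i:i + BATCH_SIZE])

# Usage:
# driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
# bulk_load(driver, [{"id": "e1", "name": "login"}, {"id": "e2", "name": "logout"}])
```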
Trade-offs:
- ✅ Massive speedup (100-500x)
- ✅ Single transaction (atomic)
- ❌ All-or-nothing (one failure fails all)
- ❌ Requires batching logic
### Pattern 6.2: Query Optimization Techniques
Problem: Graph queries can be slow without optimization.
Solutions:
A. Use Index Hints:

```cypher
// Without a hint (may scan all :Entity nodes)
MATCH (e:Entity)
WHERE e.name = 'login'
RETURN e

// With a hint (forces the index seek)
MATCH (e:Entity)
USING INDEX e:Entity(name)
WHERE e.name = 'login'
RETURN e
```

B. Limit Traversal Depth:

```cypher
// Unbounded (exponential explosion)
MATCH (f:Function)-[:CALLS*]->(called)
RETURN called

// Bounded (controlled)
MATCH (f:Function)-[:CALLS*1..3]->(called)
RETURN called
LIMIT 100
```

C. Use LIMIT Early:

```cypher
// LIMIT at the end (sorts everything, then returns 10)
MATCH (e:Episode)
WHERE e.timestamp > datetime() - duration({days: 30})
RETURN e
ORDER BY e.timestamp DESC
LIMIT 10

// Better: apply ORDER BY + LIMIT before further processing
MATCH (e:Episode)
WHERE e.timestamp > datetime() - duration({days: 30})
WITH e ORDER BY e.timestamp DESC LIMIT 10
RETURN e
```

D. Use Parameters (Never Concatenate):

```python
# BAD: Concatenation (Cypher injection risk, no plan caching)
query = f"MATCH (e:Entity {{name: '{name}'}}) RETURN e"
driver.execute_query(query)

# GOOD: Parameters (safe, plan cached)
query = "MATCH (e:Entity {name: $name}) RETURN e"
driver.execute_query(query, name=name)
```
Performance Targets:
- Simple lookups: 1-10ms
- Graph traversals (depth 2): 10-50ms
- Complex queries: 50-200ms
- If slower: Check indexes, add LIMIT, reduce depth
### Pattern 6.3: Caching Strategy
Problem: Repeated queries waste resources.
Solution: Multi-level caching:
```python
from cachetools import LRUCache  # assumed dependency for the L1 cache

class CachedMemoryRetrieval:
    def __init__(self, memory):
        self.memory = memory
        self.l1_cache = LRUCache(maxsize=100)  # L1: in-memory (fast, small)
        self.l2_cache = RedisCache()           # L2: Redis (medium, larger)
        self.l3_database = memory              # L3: Neo4j (slow, unlimited)

    def retrieve(self, query):
        cache_key = hash_query(query)

        # L1: Check in-memory cache
        if cache_key in self.l1_cache:
            return self.l1_cache[cache_key]

        # L2: Check Redis cache
        result = self.l2_cache.get(cache_key)
        if result:
            self.l1_cache[cache_key] = result
            return result

        # L3: Query Neo4j
        result = self.memory.retrieve(query)

        # Populate caches
        self.l2_cache.set(cache_key, result, ttl=3600)
        self.l1_cache[cache_key] = result
        return result

    def invalidate(self, entity_id):
        # Invalidate relevant cache entries
        self.l1_cache.clear()  # Simple: clear all
        self.l2_cache.delete_pattern(f"*{entity_id}*")
```
Cache Levels:
| Level | Storage | Size | Latency | TTL | Use Case |
|---|---|---|---|---|---|
| L1 | Python dict | 100 entries | <1ms | Session | Hot queries |
| L2 | Redis | 10k entries | 1-5ms | 1 hour | Warm queries |
| L3 | Neo4j | Unlimited | 10-100ms | Permanent | Cold queries |
Invalidation Strategies:
- TTL-based: Expire after time
- Event-based: Invalidate on updates
- Manual: User-triggered cache clear
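`hash_query()` above is undefined; a minimal sketch using a stable digest (the normalization strategy is an assumption):

```python
import hashlib

def hash_query(query: str) -> str:
    """Stable cache key: collapse whitespace, lowercase, then digest."""
    normalized = " ".join(query.lower().split())
    return hashlib.sha256(normalized.encode()).hexdigest()
```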
Trade-offs:
- ✅ 10-100x speedup for repeated queries
- ✅ Reduces database load
- ❌ Stale data risk (invalidation challenges)
- ❌ Increased complexity (cache management)
### Pattern 6.4: Periodic Community Recomputation
Problem: Community detection is expensive to run on every update.
Solution: Batch recompute communities periodically:
```python
import asyncio
from datetime import datetime, timedelta

class CommunityManager:
    def __init__(self, memory):
        self.memory = memory
        self.last_recompute = None
        self.recompute_interval = timedelta(hours=1)

    def update_entity(self, entity):
        # Update entity immediately
        self.memory.update(entity)
        # Schedule community recompute if needed
        if (not self.last_recompute or
                datetime.now() - self.last_recompute > self.recompute_interval):
            self.schedule_recompute()

    def schedule_recompute(self):
        # Run in background (celery, asyncio, etc.)
        asyncio.create_task(self.recompute_communities())

    async def recompute_communities(self):
        # Use a graph algorithm (label propagation, Louvain, etc.).
        # The anonymous projection shown here is GDS 1.x syntax; GDS 2.x
        # first projects a named graph via gds.graph.project(...).
        query = """
        CALL gds.labelPropagation.stream({
            nodeProjection: 'Entity',
            relationshipProjection: 'RELATES_TO'
        })
        YIELD nodeId, communityId
        MATCH (e:Entity) WHERE id(e) = nodeId
        SET e.community_id = communityId
        """
        await self.memory.execute_query(query)
        self.last_recompute = datetime.now()
```
Recompute Strategies:
- Time-based: Every hour/day
- Change-based: After N updates
- Query-triggered: On-demand
- Incremental: Update only affected communities
Trade-offs:
- ✅ Avoids expensive real-time computation
- ✅ Acceptable staleness (communities don't change often)
- ❌ Eventual consistency (may see stale communities)
- ❌ Requires scheduling infrastructure
## 7. Agent Lifecycle Patterns

### Pattern 7.1: Session Continuity Pattern
Problem: Maintain context across agent restarts.
Solution: Preserve and restore session state:
```python
class SessionContinuityAgent:
    def __init__(self, session_id):
        self.session_id = session_id
        self.memory = get_memory_manager(session_id)
        # Restore previous session
        self.restore_session()

    def restore_session(self):
        """Restore session state from memory."""
        context = self.memory.restore_session_context(
            agent_id="orchestrator"
        )
        if context:
            # Restore conversation history
            self.conversation_history = context.get("conversation_summary", "")
            # Restore decisions
            self.decisions_made = context.get("key_decisions", [])
            # Restore active tasks
            self.active_tasks = context.get("active_tasks", [])
            # Restore agent states
            self.agent_states = context.get("agent_states", {})
            print(f"Restored session from {context['preserved_at']}")
        else:
            # New session
            self.conversation_history = ""
            self.decisions_made = []
            self.active_tasks = []
            self.agent_states = {}

    def on_session_end(self):
        """Preserve session state to memory."""
        self.memory.preserve_session_context(
            agent_id="orchestrator",
            summary=self.conversation_history,
            decisions=self.decisions_made,
            tasks=self.active_tasks,
            metadata={
                "agent_states": self.agent_states,
                "session_duration": self.get_session_duration(),
                "message_count": len(self.conversation_history)
            }
        )
```
What to Preserve:
- Conversation summary (not full transcript)
- Key decisions made
- Active tasks/goals
- Agent collaboration state
- User preferences learned
Trade-offs:
- ✅ Seamless user experience (continuity)
- ✅ No context loss between sessions
- ❌ Storage overhead (session state)
- ❌ Privacy concerns (what to preserve?)
### Pattern 7.2: Workflow State Management
Problem: Track multi-step workflows across agent interactions.
Solution: Store workflow state in memory with checkpoints:
```python
from datetime import datetime

class WorkflowStateManager:
    def __init__(self, workflow_name, memory):
        self.workflow_name = workflow_name
        self.memory = memory
        # Restore workflow state if it exists
        self.state = memory.restore_workflow_state(workflow_name)
        if not self.state:
            self.state = self.initialize_workflow()

    def initialize_workflow(self):
        """Start a new workflow."""
        return {
            "workflow_name": self.workflow_name,
            "current_step": "init",
            "completed_steps": [],
            "pending_steps": [],
            "step_results": {},
            "started_at": datetime.now(),
            "metadata": {}
        }

    def complete_step(self, step_name, results):
        """Mark a step as complete and advance the workflow."""
        # Update state
        self.state["completed_steps"].append(step_name)
        self.state["step_results"][step_name] = results

        # Determine the next step
        if self.state["pending_steps"]:
            self.state["current_step"] = self.state["pending_steps"].pop(0)
        else:
            self.state["current_step"] = "completed"

        # Persist to memory (checkpoint)
        self.memory.preserve_workflow_state(
            workflow_name=self.workflow_name,
            current_step=self.state["current_step"],
            completed_steps=self.state["completed_steps"],
            pending_steps=self.state["pending_steps"],
            step_results=self.state["step_results"],
            workflow_metadata=self.state["metadata"]
        )

    def get_progress(self):
        """Get workflow progress."""
        completed = len(self.state["completed_steps"])
        total = completed + len(self.state["pending_steps"]) + 1  # +1 for current step
        return {
            "workflow_name": self.workflow_name,
            "current_step": self.state["current_step"],
            "progress_percentage": (completed / total) * 100,
            "completed": self.state["completed_steps"],
            "pending": self.state["pending_steps"]
        }
```
Schema:
```cypher
(w:WorkflowState {
  workflow_name: "API_Development",
  current_step: "implement_auth",
  completed_steps: ["design_schema", "create_models"],
  // Nested maps are shown for readability; Neo4j property values must be
  // primitives or arrays, so step_results would be stored as a JSON string
  step_results: {
    "design_schema": {"tables": 5, "relationships": 12}
  },
  started_at: datetime(),
  updated_at: datetime()
})

// Link to related entities
(w:WorkflowState)-[:MODIFIES]->(f:File)
(w:WorkflowState)-[:INVOLVES]->(agent:Agent)
```
Trade-offs:
- ✅ Workflow resumption after failures
- ✅ Progress tracking
- ✅ Rollback capabilities
- ❌ Storage overhead (checkpoints)
- ❌ Complexity (state management)
### Pattern 7.3: Agent Collaboration Memory
Problem: Multiple agents need to share context and build on each other's work.
Solution: Shared memory space with agent attribution:
```python
import json
from datetime import datetime

class CollaborativeMemory:
    def share_insight(self, from_agent, to_agent, insight):
        """Share an insight between agents."""
        insight_id = self.memory.store({
            "agent_id": from_agent,
            "title": f"Insight for {to_agent}: {insight['title']}",
            "content": insight['content'],
            "memory_type": MemoryType.CONTEXT,
            "tags": ["collaboration", "insight", to_agent, from_agent],
            "metadata": {
                "recipient": to_agent,
                "shared_at": datetime.now()
            }
        })
        return insight_id

    def get_insights_for_agent(self, agent_id):
        """Get insights shared with a specific agent."""
        return self.memory.retrieve(
            tags=["collaboration", "insight", agent_id],
            memory_type=MemoryType.CONTEXT
        )

    def record_collaboration(self, agents, collaboration_type, outcome):
        """Record collaborative work."""
        collab_data = {
            "participating_agents": agents,
            "collaboration_type": collaboration_type,
            "outcome": outcome,
            "collaborated_at": datetime.now()
        }
        # Store for each participating agent
        memory_ids = []
        for agent_id in agents:
            memory_id = self.memory.store({
                "agent_id": agent_id,
                "title": f"Collaboration: {collaboration_type}",
                # default=str serializes the datetime value
                "content": json.dumps(collab_data, default=str),
                "memory_type": MemoryType.CONTEXT,
                "tags": ["collaboration", collaboration_type] + agents
            })
            memory_ids.append(memory_id)
        return memory_ids
```
Schema:
```cypher
// Agent insights
(insight:Insight {
  from_agent: "architect",
  to_agent: "builder",
  title: "Use Factory Pattern",
  content: "For this use case, the factory pattern is more suitable...",
  shared_at: datetime()
})

// Collaboration records
(collab:Collaboration {
  agents: ["architect", "builder", "reviewer"],
  type: "feature_development",
  outcome: "Completed authentication system",
  duration: 3600,  // seconds
  artifacts: ["auth.py", "test_auth.py"]
})

// Links
(insight:Insight)-[:FROM]->(a1:Agent {name: "architect"})
(insight:Insight)-[:TO]->(a2:Agent {name: "builder"})
(collab:Collaboration)-[:INVOLVES]->(a:Agent)
```
Trade-offs:
- ✅ Enables agent collaboration
- ✅ Preserves collaboration history
- ✅ Avoids duplicate work
- ❌ Complexity (coordination logic)
- ❌ Privacy concerns (cross-agent visibility)
## 8. Anti-Patterns

### Anti-Pattern 8.1: String Concatenation in Queries
Problem: Building Cypher queries with string concatenation.
Why It's Bad:
- ⚠️ Cypher injection vulnerability (analogous to SQL injection)
- ⚠️ No query plan caching
- ⚠️ Type conversion errors
- ⚠️ Hard to maintain
Bad Example:
```python
# DON'T DO THIS
name = "login'; DROP DATABASE; --"
query = f"MATCH (e:Entity {{name: '{name}'}}) RETURN e"
driver.execute_query(query)
```
Good Example:
```python
# DO THIS
name = "login"
query = "MATCH (e:Entity {name: $name}) RETURN e"
driver.execute_query(query, name=name)
```
### Anti-Pattern 8.2: Rebuilding Graph on Every Change
Problem: Regenerating entire graph on file modification.
Why It's Bad:
- ⚠️ Minutes to rebuild (unusable for interactive systems)
- ⚠️ Wastes resources (99% of graph unchanged)
- ⚠️ Loses incremental changes
Bad Example:
```python
def on_file_change(file_path):
    # DON'T: Rebuild the entire codebase graph
    rebuild_entire_graph(codebase_path)  # Takes 5 minutes
```
Good Example:
```python
def on_file_change(file_path):
    # DO: Update only affected entities
    update_file_entities(file_path)  # Takes <1 second
```
### Anti-Pattern 8.3: Storing Large Content in Graph
Problem: Storing full file contents or large documents as node properties.
Why It's Bad:
- ⚠️ Graph databases optimized for relationships, not large blobs
- ⚠️ Query performance degrades
- ⚠️ Increased memory usage
- ⚠️ Difficult to update
Bad Example:
```cypher
// DON'T: Store entire file content
CREATE (f:File {
  path: "auth.py",
  content: "...10000 lines of code..."  // Bad!
})
```
Good Example:
```cypher
// DO: Store a reference to external storage
CREATE (f:File {
  path: "auth.py",
  content_hash: "sha256:abc123...",
  content_location: "s3://bucket/auth.py",
  summary: "Authentication module with login/logout functions",
  line_count: 234
})
```
### Anti-Pattern 8.4: Ignoring Temporal Dimension
Problem: Deleting nodes when information becomes outdated.
Why It's Bad:
- ⚠️ Loses knowledge history
- ⚠️ Can't answer "what did we know then?"
- ⚠️ Debugging impossible (no audit trail)
- ⚠️ Can't learn from mistakes
Bad Example:
```cypher
// DON'T: Delete old facts
MATCH (f:Fact {content: "User prefers dark mode"})
DELETE f

// Create new fact
CREATE (f:Fact {content: "User prefers light mode"})
```
Good Example:
```cypher
// DO: Temporal invalidation
MATCH (f:Fact {content: "User prefers dark mode"})
SET f.t_invalid = datetime(),
    f.invalidated_by = $new_fact_id

// Create new fact
CREATE (f:Fact {
  content: "User prefers light mode",
  t_valid: datetime()
})
```
### Anti-Pattern 8.5: Using Deprecated Libraries
Problem: Using py2neo or embedded Neo4j in Python.
Why It's Bad:
- ⚠️ py2neo: No longer maintained, slow
- ⚠️ Embedded Neo4j: Deprecated, security issues
- ⚠️ Missing features (async, performance)
Bad Example:
```python
# DON'T: Use py2neo
from py2neo import Graph
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# DON'T: Use embedded Neo4j
from neo4j_embedded import EmbeddedGraph
db = EmbeddedGraph("/path/to/db")
```
Good Example:
```python
# DO: Use the official neo4j driver
from neo4j import GraphDatabase

driver = GraphDatabase.driver(
    "bolt://localhost:7687",
    auth=("neo4j", "password")
)
```
### Anti-Pattern 8.6: Unbounded Graph Traversals
Problem: Queries without depth limits or LIMIT clauses.
Why It's Bad:
- ⚠️ Exponential explosion (can return millions of nodes)
- ⚠️ Hangs system (out of memory)
- ⚠️ Unpredictable performance
Bad Example:
```cypher
// DON'T: Unbounded traversal
MATCH (f:Function)-[:CALLS*]->(called)
RETURN called  // Can return the entire codebase!
```
Good Example:
```cypher
// DO: Bounded traversal with a limit
MATCH (f:Function)-[:CALLS*1..3]->(called)
RETURN called
LIMIT 100
```
## 9. Decision Framework

### When to Use Neo4j vs. Other Solutions
Use Neo4j When:
- ✅ Relationship queries are primary (graph traversal)
- ✅ Need ACID transactions
- ✅ Complex, multi-hop reasoning required
- ✅ Schema flexibility important (evolving model)
- ✅ Community Edition sufficient (< 10M nodes)
Consider Alternatives When:
- ❌ Pure vector search (use Pinecone, Weaviate)
- ❌ Time-series data (use InfluxDB, TimescaleDB)
- ❌ Full-text search (use Elasticsearch)
- ❌ Simple key-value (use Redis, SQLite)
- ❌ Need horizontal scaling (use Neo4j Enterprise or FalkorDB)
### Architecture Selection Matrix
| Project Size | Memory Types | Agents | Recommended Architecture |
|---|---|---|---|
| Small (< 10k nodes) | Episodic + Semantic | Single | SQLite-based (simpler) |
| Medium (10k-1M nodes) | Episodic + Semantic + Code | Single | Unified Graph (Zep) |
| Large (> 1M nodes) | All 5 types | Multiple | Federated (MIRIX) |
| Multi-project | Episodic + Semantic | Multiple | Per-project Neo4j containers |
### Performance vs. Complexity Trade-off
| Approach | Latency | Complexity | Scalability | Recommendation |
|---|---|---|---|---|
| In-memory only | 1ms | Low | Poor | Prototypes |
| SQLite | 10ms | Low | Medium | Small projects |
| Neo4j Community | 50ms | Medium | Good | Most projects |
| Neo4j Enterprise | 50ms | High | Excellent | Large orgs |
| Federated | 100ms | Very High | Excellent | Complex systems |
## 10. Pattern Relationships

### Pattern Dependencies
```text
Foundational Patterns (Start Here)
├── Three-Tier Hierarchical Graph (1.1)
├── Temporal Validity Tracking (1.2)
└── Graph Schema Patterns (Section 3)

Build Upon Foundations
├── Hybrid Search (1.3) - requires hierarchical graph
├── Multi-Modal Memory (1.5) - uses temporal tracking
└── Code-Aware Memory (2.3) - combines all above

Advanced Patterns (Last)
├── Multi-Hop Reasoning (4.3) - requires hybrid search
├── Community Recomputation (6.4) - requires hierarchical graph
└── Agent Collaboration (7.3) - requires multi-modal memory
```
### Pattern Combinations
Combination 1: Production Coding Assistant
- Three-Tier Hierarchical Graph (1.1)
- Temporal Validity Tracking (1.2)
- Hybrid Search (1.3)
- Code-Aware Memory (2.3)
- Incremental Updates (1.4)
- Error Pattern Learning (5.4)
Combination 2: Multi-Agent System
- Multi-Modal Memory (1.5)
- Federated Architecture (2.2)
- Agent Collaboration Memory (7.3)
- Workflow State Management (7.2)
Combination 3: High-Performance RAG
- Unified Graph (2.1)
- Hybrid Search (1.3)
- Batch Operations (6.1)
- Caching Strategy (6.3)
## Conclusion

### Key Takeaways
- Three-Tier Hierarchy is the foundation (episodic → semantic → community)
- Temporal Tracking is essential for coding assistants (code changes constantly)
- Hybrid Search beats any single approach (vector + graph + temporal)
- Incremental Updates enable real-time memory (< 1s updates)
- Multi-Modal Architecture proven to work (35% improvement in MIRIX)
### Implementation Roadmap
Phase 1: Foundation (Weeks 1-2)
- Set up Neo4j Community Edition (Docker)
- Implement three-tier hierarchy
- Add temporal validity tracking
- Create basic schema
Phase 2: Integration (Weeks 3-4)
- Integrate blarify/SCIP for code graphs
- Implement hybrid search
- Add incremental updates
- Build retrieval system
Phase 3: Advanced (Weeks 5-8)
- Add procedural memory (error learning)
- Implement agent collaboration
- Optimize performance (batching, caching)
- Add workflow state management
Phase 4: Production (Months 2-3)
- Multi-project deployment
- Monitoring and metrics
- Backup/restore system
- Cross-project learning
### Resources
Research Papers:
- Zep: https://arxiv.org/html/2501.13956v1
- MIRIX: https://arxiv.org/html/2507.07957v1
Tools:
- Neo4j Driver: https://neo4j.com/docs/api/python-driver/current/
- blarify: https://github.com/blarApp/blarify
- SCIP: https://github.com/sourcegraph/scip
Amplihack Integration:
- Memory System: /src/amplihack/memory/
- Integration Guide: /.claude/tools/amplihack/memory/INTEGRATION_GUIDE.md
Document Version: 1.0
Last Updated: 2025-11-02
Maintained By: Patterns Agent + Knowledge-Archaeologist