ContentSource Architecture¶
Understanding how WikiGR achieves source-agnostic knowledge graph construction.
The Problem¶
Building knowledge graphs from different content sources (Wikipedia, web pages, local files) requires handling:
- Different data formats (API responses, HTML, markdown)
- Different link structures (wiki links vs URLs vs file paths)
- Different metadata (page IDs vs URLs vs file paths)
Traditional approaches tightly couple extraction logic to each source, leading to code duplication and inconsistent extraction quality.
The Solution: ContentSource Protocol¶
WikiGR uses a protocol-based architecture that separates content acquisition from content processing.
ContentSource Protocol¶
from typing import Protocol, Generator
class ContentSource(Protocol):
"""
Protocol for content sources.
Any object implementing get_articles() can be used as a content source.
"""
def get_articles(self) -> Generator[Article, None, None]:
"""Yields Article objects from the source."""
...
Key insight: All content sources produce the same output type (Article), regardless of input format.
Article: The Universal Data Model¶
@dataclass
class Article:
title: str # Article title
content: str # Main text content
url: str # Unique identifier
links: List[str] # Links for expansion
An Article is source-agnostic:
- Wikipedia: title from the API, content from wikitext, url = Wikipedia URL
- Web: title from <title>, content from HTML parsing, url = web URL
- Local files: title from the filename, content from the file, url = file path
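For example, the same dataclass can carry a crawled web page and a local file. The values below are illustrative placeholders:

web_article = Article(
    title="Kubernetes Documentation",            # from <title>
    content="Kubernetes is an open-source ...",  # parsed HTML text
    url="https://kubernetes.io/docs/",
    links=["https://kubernetes.io/docs/concepts/"]
)

file_article = Article(
    title="notes",                                # from the filename
    content="# Notes\nSee [other](./other.md).",  # raw file contents
    url="file:///home/user/notes.md",
    links=["./other.md"]
)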
Shared ArticleProcessor¶
All sources use the same processor:
processor = ArticleProcessor(conn, use_llm=True)
for article in source.get_articles():
processor.process_article(
title=article.title,
content=article.content,
url=article.url
)
Result: Identical extraction quality across all sources.
Architecture Diagram¶
┌─────────────────┐
│ WikiGR CLI │
│ (wikigr create) │
└────────┬────────┘
│
├─────────────────┐
│ │
┌────────▼─────────┐ ┌───▼──────────────┐
│ Wikipedia Source │ │ Web Source │
│ (API calls) │ │ (HTTP requests) │
└────────┬─────────┘ └───┬──────────────┘
│ │
│ Article │ Article
│ objects │ objects
│ │
└────────┬───────┘
│
┌────────▼────────────┐
│ ArticleProcessor │
│ (shared extraction) │
└────────┬────────────┘
│
┌────────▼────────────┐
│ LadybugDB Database │
│ (knowledge graph) │
└─────────────────────┘
Design Principles¶
1. Separation of Concerns¶
ContentSource responsibilities:
- Fetch content from the external source
- Parse it into Article objects
- Handle source-specific errors (network, API limits)

ArticleProcessor responsibilities:
- Extract entities from articles
- Identify relationships
- Create graph nodes and edges
- Generate embeddings

Why this matters: you can improve extraction without touching the source connectors, and add new sources without changing the extraction logic.
2. Protocol Over Inheritance¶
We use protocols (structural subtyping) instead of inheritance:
# Not this (inheritance):
class WebContentSource(BaseContentSource):
pass
# This (protocol):
class WebContentSource:
def get_articles(self) -> Generator[Article, None, None]:
...
Benefits:
- No forced base class
- Duck typing flexibility
- Easier testing (mock any object with get_articles())
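For example, a processor test can substitute a hand-rolled fake. FakeSource below is a hypothetical test helper, not part of WikiGR:

class FakeSource:
    """Duck-typed test double: satisfies ContentSource without a base class."""
    def get_articles(self) -> Generator[Article, None, None]:
        yield Article(
            title="Test Page",
            content="Alice founded Acme in 1999.",
            url="https://example.com/test",
            links=[]
        )

processor = ArticleProcessor(conn, use_llm=False)
for article in FakeSource().get_articles():
    processor.process_article(title=article.title, content=article.content, url=article.url)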
3. Composition Over Configuration¶
Sources compose behavior rather than configure it:
# Web source with BFS expansion
web_source = WebContentSource(
url="...",
max_depth=2,
max_links=50
)
# Web source without expansion
web_source_simple = WebContentSource(url="...", max_depth=0)
Processing behavior is composed separately:
# LLM extraction with relationships
processor = ArticleProcessor(conn, use_llm=True, extract_relationships=True)
# Heuristic extraction without relationships
processor_fast = ArticleProcessor(conn, use_llm=False, extract_relationships=False)
Implementation: WebContentSource¶
Let's examine how WebContentSource implements the protocol.
Initialization¶
class WebContentSource:
def __init__(
self,
url: str,
max_depth: int = 0,
max_links: int = 1,
same_domain_only: bool = True,
include_pattern: Optional[str] = None,
exclude_pattern: Optional[str] = None
):
self.url = url
self.max_depth = max_depth
self.max_links = max_links
self.same_domain_only = same_domain_only
self.include_pattern = include_pattern
self.exclude_pattern = exclude_pattern
self.visited = set()
Design choice: Configuration in constructor, not global config files.
Article Generation¶
def get_articles(self) -> Generator[Article, None, None]:
    """
    Yields Article objects using BFS.

    Algorithm:
    1. Start with root URL
    2. Fetch and parse HTML
    3. Yield Article object
    4. Extract links from page
    5. Apply filters (domain, patterns)
    6. Add to queue for next depth level
    7. Repeat until max_depth or max_links reached
    """
    queue = [(self.url, 0)]  # (url, depth)
    while queue and len(self.visited) < self.max_links:
        current_url, depth = queue.pop(0)
        if current_url in self.visited:
            continue
        # Fetch and parse
        response = requests.get(current_url, timeout=30)
        soup = BeautifulSoup(response.text, "html.parser")
        # Extract content; fall back to the URL if the page has no <title>
        title_tag = soup.find("title")
        title = title_tag.text if title_tag else current_url
        content = soup.get_text()
        # Resolve relative hrefs to absolute URLs (urljoin from urllib.parse)
        # so the domain filter in expand_links sees full URLs
        links = [urljoin(current_url, a["href"]) for a in soup.find_all("a", href=True)]
        self.visited.add(current_url)
        # Yield Article
        yield Article(title=title, content=content, url=current_url, links=links)
        # Expand if not at max depth
        if depth < self.max_depth:
            filtered_links = self.expand_links(current_url, links, depth)
            queue.extend((link, depth + 1) for link in filtered_links)
Key aspects:
- Generator pattern: Memory-efficient for large crawls
- BFS traversal: Breadth-first ensures proximity to root
- Lazy evaluation: Articles only fetched when consumed
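Lazy evaluation means a crawl can be cut short without fetching every page. A small sketch (the URL is a placeholder):

from itertools import islice

source = WebContentSource(url="https://example.com", max_depth=2, max_links=50)

# Only the first three pages are fetched; no further HTTP requests are made.
for article in islice(source.get_articles(), 3):
    print(article.title)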
Link Filtering¶
def expand_links(self, root_url: str, links: List[str], depth: int) -> List[str]:
"""
Apply filters to discovered links.
Filter order (short-circuit):
1. Already visited → skip
2. Same domain check
3. Include pattern match
4. Exclude pattern rejection
5. Max links enforcement
"""
filtered = []
root_domain = urlparse(root_url).netloc
for link in links:
# Already visited
if link in self.visited:
continue
# Same domain filter
if self.same_domain_only:
link_domain = urlparse(link).netloc
if link_domain != root_domain:
continue
# Include pattern
if self.include_pattern and not re.search(self.include_pattern, link):
continue
# Exclude pattern
if self.exclude_pattern and re.search(self.exclude_pattern, link):
continue
# Max links
if len(self.visited) + len(filtered) >= self.max_links:
break
filtered.append(link)
return filtered
Design choice: Early filtering saves HTTP requests and processing time.
Feature Parity Achievement¶
Web sources now have full parity with Wikipedia sources:
| Feature | Wikipedia | Web | Implementation |
|---|---|---|---|
| Entity extraction | ✓ LLM | ✓ LLM | Shared ArticleProcessor |
| Relationship extraction | ✓ LLM | ✓ LLM | Shared ArticleProcessor |
| Link expansion | ✓ Wiki links | ✓ BFS crawl | Source-specific in get_articles() |
| Incremental updates | ✓ update | ✓ update | URL-based deduplication |
| Vector embeddings | ✓ | ✓ | Shared ArticleProcessor |
| Graph structure | ✓ | ✓ | Shared ArticleProcessor |
Extension Points¶
The architecture makes it easy to add new sources:
Example: GitHub Wiki Source¶
class GitHubWikiSource:
def __init__(self, repo: str, owner: str):
self.repo = repo
self.owner = owner
def get_articles(self) -> Generator[Article, None, None]:
# Use GitHub API to list wiki pages
pages = github_api.get_wiki_pages(self.owner, self.repo)
for page in pages:
content = github_api.get_wiki_content(self.owner, self.repo, page.name)
yield Article(
title=page.title,
content=content,
url=page.url,
links=extract_wiki_links(content)
)
That's it. No changes to ArticleProcessor or CLI needed.
Example: Local Markdown Source¶
class LocalMarkdownSource:
def __init__(self, directory: Path):
self.directory = directory
def get_articles(self) -> Generator[Article, None, None]:
for file_path in self.directory.glob("**/*.md"):
with open(file_path) as f:
content = f.read()
title = file_path.stem
url = f"file://{file_path.absolute()}"
links = extract_markdown_links(content)
yield Article(title=title, content=content, url=url, links=links)
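extract_markdown_links is not defined above; one possible implementation pulls the targets out of inline [text](target) syntax:

import re
from typing import List

def extract_markdown_links(content: str) -> List[str]:
    """Return the link targets of inline markdown links: [text](target)."""
    return re.findall(r"\[[^\]]*\]\(([^)\s]+)\)", content)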
Integration: add --source=local to the CLI and wire up the source, as sketched below.
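A hypothetical sketch of that wiring, assuming an argparse-style args object (WikiGR's actual CLI code may differ):

def build_source(args) -> ContentSource:
    # Hypothetical dispatch; the flag names here are illustrative
    if args.source == "local":
        return LocalMarkdownSource(directory=Path(args.path))
    if args.source == "web":
        return WebContentSource(url=args.url)
    return WikipediaContentSource(title=args.title)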
Trade-offs¶
Benefits¶
Modularity: Sources and the processor are independent
- Change extraction without touching sources
- Add sources without changing extraction

Consistency: Same extraction across all sources
- Wikipedia and web produce identical graph structure
- Users learn one interface
Testability: Easy to mock and test
- Mock get_articles() for testing processor
- Mock processor for testing sources
Extensibility: Add sources with minimal code
- Implement get_articles() protocol
- Wire into CLI
- Done
Costs¶
Indirection: One extra layer between source and processor
- Negligible performance cost
- Worth it for modularity

Memory: Article objects are created even if not used
- The generator pattern mitigates this
- Only the active article is in memory at once
Type safety: Protocol conformance is verified by static type checkers, not enforced at runtime
- A Python limitation, not an architecture flaw
- Consider @runtime_checkable for stricter runtime validation, as sketched below
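A minimal sketch of the @runtime_checkable option. Note that isinstance() only verifies that a get_articles attribute exists; it does not check the signature or return type:

from typing import Protocol, Generator, runtime_checkable

@runtime_checkable
class ContentSource(Protocol):
    def get_articles(self) -> Generator[Article, None, None]: ...

source = WebContentSource(url="https://example.com")
assert isinstance(source, ContentSource)  # passes: the method exists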
Comparison to Alternatives¶
Monolithic Approach (Avoided)¶
def create_kg_from_wikipedia(title: str, conn: Connection):
# Fetch from Wikipedia API
# Extract entities inline
# Create graph inline
pass
def create_kg_from_web(url: str, conn: Connection):
# Fetch from web
# Extract entities inline (duplicate code!)
# Create graph inline (duplicate code!)
pass
Problems:
- Code duplication between sources
- Inconsistent extraction quality
- Hard to test components in isolation
Inheritance Hierarchy (Avoided)¶
class BaseContentSource(ABC):
@abstractmethod
def fetch_content(self) -> str:
pass
def process(self, conn: Connection):
content = self.fetch_content()
# Processing logic in base class (tight coupling!)
pass
Problems:
- Forces inheritance (rigid)
- Processing logic lives in the base class (hard to swap)
- Difficult to compose behaviors
Current Approach (Protocol + Composition)¶
# Protocol (flexible interface)
class ContentSource(Protocol):
def get_articles(self) -> Generator[Article, None, None]: ...
# Composition (flexible behavior)
processor = ArticleProcessor(conn, use_llm=True)
for article in source.get_articles():
processor.process_article(article.title, article.content, article.url)
Benefits:
- Loose coupling via the protocol
- Flexible composition
- Easy to test and extend
Future Directions¶
Streaming Processing¶
Process articles in parallel:
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for article in source.get_articles():
future = executor.submit(processor.process_article, article.title, article.content, article.url)
futures.append(future)
for future in futures:
stats = future.result()
Source Composition¶
Combine multiple sources:
class MultiSource:
def __init__(self, sources: List[ContentSource]):
self.sources = sources
def get_articles(self) -> Generator[Article, None, None]:
for source in self.sources:
yield from source.get_articles()
combined = MultiSource([
WikipediaContentSource(title="Kubernetes"),
WebContentSource(url="https://kubernetes.io/docs/"),
LocalMarkdownSource(directory=Path("./my-notes/"))
])
Incremental Processing¶
Track which articles have been processed:
class IncrementalProcessor:
    def __init__(self, processor: ArticleProcessor, cache_path: Path):
        self.processor = processor
        self.cache_path = cache_path
        self.processed = self.load_cache(cache_path)  # set of processed URLs

    def process_articles(self, source: ContentSource):
        for article in source.get_articles():
            if article.url not in self.processed:
                self.processor.process_article(article.title, article.content, article.url)
                self.processed.add(article.url)
        self.save_cache()
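load_cache and save_cache are left undefined above; a minimal sketch, assuming the cache is simply a JSON list of processed URLs:

import json
from pathlib import Path

def load_cache(self, cache_path: Path) -> set:
    """Return the previously processed URLs, or an empty set on first run."""
    if cache_path.exists():
        return set(json.loads(cache_path.read_text()))
    return set()

def save_cache(self):
    """Persist the processed-URL set as a sorted JSON list."""
    self.cache_path.write_text(json.dumps(sorted(self.processed)))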