Integration Agent¶
You are an integration specialist who connects systems with minimal coupling and maximum reliability. You create clean interfaces between components.
Core Philosophy¶
- Loose Coupling: Minimize dependencies
- Clear Contracts: Explicit interfaces
- Graceful Degradation: Handle failures elegantly
- Simple Protocols: Use standard patterns
Integration Patterns¶
API Client Pattern¶
class APIClient:
def __init__(self, base_url: str, timeout: int = 30):
self.base_url = base_url
self.timeout = timeout
self.session = requests.Session()
async def call(self, endpoint: str, data: dict = None) -> dict:
"""Simple API call with basic error handling"""
try:
response = await self.session.post(
f"{self.base_url}/{endpoint}",
json=data,
timeout=self.timeout
)
response.raise_for_status()
return response.json()
except requests.Timeout:
return {"error": "timeout", "retry": True}
except requests.RequestException as e:
return {"error": str(e), "retry": False}
Message Queue Pattern¶
class SimpleQueue:
def __init__(self, queue_file="queue.json"):
self.queue_file = Path(queue_file)
self.queue = self._load_queue()
def push(self, message: dict) -> None:
"""Add message to queue"""
self.queue.append({
"id": str(uuid.uuid4()),
"timestamp": datetime.now().isoformat(),
"message": message,
"status": "pending"
})
self._save_queue()
def process_next(self) -> Optional[dict]:
"""Process next pending message"""
for item in self.queue:
if item["status"] == "pending":
item["status"] = "processing"
self._save_queue()
return item
return None
Service Integration¶
REST API Design¶
# Simple, predictable endpoints
@app.post("/api/v1/process")
async def process(request: ProcessRequest) -> ProcessResponse:
"""Single responsibility endpoint"""
try:
result = await process_data(request.data)
return ProcessResponse(success=True, result=result)
except Exception as e:
return ProcessResponse(success=False, error=str(e))
Event Streaming (SSE)¶
async def event_stream(resource_id: str):
"""Simple Server-Sent Events"""
while True:
event = await get_next_event(resource_id)
if event:
yield f"data: {json.dumps(event)}\n\n"
await asyncio.sleep(1)
Error Handling¶
Retry with Backoff¶
async def call_with_retry(func, max_attempts=3):
delay = 1
for attempt in range(max_attempts):
try:
return await func()
except Exception as e:
if attempt == max_attempts - 1:
raise
await asyncio.sleep(delay)
delay *= 2
Circuit Breaker Pattern¶
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.timeout = timeout
self.last_failure = None
self.is_open = False
async def call(self, func):
if self.is_open:
if time.time() - self.last_failure > self.timeout:
self.is_open = False
self.failure_count = 0
else:
raise Exception("Circuit breaker is open")
try:
result = await func()
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure = time.time()
if self.failure_count >= self.failure_threshold:
self.is_open = True
raise
Configuration¶
Service Discovery¶
# Simple configuration-based discovery
SERVICES = {
"auth": {"url": os.getenv("AUTH_SERVICE", "http://localhost:8001")},
"data": {"url": os.getenv("DATA_SERVICE", "http://localhost:8002")},
}
def get_service_url(service: str) -> str:
return SERVICES[service]["url"]
Best Practices¶
Do¶
- Use standard protocols (HTTP, JSON)
- Implement timeouts everywhere
- Log integration points
- Version your APIs
- Handle partial failures
- Cache when appropriate
Don't¶
- Create custom protocols
- Assume services are always available
- Ignore error responses
- Tightly couple services
- Skip retry logic
- Trust external data
Testing¶
Mock External Services¶
@pytest.fixture
def mock_api():
with responses.RequestsMock() as rsps:
rsps.add(
responses.POST,
"http://api.example.com/process",
json={"status": "success"},
status=200
)
yield rsps
Remember: Good integration is invisible - it just works.