Integration Patterns
Overview
This document describes how different components of the system integrate with each other, including communication patterns, data flow, and external service integration strategies.
Agent-to-Database Communication
Connection Management
Each agent service maintains its own database connection pool to ensure isolation and scalability.
# Agent service database configuration
from contextlib import asynccontextmanager


class DatabaseConfig:
    """Own the asyncpg connection pool used by one agent service.

    The pool is configured from the ``DATABASE_URL`` environment variable
    (5-20 connections, 60 s command timeout, idle connections recycled after
    300 s). It is opened lazily, because opening it must be awaited and
    ``__init__`` cannot await.
    """

    def __init__(self):
        # create_pool() only builds the Pool object here; connections are
        # opened when the pool is first awaited in get_connection().
        self.pool = asyncpg.create_pool(
            dsn=os.environ['DATABASE_URL'],
            min_size=5,
            max_size=20,
            command_timeout=60,
            max_inactive_connection_lifetime=300,
        )

    @asynccontextmanager
    async def get_connection(self):
        """Yield a pooled connection and release it when the block exits.

        Usage::

            async with config.get_connection() as conn:
                await conn.execute(...)
        """
        # Awaiting the pool opens it on first use; once it is open, asyncpg
        # returns immediately.
        # NOTE(review): if many tasks can race on the very first call, open
        # the pool once during service startup instead.
        await self.pool
        async with self.pool.acquire() as connection:
            yield connection
Transaction Patterns
Agents use transactions to ensure data consistency when performing multiple operations:
class AgentDatabaseOperations:
    """Database writes that an agent performs as one unit of work."""

    async def create_activity_with_cards(self, project_id: str, activity_data: dict, cards: list):
        """Insert an activity and its task cards inside a single transaction.

        Returns whatever the activity ``INSERT ... RETURNING *`` produced.
        ``project_id`` is accepted but not used by this snippet.
        """
        async with self.db.get_connection() as conn, conn.transaction():
            # The activity row is written first ...
            created_activity = await conn.fetchrow(
                "INSERT INTO activities (...) VALUES (...) RETURNING *",
                activity_data
            )
            # ... followed by one insert per card, in the order given.
            for card_payload in cards:
                await conn.execute(
                    "INSERT INTO task_cards (...) VALUES (...)",
                    card_payload
                )
        return created_activity
State Persistence
LangGraph state persistence is handled through dedicated tables:
class LangGraphPersistence:
    """Save and load agent state through a LangGraph Postgres checkpointer."""

    def __init__(self, db_pool):
        # NOTE(review): the awaited aput/aget calls below presumably need the
        # async saver variant - confirm against the installed langgraph version.
        self.checkpointer = PostgresSaver(db_pool)

    @staticmethod
    def _thread_config(thread_id: str) -> dict:
        """Build the checkpointer config that addresses one thread."""
        return {"configurable": {"thread_id": thread_id}}

    async def save_agent_state(self, thread_id: str, state: dict):
        """Persist *state* under *thread_id*."""
        # LangGraph handles the complexity of state serialization
        await self.checkpointer.aput(self._thread_config(thread_id), state)

    async def load_agent_state(self, thread_id: str):
        """Return whatever the checkpointer has stored for *thread_id*."""
        return await self.checkpointer.aget(self._thread_config(thread_id))
Job Queue Integration
Job Submission Pattern
Agents submit jobs to the queue service via HTTP API:
class JobQueueClient:
    """HTTP client agents use to submit jobs to the queue service.

    The underlying ``aiohttp.ClientSession`` is created on first use, so the
    client can be constructed outside a running event loop. Release it with
    ``await client.close()`` or by using the client in ``async with``.
    """

    def __init__(self, base_url: str):
        self.base_url = base_url
        # Created lazily by _get_session(); None until the first request.
        self.session = None

    def _get_session(self):
        """Return the open session, creating one if needed."""
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        return self.session

    async def create_job(self, card_id: str, params: dict) -> dict:
        """POST a new job for *card_id* and return the decoded JSON response.

        The response body is returned as-is; HTTP status is not inspected.
        """
        async with self._get_session().post(
            f"{self.base_url}/api/jobs",
            json={
                "card_id": card_id,
                "params": params
            }
        ) as response:
            return await response.json()

    async def close(self):
        """Close the HTTP session if one is open. Safe to call repeatedly."""
        if self.session is not None and not self.session.closed:
            await self.session.close()
        self.session = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()
Job Status Monitoring
The Job Queue Service publishes status updates to the message broker:
class JobProcessor:
    """Runs one generation job and reports each status change."""

    async def process_job(self, job: dict):
        """Process *job*: mark it processing, generate, then record the outcome.

        A failure while starting or generating marks the job ``failed``.
        Errors raised after generation succeeded (while recording or
        publishing the ``completed`` status) propagate to the caller instead
        of rewriting a finished job as ``failed``.
        """
        job_id = job['id']
        # Subscribers route updates by card, so carry the card id along.
        card_id = job.get('card_id')

        try:
            # Update status to processing
            await self.update_job_status(job_id, 'processing')
            await self.publish_status_update(job_id, 'processing', card_id=card_id)

            # Call cloud service
            result = await self.cloud_service.generate(job['params'])
            result_url = result['url']
        except Exception as e:
            # Handle failure
            await self.update_job_status(job_id, 'failed', error=str(e))
            await self.publish_status_update(job_id, 'failed', card_id=card_id)
            return

        # Update status to completed
        await self.update_job_status(job_id, 'completed', result_url)
        await self.publish_status_update(job_id, 'completed', result_url, card_id=card_id)

    async def publish_status_update(self, job_id: str, status: str,
                                    result_url: str = None, card_id: str = None):
        """Publish a ``job.status.updated`` event to the message broker.

        The message carries ``job_id``, ``card_id``, ``status``,
        ``result_url`` and a UTC ISO-8601 ``timestamp``. ``card_id`` and
        ``result_url`` are ``None`` when not supplied.
        """
        message = {
            'job_id': job_id,
            'card_id': card_id,
            'status': status,
            'result_url': result_url,
            'timestamp': datetime.utcnow().isoformat()
        }
        await self.message_broker.publish('job.status.updated', message)
Cloud Service Integration
Job Creation and Polling
The system treats cloud generation services as black boxes, creating jobs and polling for results:
class JobQueueService:
    """Creates generation jobs and reports their stored status."""

    async def create_job(self, card_id: str, params: dict) -> dict:
        """Insert a new ``queued`` job row for *card_id* and return a summary.

        ``params`` is stored JSON-encoded; the job id is a fresh UUID4 string.
        """
        new_job_id = str(uuid.uuid4())
        insert_sql = """INSERT INTO generation_jobs
            (id, card_id, status, params, created_at)
            VALUES ($1, $2, $3, $4, $5)"""
        row_values = (new_job_id, card_id, 'queued', json.dumps(params), datetime.utcnow())

        # Create job in database
        async with self.db.acquire() as conn:
            await conn.execute(insert_sql, *row_values)

        # Job will be picked up by cloud service
        # Details of cloud integration are outside our scope
        return dict(
            job_id=new_job_id,
            status='queued',
            message='Job created and queued for processing',
        )

    async def check_job_status(self, job_id: str) -> dict:
        """Return the stored status of *job_id*.

        Raises:
            ServiceError: with code ``JOB_NOT_FOUND`` when no such row exists.
        """
        async with self.db.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT * FROM generation_jobs WHERE id = $1",
                job_id
            )

        if not row:
            raise ServiceError('JOB_NOT_FOUND', f'Job {job_id} not found')

        # completed_at stays unset until the job finishes.
        finished_at = row['completed_at']
        return {
            'job_id': row['id'],
            'status': row['status'],
            'result_url': row['result_url'],
            'error_message': row['error_message'],
            'created_at': row['created_at'].isoformat(),
            'completed_at': finished_at.isoformat() if finished_at else None
        }
Job Status Updates
When cloud services complete generation, they update job status:
class JobStatusUpdater:
    """Records the final outcome of a generation job and announces it."""

    async def update_job_completion(self, job_id: str, result_url: str):
        """Mark *job_id* completed with *result_url* and publish the update.

        Raises:
            ServiceError: with code ``JOB_NOT_FOUND`` when no row matched.
        """
        async with self.db.acquire() as conn:
            # RETURNING hands back the row exactly as this statement wrote it,
            # avoiding a second query that could observe a later change.
            job = await conn.fetchrow(
                """UPDATE generation_jobs
                   SET status = 'completed',
                       result_url = $1,
                       completed_at = $2
                   WHERE id = $3
                   RETURNING *""",
                result_url, datetime.utcnow(), job_id
            )
        await self._announce(job_id, job)

    async def update_job_failure(self, job_id: str, error_message: str):
        """Mark *job_id* failed with *error_message* and publish the update.

        Raises:
            ServiceError: with code ``JOB_NOT_FOUND`` when no row matched.
        """
        async with self.db.acquire() as conn:
            job = await conn.fetchrow(
                """UPDATE generation_jobs
                   SET status = 'failed',
                       error_message = $1,
                       completed_at = $2
                   WHERE id = $3
                   RETURNING *""",
                error_message, datetime.utcnow(), job_id
            )
        await self._announce(job_id, job)

    async def _announce(self, job_id: str, job):
        """Publish the updated job row, or raise if the job does not exist."""
        if job is None:
            raise ServiceError('JOB_NOT_FOUND', f'Job {job_id} not found')
        # Published after the pooled connection has been released.
        await self.publish_job_update(job)
Message Broker Patterns
Publisher Implementation
Services publish events using a consistent pattern:
class EventPublisher:
    """Publishes JSON events to the durable ``events`` topic exchange."""

    def __init__(self, broker_url: str):
        self.connection = None
        self.channel = None
        # Declared once per channel and reused by every publish() call.
        self.exchange = None
        self.broker_url = broker_url

    async def connect(self):
        """Open a robust broker connection and a channel on it."""
        self.connection = await aio_pika.connect_robust(
            self.broker_url,
            reconnect_interval=5
        )
        self.channel = await self.connection.channel()
        # A new channel needs its own exchange declaration.
        self.exchange = None

    async def _get_exchange(self):
        """Return the ``events`` exchange, connecting and declaring on demand."""
        if self.channel is None:
            await self.connect()
        if self.exchange is None:
            self.exchange = await self.channel.declare_exchange(
                'events',
                aio_pika.ExchangeType.TOPIC,
                durable=True
            )
        return self.exchange

    async def publish(self, topic: str, message: dict):
        """Publish *message* as persistent JSON with routing key *topic*.

        Connects first if ``connect()`` has not been called yet.
        """
        exchange = await self._get_exchange()
        await exchange.publish(
            aio_pika.Message(
                body=json.dumps(message).encode(),
                content_type='application/json',
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT
            ),
            routing_key=topic
        )

    async def close(self):
        """Close the broker connection, if any. Safe to call repeatedly."""
        if self.connection is not None:
            await self.connection.close()
        self.connection = None
        self.channel = None
        self.exchange = None
Subscriber Implementation
The Notification Service subscribes to relevant events:
class EventSubscriber:
    """Consumes events from the ``events`` topic exchange and dispatches them."""

    def __init__(self, broker_url: str):
        self.broker_url = broker_url
        self.handlers = {}

    def register_handler(self, topic: str, handler: Callable):
        """Register *handler* for *topic*, replacing any previous handler.

        Dispatch is by exact routing-key match, and bindings are created when
        ``start_consuming()`` begins, so register handlers before starting.
        """
        self.handlers[topic] = handler

    async def start_consuming(self):
        """Bind a private queue to every registered topic and consume forever.

        Each message body is decoded as JSON and passed to the handler
        registered for its routing key. The broker connection is closed when
        consumption stops for any reason.
        """
        connection = await aio_pika.connect_robust(self.broker_url)

        # Ensure the connection is released on error or cancellation.
        async with connection:
            channel = await connection.channel()

            exchange = await channel.declare_exchange(
                'events',
                aio_pika.ExchangeType.TOPIC,
                durable=True
            )

            queue = await channel.declare_queue('', exclusive=True)

            # Snapshot the topics: binding awaits, and handlers may be
            # registered concurrently.
            for topic in tuple(self.handlers):
                await queue.bind(exchange, routing_key=topic)

            async with queue.iterator() as queue_iter:
                async for message in queue_iter:
                    async with message.process():
                        handler = self.handlers.get(message.routing_key)
                        if handler is not None:
                            data = json.loads(message.body.decode())
                            await handler(data)
WebSocket Management
Connection Lifecycle
Managing WebSocket connections with proper cleanup:
class WebSocketManager:
    """Tracks open WebSocket connections grouped by context id."""

    def __init__(self):
        self.connections: Dict[str, Set[WebSocket]] = defaultdict(set)

    async def connect(self, websocket: WebSocket, context_id: str):
        """Accept *websocket* and register it under *context_id*."""
        await websocket.accept()
        self.connections[context_id].add(websocket)

    async def disconnect(self, websocket: WebSocket, context_id: str):
        """Forget *websocket*; drop the context once it has no sockets left."""
        # .get() avoids creating an empty entry for an unknown context.
        sockets = self.connections.get(context_id)
        if sockets is None:
            return
        sockets.discard(websocket)
        if not sockets:
            del self.connections[context_id]

    async def send_to_context(self, context_id: str, message: dict):
        """Send *message* as JSON to every socket registered for *context_id*.

        Sockets whose send fails are treated as disconnected and removed.
        """
        sockets = self.connections.get(context_id)
        if not sockets:
            return

        # Iterate over a snapshot: each send awaits, and other tasks may
        # connect or disconnect sockets for this context in the meantime.
        disconnected = []
        for websocket in tuple(sockets):
            try:
                await websocket.send_json(message)
            except Exception:
                disconnected.append(websocket)

        # Clean up disconnected sockets
        for ws in disconnected:
            await self.disconnect(ws, context_id)
Event Routing
Routing events from message broker to WebSocket clients:
class NotificationService:
    """Forwards job events from the message broker to WebSocket clients."""

    def __init__(self, websocket_manager: WebSocketManager):
        self.ws_manager = websocket_manager
        self.subscriber = EventSubscriber(BROKER_URL)
        # Register handlers
        self.subscriber.register_handler('job.status.updated', self.handle_job_update)

    async def handle_job_update(self, data: dict):
        """Push a job update to the card's sockets, then notify the agent.

        The agent is notified only for the ``completed`` and ``failed``
        statuses. ``data`` must contain ``card_id`` and ``status``.
        """
        # The card id doubles as the WebSocket context id.
        card_id = data['card_id']
        envelope = {
            'type': 'job_update',
            'data': data
        }
        await self.ws_manager.send_to_context(card_id, envelope)

        # Only finished jobs are reported back to the agent.
        if data['status'] not in ('completed', 'failed'):
            return
        await self.notify_agent(card_id, data)
MCP Server Integration
Tool Registration
Agents register their tools with the MCP server:
class MCPToolRegistry:
    """Name-indexed registry of agent tools and their parameter schemas."""

    def __init__(self):
        self.tools = {}

    def register_tool(self, name: str, handler: Callable, schema: dict):
        """Store *handler* and *schema* under *name*, replacing any earlier entry."""
        self.tools[name] = dict(handler=handler, schema=schema)

    async def execute_tool(self, name: str, params: dict):
        """Validate *params* against the tool's schema, then await its handler.

        Raises:
            ValueError: if no tool called *name* is registered.
        """
        try:
            entry = self.tools[name]
        except KeyError:
            raise ValueError(f"Unknown tool: {name}") from None

        # Parameters are checked before the handler is invoked.
        validate(params, entry['schema'])

        handler = entry['handler']
        return await handler(**params)
Context Assembly
The MCP server assembles context from multiple sources:
class ContextAssembler:
    """Assembles the project context served by the MCP server."""

    def __init__(self, db_pool):
        self.db = db_pool

    async def get_project_memory(self, project_id: str) -> dict:
        """Return the project row plus its activities, assets and recent jobs.

        The result has the keys ``project``, ``activities`` (ordered by
        position), ``approved_assets`` and ``recent_jobs`` (newest first, at
        most 50), each converted to plain dicts.

        Raises:
            ServiceError: with code ``PROJECT_NOT_FOUND`` when the project row
                does not exist.
        """
        async with self.db.acquire() as conn:
            # Fetch project data
            project = await conn.fetchrow(
                "SELECT * FROM projects WHERE id = $1",
                project_id
            )
            if project is None:
                raise ServiceError('PROJECT_NOT_FOUND', f'Project {project_id} not found')

            # Run the related queries one after another: a single asyncpg
            # connection can execute only one operation at a time, so
            # gathering them on `conn` fails with "another operation is in
            # progress".
            activities = await conn.fetch(
                "SELECT * FROM activities WHERE project_id = $1 ORDER BY position",
                project_id
            )
            assets = await conn.fetch(
                "SELECT * FROM approved_assets WHERE project_id = $1",
                project_id
            )
            jobs = await conn.fetch(
                """SELECT j.* FROM generation_jobs j
                   JOIN task_cards tc ON j.card_id = tc.id
                   JOIN activities a ON tc.activity_id = a.id
                   WHERE a.project_id = $1
                   ORDER BY j.created_at DESC LIMIT 50""",
                project_id
            )

        return {
            'project': dict(project),
            'activities': [dict(a) for a in activities],
            'approved_assets': [dict(a) for a in assets],
            'recent_jobs': [dict(j) for j in jobs]
        }
Health Check Coordination
Services coordinate health checks across dependencies:
class HealthChecker:
    """Runs every dependency health check concurrently and summarizes them."""

    # Seconds each individual check may take before it counts as unhealthy.
    CHECK_TIMEOUT = 5.0

    def __init__(self):
        self.checks = {
            'database': self.check_database,
            'message_broker': self.check_broker,
            'mcp_server': self.check_mcp
        }

    async def check_all(self) -> dict:
        """Run all checks in parallel and report per-check and overall status.

        A check is ``healthy`` when it finishes within ``CHECK_TIMEOUT``
        without raising; its return value is not inspected. The overall
        status is ``healthy`` only if every check is.
        """
        names = list(self.checks)

        # Each check carries its own timeout, so all checks share the same
        # deadline and the whole call is bounded by one CHECK_TIMEOUT rather
        # than one per check.
        outcomes = await asyncio.gather(
            *(
                asyncio.wait_for(self.checks[name](), timeout=self.CHECK_TIMEOUT)
                for name in names
            ),
            return_exceptions=True
        )

        results = {
            name: 'unhealthy' if isinstance(outcome, BaseException) else 'healthy'
            for name, outcome in zip(names, outcomes)
        }

        overall_status = 'healthy' if all(
            status == 'healthy' for status in results.values()
        ) else 'unhealthy'

        return {
            'status': overall_status,
            'checks': results,
            'timestamp': datetime.utcnow().isoformat()
        }
Error Propagation
Consistent error handling across service boundaries:
class ServiceError(Exception):
    """Error that crosses service boundaries with a machine-readable code.

    Attributes are ``code``, ``message`` and ``details``; ``details``
    defaults to an empty dict.
    """

    def __init__(self, code: str, message: str, details: dict = None):
        super().__init__(message)
        self.code, self.message = code, message
        # A fresh dict per instance when no details are supplied.
        self.details = details if details else {}
class ErrorMiddleware:
    """Converts exceptions raised by downstream handlers into JSON errors."""

    # HTTP status for the ServiceError codes raised in this codebase; any
    # other code falls back to DEFAULT_STATUS.
    STATUS_BY_CODE = {
        'JOB_NOT_FOUND': 404,
        'SERVICE_UNAVAILABLE': 503,
    }
    DEFAULT_STATUS = 400

    async def __call__(self, request, call_next):
        """Call the next handler and translate any failure into a response.

        A ``ServiceError`` becomes a JSON error envelope with its code,
        message and details. Any other exception is logged and reported as a
        generic ``INTERNAL_ERROR`` with status 500.
        """
        try:
            response = await call_next(request)
            return response
        except ServiceError as e:
            return JSONResponse(
                status_code=self.STATUS_BY_CODE.get(e.code, self.DEFAULT_STATUS),
                content={
                    'error': {
                        'code': e.code,
                        'message': e.message,
                        'details': e.details
                    }
                }
            )
        except Exception as e:
            # Log the full traceback, but do not leak internals to the client.
            logger.exception("Unhandled error")
            return JSONResponse(
                status_code=500,
                content={
                    'error': {
                        'code': 'INTERNAL_ERROR',
                        'message': 'An unexpected error occurred'
                    }
                }
            )
Deployment Integration
Service Discovery
Services register themselves for discovery:
class ServiceRegistry:
    """Registers this service in Consul and looks up healthy peers."""

    def __init__(self, consul_url: str):
        self.consul = consul.Consul(host=consul_url)

    async def _run_blocking(self, func, *args, **kwargs):
        """Run a blocking Consul client call in the default thread pool.

        The client used here is synchronous; calling it directly inside a
        coroutine would stall the event loop for the whole HTTP round trip.
        NOTE(review): assumes the client tolerates being called from a worker
        thread - confirm for the consul library in use.
        """
        import asyncio
        import functools

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, functools.partial(func, *args, **kwargs)
        )

    async def register_service(self, name: str, port: int):
        """Register *name* on *port* with an HTTP health check on ``/health``.

        The check runs every 10 s with a 5 s timeout, and the registration
        passes ``deregister="30s"`` to Consul.
        """
        hostname = socket.gethostname()
        # Register with health check
        await self._run_blocking(
            self.consul.agent.service.register,
            name=name,
            service_id=f"{name}-{hostname}",
            address=hostname,
            port=port,
            check=consul.Check.http(
                f"http://localhost:{port}/health",
                interval="10s",
                timeout="5s",
                deregister="30s"
            )
        )

    async def discover_service(self, name: str) -> str:
        """Return the base URL of one randomly chosen healthy *name* instance.

        Raises:
            ServiceError: with code ``SERVICE_UNAVAILABLE`` when no instance
                is passing its health check.
        """
        _, services = await self._run_blocking(
            self.consul.health.service, name, passing=True
        )
        if not services:
            raise ServiceError("SERVICE_UNAVAILABLE", f"No healthy {name} instances")
        # Simple random selection
        service = random.choice(services)
        return f"http://{service['Service']['Address']}:{service['Service']['Port']}"
Configuration Management
Centralized configuration with environment-specific overrides:
class ConfigManager:
    """Loads service configuration from environment variables at construction."""

    def __init__(self):
        self.config = self._load_config()

    def _load_config(self) -> dict:
        """Build the config dict, applying production overrides when enabled.

        ``ENVIRONMENT`` defaults to ``development``. In ``production`` the
        log level is forced to ``WARNING`` and ``debug`` is set to ``False``;
        in any other environment no ``debug`` key is present.
        """
        env = os.environ

        # Base configuration
        settings = dict(
            database_url=env.get('DATABASE_URL'),
            broker_url=env.get('BROKER_URL', 'amqp://localhost'),
            mcp_server_url=env.get('MCP_SERVER_URL'),
            log_level=env.get('LOG_LEVEL', 'INFO'),
        )

        # Environment-specific overrides
        if env.get('ENVIRONMENT', 'development') == 'production':
            settings['log_level'] = 'WARNING'
            settings['debug'] = False

        return settings