Skip to content

Integration Patterns

Overview

This document describes how different components of the system integrate with each other, including communication patterns, data flow, and external service integration strategies.

Agent-to-Database Communication

Connection Management

Each agent service maintains its own database connection pool to ensure isolation and scalability.

# Agent service database configuration
class DatabaseConfig:
    """Owns the asyncpg connection pool used by one agent service.

    The pool is configured in ``__init__`` from the ``DATABASE_URL``
    environment variable (a missing variable raises ``KeyError``). The pool
    object returned by ``asyncpg.create_pool()`` is not usable until it has
    been awaited, so it is opened lazily on first use or explicitly through
    ``initialize()``.
    """

    def __init__(self):
        self.pool = asyncpg.create_pool(
            dsn=os.environ['DATABASE_URL'],
            min_size=5,
            max_size=20,
            command_timeout=60,
            max_inactive_connection_lifetime=300
        )
        # Tracks whether the pool has been awaited (opened) yet.
        self._pool_ready = False
        # Created lazily inside a running event loop; serializes first use.
        self._init_lock = None

    async def initialize(self):
        """Open the pool if it has not been opened yet.

        Safe to call repeatedly and from concurrent tasks: the lock ensures
        the pool is awaited exactly once.
        """
        if self._pool_ready:
            return

        import asyncio

        if self._init_lock is None:
            self._init_lock = asyncio.Lock()

        async with self._init_lock:
            if not self._pool_ready:
                await self.pool
                self._pool_ready = True

    def get_connection(self):
        """Return an async context manager that yields a pooled connection.

        Usage::

            async with config.get_connection() as conn:
                ...

        The connection is released back to the pool when the ``async with``
        block exits.
        """
        from contextlib import asynccontextmanager

        @asynccontextmanager
        async def _borrow():
            await self.initialize()
            async with self.pool.acquire() as connection:
                yield connection

        return _borrow()

Transaction Patterns

Agents use transactions to ensure data consistency when performing multiple operations:

class AgentDatabaseOperations:
    """Multi-statement database writes performed by an agent."""

    async def create_activity_with_cards(self, project_id: str, activity_data: dict, cards: list):
        """Insert one activity and all of its task cards in one transaction.

        The SQL shown here is a placeholder (``(...)``); each statement
        receives its payload as a single positional argument.

        Args:
            project_id: Not used by the visible body.
            activity_data: Payload passed to the activity INSERT.
            cards: Iterable of payloads, one task-card INSERT per item.

        Returns:
            The row returned by the activity INSERT (``RETURNING *``).
        """
        # Both context managers are entered left to right: the connection is
        # borrowed first, then the transaction is opened on it.
        async with self.db.get_connection() as connection, connection.transaction():
            created_activity = await connection.fetchrow(
                "INSERT INTO activities (...) VALUES (...) RETURNING *",
                activity_data
            )

            for task_card in cards:
                await connection.execute(
                    "INSERT INTO task_cards (...) VALUES (...)",
                    task_card
                )

        return created_activity

State Persistence

LangGraph state persistence is handled through dedicated tables:

class LangGraphPersistence:
    """Saves and loads agent state through a ``PostgresSaver`` checkpointer.

    NOTE(review): the exact ``aput``/``aget`` signatures depend on the
    installed LangGraph version - confirm that ``aput(config, state)`` is
    accepted by the checkpointer in use.
    """

    def __init__(self, db_pool):
        self.checkpointer = PostgresSaver(db_pool)

    @staticmethod
    def _thread_config(thread_id: str) -> dict:
        """Build the checkpointer config that addresses one thread."""
        return {"configurable": {"thread_id": thread_id}}

    async def save_agent_state(self, thread_id: str, state: dict):
        """Store ``state`` under ``thread_id`` via the checkpointer."""
        # Serialization of the state is delegated to the checkpointer.
        await self.checkpointer.aput(self._thread_config(thread_id), state)

    async def load_agent_state(self, thread_id: str):
        """Return whatever the checkpointer holds for ``thread_id``."""
        return await self.checkpointer.aget(self._thread_config(thread_id))

Job Queue Integration

Job Submission Pattern

Agents submit jobs to the queue service via HTTP API:

class JobQueueClient:
    """HTTP client that agents use to submit jobs to the queue service.

    The client owns an ``aiohttp.ClientSession``; call ``close()`` (or use
    the client as ``async with JobQueueClient(url) as client:``) so the
    session and its connections are released.
    """

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.session = aiohttp.ClientSession()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()

    async def close(self):
        """Close the underlying HTTP session."""
        await self.session.close()

    async def create_job(self, card_id: str, params: dict) -> dict:
        """POST a new job to ``{base_url}/api/jobs``.

        Args:
            card_id: Identifier of the card the job belongs to.
            params: Job parameters, sent as JSON.

        Returns:
            The decoded JSON body of the response. The HTTP status is not
            inspected here.
        """
        async with self.session.post(
            f"{self.base_url}/api/jobs",
            json={
                "card_id": card_id,
                "params": params
            }
        ) as response:
            return await response.json()

Job Status Monitoring

The Job Queue Service publishes status updates to the message broker:

class JobProcessor:
    """Runs one generation job and broadcasts each status transition."""

    async def process_job(self, job: dict):
        """Process ``job`` and publish 'processing' then 'completed'/'failed'.

        Reads ``job['id']`` and ``job['params']``; ``job['card_id']`` is
        forwarded in every published message when present. Any exception
        raised while processing is converted into a 'failed' status rather
        than propagated.
        """
        # Subscribers route updates by card, so carry the card id along.
        card_id = job.get('card_id')

        try:
            # Update status to processing
            await self.update_job_status(job['id'], 'processing')
            await self.publish_status_update(job['id'], 'processing', card_id=card_id)

            # Call cloud service
            result = await self.cloud_service.generate(job['params'])

            # Update status to completed
            await self.update_job_status(job['id'], 'completed', result['url'])
            await self.publish_status_update(
                job['id'], 'completed', result['url'], card_id=card_id
            )

        except Exception as e:
            # Handle failure: persist and broadcast the same error text.
            error_text = str(e)
            await self.update_job_status(job['id'], 'failed', error=error_text)
            await self.publish_status_update(
                job['id'], 'failed', card_id=card_id, error=error_text
            )

    async def publish_status_update(self, job_id: str, status: str, result_url: str = None,
                                    card_id: str = None, error: str = None):
        """Publish a 'job.status.updated' event to the message broker.

        Args:
            job_id: Identifier of the job.
            status: New status value.
            result_url: URL of the result, if any.
            card_id: Card the job belongs to, if known.
            error: Failure description, if any.
        """
        message = {
            'job_id': job_id,
            'card_id': card_id,
            'status': status,
            'result_url': result_url,
            'error': error,
            # Naive UTC timestamp, kept as-is to preserve the wire format.
            'timestamp': datetime.utcnow().isoformat()
        }
        await self.message_broker.publish('job.status.updated', message)

Cloud Service Integration

Job Creation and Polling

The system treats cloud generation services as black boxes, creating jobs and polling for results:

class JobQueueService:
    """Creates generation jobs in the database and reports their status."""

    async def create_job(self, card_id: str, params: dict) -> dict:
        """Insert a new job row in status 'queued' and describe it.

        Args:
            card_id: Card the job belongs to.
            params: Job parameters; stored as a JSON string.

        Returns:
            A dict with the generated ``job_id``, the ``status`` and a
            human-readable ``message``.
        """
        new_job_id = str(uuid.uuid4())
        initial_status = 'queued'

        async with self.db.acquire() as conn:
            await conn.execute(
                """INSERT INTO generation_jobs 
                   (id, card_id, status, params, created_at) 
                   VALUES ($1, $2, $3, $4, $5)""",
                new_job_id, card_id, initial_status, json.dumps(params), datetime.utcnow()
            )

        # Nothing else happens here: picking the job up is the cloud
        # service's responsibility and is out of scope for this class.
        receipt = dict(job_id=new_job_id, status=initial_status)
        receipt['message'] = 'Job created and queued for processing'
        return receipt

    async def check_job_status(self, job_id: str) -> dict:
        """Look up one job and return a summary of its row.

        Raises:
            ServiceError: with code 'JOB_NOT_FOUND' when no row matches.
        """
        async with self.db.acquire() as conn:
            record = await conn.fetchrow(
                "SELECT * FROM generation_jobs WHERE id = $1",
                job_id
            )

            if not record:
                raise ServiceError('JOB_NOT_FOUND', f'Job {job_id} not found')

            summary = {'job_id': record['id']}
            for column in ('status', 'result_url', 'error_message'):
                summary[column] = record[column]
            summary['created_at'] = record['created_at'].isoformat()

            # completed_at stays None until the job has finished.
            finished_at = record['completed_at']
            summary['completed_at'] = finished_at.isoformat() if finished_at else None
            return summary

Job Status Updates

When cloud services complete generation, they update job status:

class JobStatusUpdater:
    """Records the terminal status of a generation job and announces it.

    Both terminal transitions use a single ``UPDATE ... RETURNING *`` so the
    row that is published is exactly the row that was written, and both
    publish the updated row through ``publish_job_update``.
    """

    async def update_job_completion(self, job_id: str, result_url: str):
        """Mark ``job_id`` completed with ``result_url`` and publish the row.

        Nothing is published when no row matches ``job_id``.
        """
        async with self.db.acquire() as conn:
            job = await conn.fetchrow(
                """UPDATE generation_jobs
                   SET status = 'completed',
                       result_url = $1,
                       completed_at = $2
                   WHERE id = $3
                   RETURNING *""",
                result_url, datetime.utcnow(), job_id
            )

        await self._publish_if_found(job_id, job)

    async def update_job_failure(self, job_id: str, error_message: str):
        """Mark ``job_id`` failed with ``error_message`` and publish the row.

        Nothing is published when no row matches ``job_id``.
        """
        async with self.db.acquire() as conn:
            job = await conn.fetchrow(
                """UPDATE generation_jobs
                   SET status = 'failed',
                       error_message = $1,
                       completed_at = $2
                   WHERE id = $3
                   RETURNING *""",
                error_message, datetime.utcnow(), job_id
            )

        await self._publish_if_found(job_id, job)

    async def _publish_if_found(self, job_id: str, job):
        """Publish ``job`` unless the UPDATE matched no row."""
        if job is None:
            import logging

            logging.getLogger(__name__).warning(
                "Status update for unknown job %s; nothing published", job_id
            )
            return

        # The connection is already released here; publishing does not hold it.
        await self.publish_job_update(job)

Message Broker Patterns

Publisher Implementation

Services publish events using a consistent pattern:

class EventPublisher:
    """Publishes JSON events to the durable 'events' topic exchange.

    The exchange is declared once per channel and reused, instead of being
    re-declared on every ``publish`` call.
    """

    def __init__(self, broker_url: str):
        self.connection = None
        self.channel = None
        self.broker_url = broker_url
        # Cached exchange handle; reset whenever a new channel is opened.
        self._exchange = None

    async def connect(self):
        """Open a robust connection and a channel on it."""
        self.connection = await aio_pika.connect_robust(
            self.broker_url,
            reconnect_interval=5
        )
        self.channel = await self.connection.channel()
        self._exchange = None

    async def close(self):
        """Close the connection, if one was opened."""
        if self.connection is not None:
            await self.connection.close()
        self.connection = None
        self.channel = None
        self._exchange = None

    async def publish(self, topic: str, message: dict):
        """Publish ``message`` as persistent JSON with routing key ``topic``.

        Connects first when ``connect()`` has not been called yet.
        """
        if self.channel is None:
            await self.connect()

        if self._exchange is None:
            self._exchange = await self.channel.declare_exchange(
                'events',
                aio_pika.ExchangeType.TOPIC,
                durable=True
            )

        await self._exchange.publish(
            aio_pika.Message(
                body=json.dumps(message).encode(),
                content_type='application/json',
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT
            ),
            routing_key=topic
        )

Subscriber Implementation

The Notification Service subscribes to relevant events:

class EventSubscriber:
    """Consumes events from the 'events' topic exchange and dispatches them.

    Handlers are looked up by the exact routing key of each message, so a
    topic registered with wildcards is bound on the queue but will not match
    this lookup.
    """

    def __init__(self, broker_url: str):
        self.broker_url = broker_url
        self.handlers = {}

    def register_handler(self, topic: str, handler: Callable):
        """Register ``handler`` for ``topic``, replacing any previous one."""
        self.handlers[topic] = handler

    async def start_consuming(self):
        """Bind an exclusive queue to every registered topic and consume.

        Runs until the iterator ends or the task is cancelled; the
        connection is closed on the way out in either case.
        """
        connection = await aio_pika.connect_robust(self.broker_url)

        # Ensures the connection is closed when consumption stops or fails.
        async with connection:
            channel = await connection.channel()

            exchange = await channel.declare_exchange(
                'events',
                aio_pika.ExchangeType.TOPIC,
                durable=True
            )

            queue = await channel.declare_queue('', exclusive=True)

            for topic in self.handlers:
                await queue.bind(exchange, routing_key=topic)

            async with queue.iterator() as queue_iter:
                async for message in queue_iter:
                    async with message.process():
                        handler = self.handlers.get(message.routing_key)
                        if handler is not None:
                            data = json.loads(message.body.decode())
                            await handler(data)

WebSocket Management

Connection Lifecycle

Managing WebSocket connections with proper cleanup:

class WebSocketManager:
    """Tracks open WebSocket connections grouped by context id."""

    def __init__(self):
        self.connections: Dict[str, Set[WebSocket]] = defaultdict(set)

    async def connect(self, websocket: WebSocket, context_id: str):
        """Accept ``websocket`` and register it under ``context_id``."""
        await websocket.accept()
        self.connections[context_id].add(websocket)

    async def disconnect(self, websocket: WebSocket, context_id: str):
        """Forget ``websocket``; drop the context once it has no sockets."""
        # .get() avoids creating an empty defaultdict entry for unknown ids.
        sockets = self.connections.get(context_id)
        if sockets is None:
            return

        sockets.discard(websocket)
        if not sockets:
            del self.connections[context_id]

    async def send_to_context(self, context_id: str, message: dict):
        """Send ``message`` as JSON to every socket of ``context_id``.

        Delivery is best-effort: a socket whose send raises is treated as
        disconnected and removed; the error is not propagated.
        """
        sockets = self.connections.get(context_id)
        if not sockets:
            return

        # Iterate over a snapshot: other tasks may connect or disconnect
        # while we are suspended in send_json(), and mutating a set during
        # iteration raises RuntimeError.
        disconnected = []
        for websocket in tuple(sockets):
            try:
                await websocket.send_json(message)
            except Exception:
                disconnected.append(websocket)

        # Clean up disconnected sockets
        for ws in disconnected:
            await self.disconnect(ws, context_id)

Event Routing

Routing events from message broker to WebSocket clients:

class NotificationService:
    """Routes 'job.status.updated' broker events to WebSocket clients."""

    def __init__(self, websocket_manager: WebSocketManager):
        self.ws_manager = websocket_manager
        self.subscriber = EventSubscriber(BROKER_URL)

        # Register handlers
        self.subscriber.register_handler(
            'job.status.updated',
            self.handle_job_update
        )

    async def handle_job_update(self, data: dict):
        """Forward one job update to the sockets of the job's card.

        The card id is used as the WebSocket context id. An event without a
        ``card_id`` cannot be routed, so it is logged and dropped instead of
        raising ``KeyError`` inside the consumer. Terminal statuses
        ('completed', 'failed') additionally trigger ``notify_agent``.
        """
        card_id = data.get('card_id')
        if card_id is None:
            import logging

            logging.getLogger(__name__).warning(
                "Dropping job update without card_id (job_id=%s)",
                data.get('job_id')
            )
            return

        # Notify via WebSocket
        await self.ws_manager.send_to_context(
            card_id,
            {
                'type': 'job_update',
                'data': data
            }
        )

        # Callback to agent if job reached a terminal status
        if data.get('status') in ('completed', 'failed'):
            await self.notify_agent(card_id, data)

MCP Server Integration

Tool Registration

Agents register their tools with the MCP server:

class MCPToolRegistry:
    """In-memory registry mapping tool names to a handler and a schema."""

    def __init__(self):
        self.tools = {}

    def register_tool(self, name: str, handler: Callable, schema: dict):
        """Store ``handler`` and ``schema`` under ``name`` (overwrites)."""
        self.tools[name] = dict(handler=handler, schema=schema)

    async def execute_tool(self, name: str, params: dict):
        """Validate ``params`` against the tool's schema, then run the tool.

        Args:
            name: Registered tool name.
            params: Keyword arguments forwarded to the handler.

        Returns:
            Whatever the awaited handler returns.

        Raises:
            ValueError: if ``name`` was never registered.
        """
        if name in self.tools:
            tool = self.tools[name]

            # Parameters are checked before the handler is invoked.
            validate(params, tool['schema'])

            run = tool['handler']
            return await run(**params)

        raise ValueError(f"Unknown tool: {name}")

Context Assembly

MCP server assembles context from multiple sources:

class ContextAssembler:
    """Assembles a project's context from several database tables."""

    def __init__(self, db_pool):
        self.db = db_pool

    async def get_project_memory(self, project_id: str) -> dict:
        """Collect the project row and its related records.

        The queries run one after another on a single pooled connection: a
        database connection executes one statement at a time, so issuing
        them concurrently on the same connection (e.g. via
        ``asyncio.gather``) fails instead of running in parallel.

        Args:
            project_id: Identifier of the project to load.

        Returns:
            A dict with keys ``project`` (a dict, or ``None`` when no project
            row matches), ``activities``, ``approved_assets`` and
            ``recent_jobs`` (lists of dicts; at most 50 jobs, newest first).
        """
        async with self.db.acquire() as conn:
            # Fetch project data
            project = await conn.fetchrow(
                "SELECT * FROM projects WHERE id = $1",
                project_id
            )

            # Fetch related data sequentially on the same connection
            activities = await conn.fetch(
                "SELECT * FROM activities WHERE project_id = $1 ORDER BY position",
                project_id
            )
            assets = await conn.fetch(
                "SELECT * FROM approved_assets WHERE project_id = $1",
                project_id
            )
            jobs = await conn.fetch(
                """SELECT j.* FROM generation_jobs j
                       JOIN task_cards tc ON j.card_id = tc.id
                       JOIN activities a ON tc.activity_id = a.id
                       WHERE a.project_id = $1
                       ORDER BY j.created_at DESC LIMIT 50""",
                project_id
            )

            return {
                # fetchrow() yields None for an unknown project id.
                'project': dict(project) if project is not None else None,
                'activities': [dict(a) for a in activities],
                'approved_assets': [dict(a) for a in assets],
                'recent_jobs': [dict(j) for j in jobs]
            }

Health Check Coordination

Services coordinate health checks across dependencies:

class HealthChecker:
    """Runs the registered dependency checks and summarizes the outcome."""

    def __init__(self):
        # Maps a dependency name to a zero-argument coroutine function.
        self.checks = {
            'database': self.check_database,
            'message_broker': self.check_broker,
            'mcp_server': self.check_mcp
        }

    async def check_all(self, timeout: float = 5.0) -> dict:
        """Run every check concurrently, each with its own ``timeout``.

        A check is 'healthy' when it finishes without raising inside the
        timeout; its return value is not inspected. Because the timeout is
        applied to each check individually, the whole call is bounded by
        roughly ``timeout`` seconds regardless of how many checks exist.

        Args:
            timeout: Seconds each individual check may take.

        Returns:
            A dict with the overall ``status``, the per-dependency
            ``checks`` mapping and an ISO-formatted UTC ``timestamp``.
        """
        names = list(self.checks)

        # return_exceptions=True keeps one failing check from aborting the
        # others; failures and timeouts come back as exception objects.
        outcomes = await asyncio.gather(
            *(asyncio.wait_for(self.checks[name](), timeout=timeout) for name in names),
            return_exceptions=True
        )

        results = {
            name: 'unhealthy' if isinstance(outcome, BaseException) else 'healthy'
            for name, outcome in zip(names, outcomes)
        }

        overall_status = 'healthy' if all(
            status == 'healthy' for status in results.values()
        ) else 'unhealthy'

        return {
            'status': overall_status,
            'checks': results,
            'timestamp': datetime.utcnow().isoformat()
        }

Error Propagation

Consistent error handling across service boundaries:

class ServiceError(Exception):
    """Application error carrying a machine-readable code.

    Attributes are ``code``, ``message`` and ``details``; ``details`` is an
    empty dict when none (or a falsy value) is supplied. ``str(error)`` is
    the message.
    """

    def __init__(self, code: str, message: str, details: dict = None):
        super().__init__(message)
        self.code, self.message = code, message
        self.details = details if details else {}

class ErrorMiddleware:
    """Middleware that converts exceptions into JSON error responses."""

    async def __call__(self, request, call_next):
        """Run the next handler and translate any failure.

        A ``ServiceError`` becomes a 400 response exposing its code, message
        and details. Any other exception is logged with its traceback and
        becomes a generic 500 response that reveals no internals.
        """
        try:
            return await call_next(request)
        except ServiceError as service_error:
            payload = {
                'error': {
                    'code': service_error.code,
                    'message': service_error.message,
                    'details': service_error.details
                }
            }
            return JSONResponse(status_code=400, content=payload)
        except Exception:
            logger.exception("Unhandled error")
            payload = {
                'error': {
                    'code': 'INTERNAL_ERROR',
                    'message': 'An unexpected error occurred'
                }
            }
            return JSONResponse(status_code=500, content=payload)

Deployment Integration

Service Discovery

Services register themselves for discovery:

class ServiceRegistry:
    """Registers this service in Consul and discovers healthy peers.

    The Consul client used here is called synchronously, so every call is
    pushed to the event loop's default executor to avoid blocking other
    coroutines while the HTTP request is in flight.
    """

    def __init__(self, consul_url: str):
        self.consul = consul.Consul(host=consul_url)

    async def register_service(self, name: str, port: int):
        """Register ``name`` on ``port`` with an HTTP health check.

        The service id and address are derived from the local hostname. The
        health check polls ``http://localhost:{port}/health``.
        """
        import asyncio
        import functools

        hostname = socket.gethostname()

        # Register with health check
        register = functools.partial(
            self.consul.agent.service.register,
            name=name,
            service_id=f"{name}-{hostname}",
            address=hostname,
            port=port,
            check=consul.Check.http(
                f"http://localhost:{port}/health",
                interval="10s",
                timeout="5s",
                deregister="30s"
            )
        )
        await asyncio.get_running_loop().run_in_executor(None, register)

    async def discover_service(self, name: str) -> str:
        """Return the base URL of one passing instance of ``name``.

        Raises:
            ServiceError: with code 'SERVICE_UNAVAILABLE' when no passing
                instance is registered.
        """
        import asyncio
        import functools

        lookup = functools.partial(self.consul.health.service, name, passing=True)
        _, services = await asyncio.get_running_loop().run_in_executor(None, lookup)
        if not services:
            raise ServiceError("SERVICE_UNAVAILABLE", f"No healthy {name} instances")

        # Simple random selection
        service = random.choice(services)
        return f"http://{service['Service']['Address']}:{service['Service']['Port']}"

Configuration Management

Centralized configuration with environment-specific overrides:

class ConfigManager:
    """Loads service configuration from environment variables once."""

    def __init__(self):
        self.config = self._load_config()

    def _load_config(self) -> dict:
        """Build the configuration dict.

        Base values come from ``DATABASE_URL``, ``BROKER_URL`` (default
        'amqp://localhost'), ``MCP_SERVER_URL`` and ``LOG_LEVEL`` (default
        'INFO'). When ``ENVIRONMENT`` is 'production' the log level is
        forced to 'WARNING' and ``debug`` is set to ``False``; in any other
        environment no ``debug`` key is present.
        """
        read = os.environ.get

        settings = {}
        settings['database_url'] = read('DATABASE_URL')
        settings['broker_url'] = read('BROKER_URL', 'amqp://localhost')
        settings['mcp_server_url'] = read('MCP_SERVER_URL')
        settings['log_level'] = read('LOG_LEVEL', 'INFO')

        # Production overrides win over whatever the environment supplied.
        if read('ENVIRONMENT', 'development') == 'production':
            settings['log_level'] = 'WARNING'
            settings['debug'] = False

        return settings