Skip to content

4. Adaptadores de Mensageria

Visão geral

O sistema de mensageria permite que o bot converse em múltiplas plataformas usando uma interface unificada. Ele usa três design patterns:

flowchart TD
    subgraph "Design Patterns"
        direction LR
        FP["🏭 Factory Pattern<br/><i>AdapterFactory cria adapters</i>"]
        AP["🔌 Adapter Pattern<br/><i>Interface uniforme para todas as plataformas</i>"]
        SP["📐 Strategy Pattern<br/><i>Cada plataforma é uma estratégia</i>"]
    end

Estrutura de arquivos

messaging/
├── base.py               ← Interface abstrata (MessagingAdapter)
├── factory.py            ← AdapterFactory (registro e resolução)
├── processor.py          ← ConversationProcessor (orquestra tudo)
├── adapters/
│   ├── slack_adapter.py           ← Slack (Bolt SDK)
│   ├── whatsapp_evolution.py      ← WhatsApp via Evolution API
│   ├── whatsapp_official.py       ← WhatsApp Business API (Meta)
│   ├── telegram_adapter.py        ← Telegram Bot API
│   ├── webchat_adapter.py         ← WebChat (HTTP REST)
│   └── sse_adapter.py             ← Server-Sent Events
└── sse/
    └── stream_manager.py          ← Gerencia streams SSE

A interface base: MessagingAdapter

Todos os adapters implementam esta interface:

classDiagram
    class MessagingAdapter {
        <<abstract>>
        +platform_name: str
        +webhook_path: str
        +parse_webhook_request(request) IncomingMessage
        +send_message(message: OutgoingMessage) bool
        +send_typing_indicator(channel_id)
        +update_message(channel_id, message_id, text)
        +delete_message(channel_id, message_id)
        +format_text(markdown_text) str
    }

    class IncomingMessage {
        +user_id: str
        +channel_id: str
        +thread_id: str
        +text: str
        +platform: str
        +metadata: dict
        +jwt_token: str
    }

    class OutgoingMessage {
        +channel_id: str
        +thread_id: str
        +text: str
        +metadata: dict
    }

    MessagingAdapter ..> IncomingMessage : produz
    MessagingAdapter ..> OutgoingMessage : consome

    class SlackAdapter { }
    class WhatsAppEvolutionAdapter { }
    class WhatsAppOfficialAdapter { }
    class TelegramAdapter { }
    class WebChatAdapter { }
    class SSEAdapter { }

    MessagingAdapter <|-- SlackAdapter
    MessagingAdapter <|-- WhatsAppEvolutionAdapter
    MessagingAdapter <|-- WhatsAppOfficialAdapter
    MessagingAdapter <|-- TelegramAdapter
    MessagingAdapter <|-- WebChatAdapter
    MessagingAdapter <|-- SSEAdapter

Métodos obrigatórios

Método Responsabilidade
parse_webhook_request() Recebe o request HTTP cru da plataforma e extrai: quem mandou, o que disse, token JWT, metadata
send_message() Envia a resposta do agente de volta para a plataforma
send_typing_indicator() Mostra indicador "digitando..." na plataforma
format_text() Converte Markdown genérico para formato da plataforma (ex: Slack usa *bold* em vez de **bold**)

Properties obrigatórias

Property Exemplo
platform_name "slack", "whatsapp", "webchat"
webhook_path "/slack/events", "/whatsapp/webhook"

Adapters disponíveis

flowchart LR
    subgraph "Produção ✅"
        SLACK["Slack<br/>/slack/events<br/><i>Bot Token + Signing Secret</i>"]
        WAEVO["WhatsApp Evolution<br/>/whatsapp/webhook<br/><i>Evolution API (self-hosted)</i>"]
        WAOFF["WhatsApp Official<br/>/whatsapp-official/webhook<br/><i>Meta Business API</i>"]
        WC["WebChat<br/>/webchat/message<br/><i>HTTP REST</i>"]
        SSE["SSE<br/>/sse/chat + /sse/stream/{id}<br/><i>Server-Sent Events</i>"]
    end

    subgraph "Exemplo ⚠️"
        TG["Telegram<br/>/telegram/webhook<br/><i>Bot API</i>"]
    end

Detalhes por adapter

Slack (SlackAdapter)

  • Usa Slack Bolt SDK
  • Responde em threads
  • Formata Markdown para Slack (mrkdwn)
  • Suporta atualização de mensagens (feedback visual)
  • Env vars: SLACK_BOT_TOKEN, SLACK_SIGNING_SECRET

WhatsApp Evolution (WhatsAppEvolutionAdapter)

  • Integra com Evolution API (instância self-hosted)
  • Envia/recebe mensagens de texto
  • Suporta JWT Lookup via Redis (comando /ifriend-auth)
  • Env vars: WHATSAPP_EVOLUTION_API_URL, WHATSAPP_EVOLUTION_API_KEY, WHATSAPP_EVOLUTION_INSTANCE

WhatsApp Official (WhatsAppOfficialAdapter)

  • Integra com Meta WhatsApp Business API
  • Verifica webhook token no GET
  • Envia/recebe via Graph API
  • Env vars: WHATSAPP_OFFICIAL_TOKEN, WHATSAPP_OFFICIAL_PHONE_NUMBER_ID, WHATSAPP_OFFICIAL_VERIFY_TOKEN

WebChat (WebChatAdapter)

  • REST puro — client envia POST, recebe resposta no body
  • Ideal para integração em websites
  • JWT via header Authorization: Bearer
  • Env vars: WEBCHAT_ENABLED, WEBCHAT_ALLOWED_ORIGINS

SSE (SSEAdapter)

  • Processa em background, emite eventos em tempo real
  • Client faz POST em /sse/chat, depois abre stream em /sse/stream/{request_id}
  • Eventos: stream_start, tool_call_start, text_chunk, stream_complete
  • Env vars: SSE_ENABLED

Telegram (TelegramAdapter)

  • Implementação de exemplo/referência
  • Usa Telegram Bot API diretamente
  • Env vars: TELEGRAM_BOT_TOKEN

AdapterFactory

A factory gerencia o registro e resolução de adapters:

flowchart TD
    ENV["Env Vars:<br/>MESSAGING_PLATFORMS=slack,whatsapp<br/>SLACK_BOT_TOKEN=xoxb-...<br/>WHATSAPP_EVOLUTION_API_KEY=..."]

    ENV --> REG["AdapterFactory.register_from_env()"]

    REG --> DICT["Registry interno:<br/>/slack/events → SlackAdapter<br/>/whatsapp/webhook → WhatsAppEvolutionAdapter"]

    REQ["Request HTTP<br/>POST /slack/events"] --> RES["AdapterFactory.resolve(path)"]

    DICT --> RES
    RES --> ADAPTER["SlackAdapter instance"]

Como funciona: 1. No startup, register_from_env() lê as env vars e instancia os adapters configurados 2. Cada adapter registra seu webhook_path na factory 3. Quando chega um request, a factory resolve o adapter pelo path da URL 4. Se MESSAGING_PLATFORMS não está definido, detecta automaticamente pelos tokens presentes

ConversationProcessor

O ConversationProcessor é o orquestrador central. Ele conecta adapter → session → ADK:

flowchart TD
    IM[IncomingMessage] --> CP[ConversationProcessor]

    CP --> STEP1["1️⃣ Decodifica JWT<br/>(se presente no token)"]
    STEP1 --> STEP2["2️⃣ Obtém/cria Session<br/>(Redis ou CloudSQL)"]
    STEP2 --> STEP3["3️⃣ Injeta jwt_context<br/>no session.state"]
    STEP3 --> STEP4["4️⃣ runner.run_async()<br/>(ADK processa)"]

    STEP4 --> FEEDBACK["↩️ Feedback visual<br/>(atualiza msg com nome da tool)"]
    FEEDBACK --> STEP4

    STEP4 --> STEP5["5️⃣ Extrai texto da resposta"]
    STEP5 --> STEP6["6️⃣ adapter.send_message()"]
    STEP6 --> USER[Resposta para usuário]

Feedback visual: durante o processamento, o Processor atualiza a mensagem de status na plataforma com o nome da tool sendo executada (ex: "🔍 Buscando produtos...").

SSE: como funciona o streaming

O SSE é especial porque usa dois endpoints e processamento assíncrono:

sequenceDiagram
    actor Client as Frontend
    participant API as FastAPI
    participant SSE as SSEAdapter
    participant SM as StreamManager
    participant ADK as Runner ADK

    Client->>API: POST /sse/chat {message, jwt}
    API->>SSE: parse_webhook_request()
    SSE->>SM: create_stream(request_id)
    SSE-->>Client: {request_id: "abc123"}

    Note over SSE,ADK: Processamento em background (asyncio.create_task)
    SSE->>ADK: runner.run_async()

    Client->>API: GET /sse/stream/abc123
    API->>SM: subscribe(request_id)

    loop Eventos em tempo real
        ADK-->>SM: emit("tool_call_start", {tool: "busca_produtos"})
        SM-->>Client: event: tool_call_start\ndata: {...}

        ADK-->>SM: emit("text_chunk", {text: "Encontrei 3..."})
        SM-->>Client: event: text_chunk\ndata: {...}
    end

    ADK-->>SM: emit("stream_complete", {text: "resposta final"})
    SM-->>Client: event: stream_complete\ndata: {...}

Como criar um novo adapter

1. Crie o arquivo do adapter

# messaging/adapters/meu_adapter.py

from messaging.base import MessagingAdapter, IncomingMessage, OutgoingMessage

class MeuAdapter(MessagingAdapter):

    @property
    def platform_name(self) -> str:
        return "minha_plataforma"

    @property
    def webhook_path(self) -> str:
        return "/minha-plataforma/webhook"

    async def parse_webhook_request(self, request) -> IncomingMessage:
        body = await request.json()
        return IncomingMessage(
            user_id=body["sender_id"],
            channel_id=body["chat_id"],
            text=body["message"],
            platform=self.platform_name,
        )

    async def send_message(self, message: OutgoingMessage) -> bool:
        # Envia resposta via API da plataforma
        async with aiohttp.ClientSession() as session:
            await session.post(
                "https://api.minhaplataforma.com/send",
                json={"chat_id": message.channel_id, "text": message.text}
            )
        return True

    def format_text(self, text: str) -> str:
        # Converte markdown se necessário
        return text

2. Registre na factory

Em messaging/factory.py, adicione a detecção:

# Na função register_from_env()
if os.getenv("MINHA_PLATAFORMA_TOKEN"):
    adapter = MeuAdapter(token=os.getenv("MINHA_PLATAFORMA_TOKEN"))
    self.register(adapter)

3. Adicione a route no FastAPI

Em unified_bot.py:

@app.post("/minha-plataforma/webhook")
async def minha_plataforma_webhook(request: Request):
    adapter = adapter_factory.resolve("/minha-plataforma/webhook")
    message = await adapter.parse_webhook_request(request)
    response = await processor.process_message(message, adapter)
    return response

4. Adicione testes

Em tests/test_meu_adapter.py.

Checklist para manutenção de adapters

  • [ ] Ao alterar parse_webhook_request(), valide que todos os campos do IncomingMessage estão preenchidos
  • [ ] Ao alterar send_message(), teste com mensagens longas (truncamento)
  • [ ] format_text() deve lidar com markdown, emojis e caracteres especiais
  • [ ] Verifique se send_typing_indicator() está implementado (UX)
  • [ ] Confira as env vars no cloudbuild.yaml ao adicionar novo adapter

Anterior: ← O Agente iFriend · Próximo: Sessão, Memória e Auth →