4. Adaptadores de Mensageria¶
Visão geral¶
O sistema de mensageria permite que o bot converse em múltiplas plataformas usando uma interface unificada. Ele usa três design patterns:
flowchart TD
subgraph "Design Patterns"
direction LR
FP["🏭 Factory Pattern<br/><i>AdapterFactory cria adapters</i>"]
AP["🔌 Adapter Pattern<br/><i>Interface uniforme para todas as plataformas</i>"]
SP["📐 Strategy Pattern<br/><i>Cada plataforma é uma estratégia</i>"]
end
Estrutura de arquivos¶
messaging/
├── base.py ← Interface abstrata (MessagingAdapter)
├── factory.py ← AdapterFactory (registro e resolução)
├── processor.py ← ConversationProcessor (orquestra tudo)
├── adapters/
│ ├── slack_adapter.py ← Slack (Bolt SDK)
│ ├── whatsapp_evolution.py ← WhatsApp via Evolution API
│ ├── whatsapp_official.py ← WhatsApp Business API (Meta)
│ ├── telegram_adapter.py ← Telegram Bot API
│ ├── webchat_adapter.py ← WebChat (HTTP REST)
│ └── sse_adapter.py ← Server-Sent Events
└── sse/
└── stream_manager.py ← Gerencia streams SSE
A interface base: MessagingAdapter¶
Todos os adapters implementam esta interface:
classDiagram
class MessagingAdapter {
<<abstract>>
+platform_name: str
+webhook_path: str
+parse_webhook_request(request) IncomingMessage
+send_message(message: OutgoingMessage) bool
+send_typing_indicator(channel_id)
+update_message(channel_id, message_id, text)
+delete_message(channel_id, message_id)
+format_text(markdown_text) str
}
class IncomingMessage {
+user_id: str
+channel_id: str
+thread_id: str
+text: str
+platform: str
+metadata: dict
+jwt_token: str
}
class OutgoingMessage {
+channel_id: str
+thread_id: str
+text: str
+metadata: dict
}
MessagingAdapter ..> IncomingMessage : produz
MessagingAdapter ..> OutgoingMessage : consome
class SlackAdapter { }
class WhatsAppEvolutionAdapter { }
class WhatsAppOfficialAdapter { }
class TelegramAdapter { }
class WebChatAdapter { }
class SSEAdapter { }
MessagingAdapter <|-- SlackAdapter
MessagingAdapter <|-- WhatsAppEvolutionAdapter
MessagingAdapter <|-- WhatsAppOfficialAdapter
MessagingAdapter <|-- TelegramAdapter
MessagingAdapter <|-- WebChatAdapter
MessagingAdapter <|-- SSEAdapter
Métodos obrigatórios¶
| Método | Responsabilidade |
|---|---|
parse_webhook_request() |
Recebe o request HTTP cru da plataforma e extrai: quem mandou, o que disse, token JWT, metadata |
send_message() |
Envia a resposta do agente de volta para a plataforma |
send_typing_indicator() |
Mostra indicador "digitando..." na plataforma |
format_text() |
Converte Markdown genérico para formato da plataforma (ex: Slack usa *bold* em vez de **bold**) |
Properties obrigatórias¶
| Property | Exemplo |
|---|---|
platform_name |
"slack", "whatsapp", "webchat" |
webhook_path |
"/slack/events", "/whatsapp/webhook" |
Adapters disponíveis¶
flowchart LR
subgraph "Produção ✅"
SLACK["Slack<br/>/slack/events<br/><i>Bot Token + Signing Secret</i>"]
WAEVO["WhatsApp Evolution<br/>/whatsapp/webhook<br/><i>Evolution API (self-hosted)</i>"]
WAOFF["WhatsApp Official<br/>/whatsapp-official/webhook<br/><i>Meta Business API</i>"]
WC["WebChat<br/>/webchat/message<br/><i>HTTP REST</i>"]
SSE["SSE<br/>/sse/chat + /sse/stream/{id}<br/><i>Server-Sent Events</i>"]
end
subgraph "Exemplo ⚠️"
TG["Telegram<br/>/telegram/webhook<br/><i>Bot API</i>"]
end
Detalhes por adapter¶
Slack (SlackAdapter)¶
- Usa Slack Bolt SDK
- Responde em threads
- Formata Markdown para Slack (mrkdwn)
- Suporta atualização de mensagens (feedback visual)
- Env vars:
SLACK_BOT_TOKEN,SLACK_SIGNING_SECRET
WhatsApp Evolution (WhatsAppEvolutionAdapter)¶
- Integra com Evolution API (instância self-hosted)
- Envia/recebe mensagens de texto
- Suporta JWT Lookup via Redis (comando
/ifriend-auth) - Env vars:
WHATSAPP_EVOLUTION_API_URL,WHATSAPP_EVOLUTION_API_KEY,WHATSAPP_EVOLUTION_INSTANCE
WhatsApp Official (WhatsAppOfficialAdapter)¶
- Integra com Meta WhatsApp Business API
- Verifica webhook token no GET
- Envia/recebe via Graph API
- Env vars:
WHATSAPP_OFFICIAL_TOKEN,WHATSAPP_OFFICIAL_PHONE_NUMBER_ID,WHATSAPP_OFFICIAL_VERIFY_TOKEN
WebChat (WebChatAdapter)¶
- REST puro — client envia POST, recebe resposta no body
- Ideal para integração em websites
- JWT via header
Authorization: Bearer - Env vars:
WEBCHAT_ENABLED,WEBCHAT_ALLOWED_ORIGINS
SSE (SSEAdapter)¶
- Processa em background, emite eventos em tempo real
- Client faz POST em
/sse/chat, depois abre stream em/sse/stream/{request_id} - Eventos:
stream_start,tool_call_start,text_chunk,stream_complete - Env vars:
SSE_ENABLED
Telegram (TelegramAdapter)¶
- Implementação de exemplo/referência
- Usa Telegram Bot API diretamente
- Env vars:
TELEGRAM_BOT_TOKEN
AdapterFactory¶
A factory gerencia o registro e resolução de adapters:
flowchart TD
ENV["Env Vars:<br/>MESSAGING_PLATFORMS=slack,whatsapp<br/>SLACK_BOT_TOKEN=xoxb-...<br/>WHATSAPP_EVOLUTION_API_KEY=..."]
ENV --> REG["AdapterFactory.register_from_env()"]
REG --> DICT["Registry interno:<br/>/slack/events → SlackAdapter<br/>/whatsapp/webhook → WhatsAppEvolutionAdapter"]
REQ["Request HTTP<br/>POST /slack/events"] --> RES["AdapterFactory.resolve(path)"]
DICT --> RES
RES --> ADAPTER["SlackAdapter instance"]
Como funciona:
1. No startup, register_from_env() lê as env vars e instancia os adapters configurados
2. Cada adapter registra seu webhook_path na factory
3. Quando chega um request, a factory resolve o adapter pelo path da URL
4. Se MESSAGING_PLATFORMS não está definido, detecta automaticamente pelos tokens presentes
ConversationProcessor¶
O ConversationProcessor é o orquestrador central. Ele conecta adapter → session → ADK:
flowchart TD
IM[IncomingMessage] --> CP[ConversationProcessor]
CP --> STEP1["1️⃣ Decodifica JWT<br/>(se presente no token)"]
STEP1 --> STEP2["2️⃣ Obtém/cria Session<br/>(Redis ou CloudSQL)"]
STEP2 --> STEP3["3️⃣ Injeta jwt_context<br/>no session.state"]
STEP3 --> STEP4["4️⃣ runner.run_async()<br/>(ADK processa)"]
STEP4 --> FEEDBACK["↩️ Feedback visual<br/>(atualiza msg com nome da tool)"]
FEEDBACK --> STEP4
STEP4 --> STEP5["5️⃣ Extrai texto da resposta"]
STEP5 --> STEP6["6️⃣ adapter.send_message()"]
STEP6 --> USER[Resposta para usuário]
Feedback visual: durante o processamento, o Processor atualiza a mensagem de status na plataforma com o nome da tool sendo executada (ex: "🔍 Buscando produtos...").
SSE: como funciona o streaming¶
O SSE é especial porque usa dois endpoints e processamento assíncrono:
sequenceDiagram
actor Client as Frontend
participant API as FastAPI
participant SSE as SSEAdapter
participant SM as StreamManager
participant ADK as Runner ADK
Client->>API: POST /sse/chat {message, jwt}
API->>SSE: parse_webhook_request()
SSE->>SM: create_stream(request_id)
SSE-->>Client: {request_id: "abc123"}
Note over SSE,ADK: Processamento em background (asyncio.create_task)
SSE->>ADK: runner.run_async()
Client->>API: GET /sse/stream/abc123
API->>SM: subscribe(request_id)
loop Eventos em tempo real
ADK-->>SM: emit("tool_call_start", {tool: "busca_produtos"})
SM-->>Client: event: tool_call_start\ndata: {...}
ADK-->>SM: emit("text_chunk", {text: "Encontrei 3..."})
SM-->>Client: event: text_chunk\ndata: {...}
end
ADK-->>SM: emit("stream_complete", {text: "resposta final"})
SM-->>Client: event: stream_complete\ndata: {...}
Como criar um novo adapter¶
1. Crie o arquivo do adapter¶
# messaging/adapters/meu_adapter.py
from messaging.base import MessagingAdapter, IncomingMessage, OutgoingMessage
class MeuAdapter(MessagingAdapter):
@property
def platform_name(self) -> str:
return "minha_plataforma"
@property
def webhook_path(self) -> str:
return "/minha-plataforma/webhook"
async def parse_webhook_request(self, request) -> IncomingMessage:
body = await request.json()
return IncomingMessage(
user_id=body["sender_id"],
channel_id=body["chat_id"],
text=body["message"],
platform=self.platform_name,
)
async def send_message(self, message: OutgoingMessage) -> bool:
# Envia resposta via API da plataforma
async with aiohttp.ClientSession() as session:
await session.post(
"https://api.minhaplataforma.com/send",
json={"chat_id": message.channel_id, "text": message.text}
)
return True
def format_text(self, text: str) -> str:
# Converte markdown se necessário
return text
2. Registre na factory¶
Em messaging/factory.py, adicione a detecção:
# Na função register_from_env()
if os.getenv("MINHA_PLATAFORMA_TOKEN"):
adapter = MeuAdapter(token=os.getenv("MINHA_PLATAFORMA_TOKEN"))
self.register(adapter)
3. Adicione a route no FastAPI¶
Em unified_bot.py:
@app.post("/minha-plataforma/webhook")
async def minha_plataforma_webhook(request: Request):
adapter = adapter_factory.resolve("/minha-plataforma/webhook")
message = await adapter.parse_webhook_request(request)
response = await processor.process_message(message, adapter)
return response
4. Adicione testes¶
Em tests/test_meu_adapter.py.
Checklist para manutenção de adapters¶
- [ ] Ao alterar
parse_webhook_request(), valide que todos os campos doIncomingMessageestão preenchidos - [ ] Ao alterar
send_message(), teste com mensagens longas (truncamento) - [ ]
format_text()deve lidar com markdown, emojis e caracteres especiais - [ ] Verifique se
send_typing_indicator()está implementado (UX) - [ ] Confira as env vars no
cloudbuild.yamlao adicionar novo adapter
Anterior: ← O Agente iFriend · Próximo: Sessão, Memória e Auth →