# Implementation Plan — Usage Tracking & Billing

Tracking of LLM token and message consumption per user/company to feed the billing system.

## Context

The iFriend agent must record LLM token consumption and messages exchanged per authenticated user (JWT) in order to generate charges. The architecture must be non-blocking and must not affect the agent's response latency.
## Existing Resources

| Resource | State |
|---|---|
| JWT | Provides user_id, email, company_id, roles, token_exp |
| Session | Cloud SQL MySQL, TTL 60 min, id = {user_id}_{timestamp} |
| ADK Events | Each event exposes usage_metadata with token counts |
| GCP | BigQuery already in use (busca_produtos), Cloud SQL up and running |
Sample usage_metadata payload from an ADK event:

```json
{
  "usage_metadata": {
    "prompt_token_count": 9575,
    "candidates_token_count": 14,
    "cached_content_token_count": 8953,
    "total_token_count": 9710,
    "traffic_type": "ON_DEMAND"
  }
}
```
## Architecture

```
┌──────────────────────────────────────────────────────────────────────┐
│ HOT PATH (synchronous, lightweight)                                  │
│                                                                      │
│ processor.py event loop                                              │
│   │                                                                  │
│   ├─ Each event with usage_metadata → accumulate local counters      │
│   │    (prompt_tokens, completion_tokens, cached_tokens)             │
│   │                                                                  │
│   ├─ Each event with content.role="user"  → message_count_in += 1    │
│   ├─ Each event with content.role="model" + text → msg_out += 1      │
│   │                                                                  │
│   └─ End of loop → build UsageRecord (dataclass)                     │
│        │                                                             │
│        ▼                                                             │
│      asyncio.create_task(                                            │
│          publish_usage(record)   ← fire-and-forget                   │
│      )                                                               │
└──────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌──────────────────────────────────────────────────────────────────────┐
│ COLD PATH (asynchronous, decoupled)                                  │
│                                                                      │
│ Pub/Sub → BigQuery (streaming insert via BQ Subscription)            │
│                                                                      │
│ ┌─────────────┐     ┌──────────────────┐     ┌──────────────────┐    │
│ │ Cloud       │     │ BigQuery         │     │ SQL views for    │    │
│ │ Pub/Sub     │────▶│ Streaming Insert │────▶│ billing periods  │    │
│ │ topic:      │     │ agent_usage_log  │     │ (materialized)   │    │
│ │ agent-usage │     └──────────────────┘     └──────────────────┘    │
│ └─────────────┘                                                      │
└──────────────────────────────────────────────────────────────────────┘
```
## Why Pub/Sub + BigQuery (and not Cloud SQL directly)

| Criterion | Pub/Sub + BigQuery | Direct Cloud SQL |
|---|---|---|
| Hot-path impact | Zero (fire-and-forget publish) | Minimal (async INSERT) |
| Scalability | Unlimited | Limited by the MySQL connection pool |
| Analytical queries | Native in BQ (window functions, partitioning) | MySQL 5.7 is limited |
| Cost | ~$0 (free tier covers the expected volume) | Uses existing connections |
| Complexity | More components | Simpler |
| Already in the infra | Yes (BigQuery for busca_produtos) | Yes (Cloud SQL for sessions) |
## Components to Create

```
ifriend_agent/
├── usage/
│   ├── __init__.py
│   ├── models.py           # 1. UsageRecord dataclass
│   ├── collector.py        # 2. Collects tokens + messages from the event loop
│   └── publisher.py        # 3. Publishes to Pub/Sub (fire-and-forget)
│
├── infra/
│   └── bigquery_usage.sql  # 4. BQ schema + billing views
│
messaging/
├── processor.py            # 5. Event-loop integration (accumulate + dispatch)
```
## Step 1 — models.py (UsageRecord)

```python
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import List, Optional
import uuid
import json


@dataclass
class UsageRecord:
    """Consumption record for one agent interaction (request/response)."""

    # Identification
    record_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    app_name: str = ""
    user_id: str = ""
    company_id: Optional[str] = None
    session_id: str = ""

    # Token counters (aggregated per request)
    prompt_tokens: int = 0
    completion_tokens: int = 0
    cached_tokens: int = 0
    total_tokens: int = 0

    # Message counters
    user_messages: int = 0   # messages sent by the user in this turn
    agent_messages: int = 0  # messages sent by the agent in this turn
    tool_calls: int = 0      # number of tool calls executed

    # Metadata
    model: str = ""  # e.g. "gemini-2.5-flash"
    agents_used: List[str] = field(default_factory=list)  # ["root_agent", "discovery_agent"]
    platform: str = ""  # e.g. "whatsapp", "webchat", "slack"

    # Estimated cost (computed at publish time)
    estimated_cost_usd: float = 0.0

    # Timestamp
    timestamp: datetime = field(default_factory=datetime.utcnow)

    def to_json(self) -> str:
        data = asdict(self)
        data["timestamp"] = self.timestamp.isoformat()
        return json.dumps(data)
```
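A quick sanity check of the serialization pattern above, using a minimal stand-in dataclass (MiniRecord and its field values are illustrative, not part of the real module):

```python
# Minimal stand-in mirroring UsageRecord's to_json() pattern:
# asdict() flattens the dataclass, then the datetime is made JSON-safe.
from dataclasses import dataclass, field, asdict
from datetime import datetime
import json
import uuid


@dataclass
class MiniRecord:
    record_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    user_id: str = ""
    total_tokens: int = 0
    timestamp: datetime = field(default_factory=datetime.utcnow)

    def to_json(self) -> str:
        data = asdict(self)
        # json.dumps cannot encode datetime directly, so serialize it first
        data["timestamp"] = self.timestamp.isoformat()
        return json.dumps(data)


record = MiniRecord(user_id="u-1", total_tokens=9710)
payload = json.loads(record.to_json())
print(payload["user_id"], payload["total_tokens"])  # u-1 9710
```

The same round trip is what the Pub/Sub publisher relies on: everything in the record must survive `json.dumps` unchanged.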
## Step 2 — collector.py (Collection in the Event Loop)

```python
import os

from .models import UsageRecord

# Cost per 1M tokens (Gemini 2.5 Flash)
COST_PROMPT_PER_1M = float(os.getenv("GEMINI_COST_PROMPT_PER_1M", "0.075"))
COST_OUTPUT_PER_1M = float(os.getenv("GEMINI_COST_OUTPUT_PER_1M", "0.30"))


class UsageCollector:
    """
    Accumulates metrics during the processor's event loop.

    Usage:
        collector = UsageCollector()
        async for event in runner.run_async(...):
            collector.track_event(event)
            # ... rest of the loop ...
        record = collector.build_record(app_name, user_id, ...)
    """

    def __init__(self):
        self.prompt_tokens = 0
        self.completion_tokens = 0
        self.cached_tokens = 0
        self.user_messages = 0
        self.agent_messages = 0
        self.tool_calls = 0
        self.agents_seen = set()
        self.model = None

    def track_event(self, event):
        """Called for every event in the loop. Cost: microseconds."""
        # Token usage
        if hasattr(event, "usage_metadata") and event.usage_metadata:
            um = event.usage_metadata
            self.prompt_tokens += getattr(um, "prompt_token_count", 0) or 0
            self.completion_tokens += getattr(um, "candidates_token_count", 0) or 0
            self.cached_tokens += getattr(um, "cached_content_token_count", 0) or 0

        # Messages, tools and agents
        if event.content and hasattr(event.content, "parts"):
            for part in event.content.parts:
                if hasattr(part, "function_call") and part.function_call:
                    self.tool_calls += 1
                if hasattr(part, "text") and part.text and event.author:
                    if event.author == "user":
                        self.user_messages += 1
                    else:
                        self.agent_messages += 1

        # Track the agents involved
        if hasattr(event, "author") and event.author and event.author != "user":
            self.agents_seen.add(event.author)

        # Model
        if hasattr(event, "model") and event.model:
            self.model = event.model

    def build_record(
        self, app_name, user_id, company_id, session_id, platform, model_override=None
    ) -> UsageRecord:
        """Builds the final record after the loop completes."""
        total = self.prompt_tokens + self.completion_tokens
        model = model_override or self.model or os.getenv("MODEL", "gemini-2.5-flash")

        # Estimated cost
        prompt_cost = (self.prompt_tokens / 1_000_000) * COST_PROMPT_PER_1M
        output_cost = (self.completion_tokens / 1_000_000) * COST_OUTPUT_PER_1M
        # Cached tokens cost ~75% less
        cached_discount = (self.cached_tokens / 1_000_000) * COST_PROMPT_PER_1M * 0.75

        return UsageRecord(
            app_name=app_name,
            user_id=user_id,
            company_id=company_id,
            session_id=session_id,
            prompt_tokens=self.prompt_tokens,
            completion_tokens=self.completion_tokens,
            cached_tokens=self.cached_tokens,
            total_tokens=total,
            user_messages=self.user_messages,
            agent_messages=self.agent_messages,
            tool_calls=self.tool_calls,
            model=model,
            agents_used=sorted(self.agents_seen),
            platform=platform,
            estimated_cost_usd=round(prompt_cost + output_cost - cached_discount, 6),
        )
```
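As a worked example of the cost formula in build_record(), applying the default prices to the sample usage_metadata shown earlier (the prices are the documented defaults; the resulting figure is only an estimate):

```python
# Default prices from collector.py (USD per 1M tokens)
COST_PROMPT_PER_1M = 0.075
COST_OUTPUT_PER_1M = 0.30

# Token counts from the sample usage_metadata above
prompt_tokens = 9575
completion_tokens = 14
cached_tokens = 8953

prompt_cost = (prompt_tokens / 1_000_000) * COST_PROMPT_PER_1M      # 0.000718125
output_cost = (completion_tokens / 1_000_000) * COST_OUTPUT_PER_1M  # 0.0000042
# Cached tokens are billed at ~25% of the prompt price, so 75% of it is discounted
cached_discount = (cached_tokens / 1_000_000) * COST_PROMPT_PER_1M * 0.75

estimated_cost_usd = round(prompt_cost + output_cost - cached_discount, 6)
print(estimated_cost_usd)  # 0.000219
```

Note that the cached-token discount assumes cached_content_token_count is a subset of prompt_token_count, which is why it is subtracted rather than added.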
## Step 3 — publisher.py (Pub/Sub Fire-and-Forget)

```python
import os
import logging

from google.cloud import pubsub_v1

from .models import UsageRecord

logger = logging.getLogger(__name__)

PROJECT_ID = os.getenv("GCP_PROJECT", os.getenv("GOOGLE_CLOUD_PROJECT", ""))
TOPIC_NAME = os.getenv("USAGE_PUBSUB_TOPIC", "agent-usage")
ENABLED = os.getenv("USAGE_TRACKING_ENABLED", "false").lower() == "true"


class UsagePublisher:
    """Publishes UsageRecord to Cloud Pub/Sub (async, non-blocking)."""

    def __init__(self):
        self._publisher = None
        self._topic_path = None

    def _ensure_client(self):
        if self._publisher is None:
            self._publisher = pubsub_v1.PublisherClient()
            self._topic_path = self._publisher.topic_path(PROJECT_ID, TOPIC_NAME)

    async def publish(self, record: UsageRecord):
        """
        Fire-and-forget: publishes without waiting for confirmation.
        Fails silently with a warning log — never blocks the agent.
        """
        if not ENABLED:
            logger.debug("Usage tracking disabled, skipping publish")
            return
        try:
            self._ensure_client()
            data = record.to_json().encode("utf-8")
            # publish() returns a Future — we never call .result() (fire-and-forget)
            self._publisher.publish(
                self._topic_path,
                data,
                user_id=record.user_id,
                company_id=record.company_id or "",
                platform=record.platform,
            )
            logger.debug(f"Usage record published: {record.record_id}")
        except Exception as e:
            logger.warning(f"Usage publish failed (non-blocking): {e}")


# Singleton
_publisher = UsagePublisher()


async def publish_usage(record: UsageRecord):
    """Convenience wrapper to publish a usage record."""
    await _publisher.publish(record)
```
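The failure-swallowing contract can be unit-tested without GCP by injecting a fake client. This is a simplified stand-in that mirrors the publisher's try/except shape — FakeFailingClient and MiniPublisher are hypothetical names, not part of google-cloud-pubsub:

```python
import asyncio
import logging

logger = logging.getLogger("usage.publisher.test")


class FakeFailingClient:
    """Stand-in for a Pub/Sub publisher client that always fails."""

    def publish(self, topic, data, **attrs):
        raise ConnectionError("simulated Pub/Sub outage")


class MiniPublisher:
    """Simplified mirror of UsagePublisher's error-handling contract."""

    def __init__(self, client):
        self._client = client

    async def publish(self, payload: bytes) -> bool:
        try:
            self._client.publish("projects/p/topics/agent-usage", payload)
            return True
        except Exception as e:
            # Same contract as publisher.py: log a warning, never raise
            logger.warning(f"Usage publish failed (non-blocking): {e}")
            return False


ok = asyncio.run(MiniPublisher(FakeFailingClient()).publish(b"{}"))
print(ok)  # False — the failure was swallowed, nothing propagated
```

The same injection point makes the Phase 7 unit tests straightforward: pass a recording fake instead of a failing one and assert on the captured payload.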
## Step 4 — Integration in processor.py

Minimal change to the existing event loop:

```python
# At the top of the file:
import asyncio
import os

from ifriend_agent.usage.collector import UsageCollector
from ifriend_agent.usage.publisher import publish_usage

# Inside process_message(), BEFORE the existing event loop:
collector = UsageCollector()

# INSIDE the event loop (1 line added):
async for event in self.runner.run_async(...):
    collector.track_event(event)  # ← THE ONLY NEW LINE IN THE LOOP
    # ... all existing code stays the same ...

# AFTER the event loop (once the response has been sent to the user):
if os.getenv("USAGE_TRACKING_ENABLED", "false").lower() == "true":
    company_id = None
    if jwt_context and jwt_context.custom_claims:
        company_id = jwt_context.custom_claims.get("company_id")

    record = collector.build_record(
        app_name=self.runner.app_name,
        user_id=message.user_id,
        company_id=company_id,
        session_id=message.thread_id,
        platform=message.platform,
    )
    asyncio.create_task(publish_usage(record))  # fire-and-forget
```

Hot-path impact: one collector.track_event() call per event (~microseconds). The publish runs in the background after the response has already been sent.
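The fire-and-forget dispatch can be sketched in isolation: the response path finishes immediately while the publish runs in the background. All names here are illustrative, and the sleep merely simulates network latency:

```python
import asyncio


async def slow_publish(record: dict) -> None:
    # Simulates the Pub/Sub publish: slow relative to the hot path
    await asyncio.sleep(0.05)
    record["published"] = True


async def handle_request() -> str:
    record = {"published": False}
    # Fire-and-forget: schedule the publish, do not await it
    task = asyncio.create_task(slow_publish(record))
    response = "response sent to user"  # hot path ends here, before the publish completes
    assert record["published"] is False
    await task  # awaited here only so this demo script can observe completion
    return response


result = asyncio.run(handle_request())
print(result)
```

One caveat with this pattern: asyncio only keeps weak references to tasks, so production code should hold a reference to the task (or attach a done-callback) to ensure it is not garbage-collected before completing.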
## Step 5 — BigQuery Schema + Views

```sql
-- ===========================================================================
-- Main table (append-only, partitioned by day, clustered)
-- ===========================================================================
CREATE TABLE IF NOT EXISTS `{PROJECT}.{DATASET}.agent_usage_log` (
  record_id STRING NOT NULL,
  app_name STRING,
  user_id STRING NOT NULL,
  company_id STRING,
  session_id STRING NOT NULL,
  -- Tokens
  prompt_tokens INT64,
  completion_tokens INT64,
  cached_tokens INT64,
  total_tokens INT64,
  -- Messages
  user_messages INT64,
  agent_messages INT64,
  tool_calls INT64,
  -- Meta
  model STRING,
  agents_used ARRAY<STRING>,
  platform STRING,
  estimated_cost_usd FLOAT64,
  timestamp TIMESTAMP NOT NULL
)
PARTITION BY DATE(timestamp)
CLUSTER BY company_id, user_id;

-- ===========================================================================
-- View: monthly consumption per company (for billing)
-- ===========================================================================
CREATE OR REPLACE VIEW `{PROJECT}.{DATASET}.billing_monthly_company` AS
SELECT
  company_id,
  FORMAT_TIMESTAMP('%Y-%m', timestamp) AS billing_month,
  COUNT(DISTINCT user_id) AS active_users,
  COUNT(DISTINCT session_id) AS total_sessions,
  SUM(user_messages) AS total_user_messages,
  SUM(agent_messages) AS total_agent_messages,
  SUM(user_messages + agent_messages) AS total_messages,
  SUM(prompt_tokens) AS total_prompt_tokens,
  SUM(completion_tokens) AS total_completion_tokens,
  SUM(cached_tokens) AS total_cached_tokens,
  SUM(total_tokens) AS total_tokens,
  SUM(estimated_cost_usd) AS total_cost_usd
FROM `{PROJECT}.{DATASET}.agent_usage_log`
GROUP BY company_id, billing_month;

-- ===========================================================================
-- View: monthly consumption per user (detail)
-- ===========================================================================
CREATE OR REPLACE VIEW `{PROJECT}.{DATASET}.billing_monthly_user` AS
SELECT
  company_id,
  user_id,
  FORMAT_TIMESTAMP('%Y-%m', timestamp) AS billing_month,
  COUNT(DISTINCT session_id) AS total_sessions,
  SUM(user_messages) AS total_user_messages,
  SUM(agent_messages) AS total_agent_messages,
  SUM(user_messages + agent_messages) AS total_messages,
  SUM(total_tokens) AS total_tokens,
  SUM(estimated_cost_usd) AS total_cost_usd
FROM `{PROJECT}.{DATASET}.agent_usage_log`
GROUP BY company_id, user_id, billing_month;

-- ===========================================================================
-- View: daily consumption (for dashboards)
-- ===========================================================================
CREATE OR REPLACE VIEW `{PROJECT}.{DATASET}.usage_daily` AS
SELECT
  DATE(timestamp) AS usage_date,
  company_id,
  COUNT(DISTINCT user_id) AS active_users,
  COUNT(DISTINCT session_id) AS sessions,
  SUM(user_messages + agent_messages) AS messages,
  SUM(total_tokens) AS tokens,
  SUM(estimated_cost_usd) AS cost_usd
FROM `{PROJECT}.{DATASET}.agent_usage_log`
GROUP BY usage_date, company_id;

-- ===========================================================================
-- View: consumption per platform (segmentation)
-- ===========================================================================
CREATE OR REPLACE VIEW `{PROJECT}.{DATASET}.usage_by_platform` AS
SELECT
  platform,
  FORMAT_TIMESTAMP('%Y-%m', timestamp) AS month,
  COUNT(DISTINCT user_id) AS users,
  SUM(user_messages + agent_messages) AS messages,
  SUM(total_tokens) AS tokens,
  SUM(estimated_cost_usd) AS cost_usd
FROM `{PROJECT}.{DATASET}.agent_usage_log`
GROUP BY platform, month;
```
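The semantics of billing_monthly_company can be sanity-checked locally by mirroring the GROUP BY over a few fake rows (all values below are made up for illustration):

```python
from collections import defaultdict

# Fake agent_usage_log rows (subset of columns)
rows = [
    {"company_id": "acme", "user_id": "u1", "session_id": "s1",
     "user_messages": 2, "agent_messages": 2, "total_tokens": 9710,
     "month": "2026-03"},
    {"company_id": "acme", "user_id": "u1", "session_id": "s2",
     "user_messages": 1, "agent_messages": 1, "total_tokens": 500,
     "month": "2026-03"},
    {"company_id": "acme", "user_id": "u2", "session_id": "s3",
     "user_messages": 3, "agent_messages": 3, "total_tokens": 1200,
     "month": "2026-03"},
]

# GROUP BY company_id, billing_month — same shape as the SQL view
agg = defaultdict(lambda: {"users": set(), "sessions": set(),
                           "messages": 0, "tokens": 0})
for r in rows:
    a = agg[(r["company_id"], r["month"])]
    a["users"].add(r["user_id"])        # COUNT(DISTINCT user_id)
    a["sessions"].add(r["session_id"])  # COUNT(DISTINCT session_id)
    a["messages"] += r["user_messages"] + r["agent_messages"]
    a["tokens"] += r["total_tokens"]

report = agg[("acme", "2026-03")]
print(len(report["users"]), len(report["sessions"]),
      report["messages"], report["tokens"])  # 2 3 12 11410
```

The distinct counts are what make active_users robust to one user generating many records per month.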
## Step 6 — GCP Infra (Pub/Sub + BQ Subscription)

```bash
# 1. Create the topic
gcloud pubsub topics create agent-usage \
    --project=$GCP_PROJECT

# 2. Create the BigQuery dataset (if it does not exist)
bq mk --dataset \
    --description="Agent usage tracking for billing" \
    --location=us-central1 \
    $GCP_PROJECT:agent_usage

# 3. Create the BigQuery table (run the SQL from Step 5)
bq query --use_legacy_sql=false < infra/bigquery_usage.sql

# 4. Create the Pub/Sub schema (for the BQ subscription)
gcloud pubsub schemas create agent-usage-schema \
    --type=avro \
    --definition-file=infra/usage_schema.avsc

# 5. Create the BQ subscription (Pub/Sub writes straight into BigQuery)
gcloud pubsub subscriptions create agent-usage-to-bq \
    --topic=agent-usage \
    --bigquery-table=$GCP_PROJECT:agent_usage.agent_usage_log \
    --write-metadata \
    --project=$GCP_PROJECT

# 6. Enable the feature flag
# In production: USAGE_TRACKING_ENABLED=true
```
## Captured Metrics

| Metric | Source | Use for Billing |
|---|---|---|
| user_messages | Count of role="user" events with text | Charge per message sent |
| agent_messages | Count of role="model" events with text | Charge per response |
| prompt_tokens | event.usage_metadata.prompt_token_count | LLM input cost |
| completion_tokens | event.usage_metadata.candidates_token_count | LLM output cost |
| cached_tokens | event.usage_metadata.cached_content_token_count | Cache discount (~75% cheaper) |
| total_tokens | prompt + completion | Total LLM consumption |
| tool_calls | Count of function_call parts | Complexity metric |
| company_id | JWT custom_claims.company_id | Group billing by company |
| platform | message.platform | Segment by channel |
| model | event.model or env MODEL | Price table per model |
| estimated_cost_usd | Computed: tokens × price/1M | Cost estimate |
## Environment Variables

| Variable | Default | Description |
|---|---|---|
| USAGE_TRACKING_ENABLED | false | Feature flag (roll out gradually) |
| USAGE_PUBSUB_TOPIC | agent-usage | Cloud Pub/Sub topic |
| GCP_PROJECT | - | GCP project (already exists) |
| GEMINI_COST_PROMPT_PER_1M | 0.075 | Prompt token cost (USD/1M) |
| GEMINI_COST_OUTPUT_PER_1M | 0.30 | Output token cost (USD/1M) |
## Execution Order

| Phase | What | Dependencies | Risk | Estimate |
|---|---|---|---|---|
| 1 | usage/models.py + usage/collector.py | None | Zero | — |
| 2 | usage/publisher.py | Phase 1 | Low | — |
| 3 | Integrate the collector into processor.py | Phases 1+2 | Low (1 line in the loop) | — |
| 4 | GCP infra: Pub/Sub topic + BigQuery schema | GCP access | Zero (infra) | — |
| 5 | BigQuery Subscription (Pub/Sub → BQ) | Phase 4 | Zero (config) | — |
| 6 | Billing SQL views | Phase 4 | Zero (read-only) | — |
| 7 | Unit tests (collector + mocked publisher) | Phases 1-3 | Zero | — |
| 8 | Deploy with USAGE_TRACKING_ENABLED=false | All | Zero | — |
| 9 | Enable the feature flag in staging | Phase 8 | Low | — |
| 10 | Enable in production | Phase 9 validated | Low | — |
## Example Billing Queries

```sql
-- Monthly invoice for one company
SELECT
  billing_month,
  active_users,
  total_sessions,
  total_messages,
  total_tokens,
  ROUND(total_cost_usd, 2) AS cost_usd
FROM `project.agent_usage.billing_monthly_company`
WHERE company_id = '12345'
  AND billing_month = '2026-03'
ORDER BY billing_month;

-- Top 10 users by consumption in the month
SELECT
  user_id,
  total_messages,
  total_tokens,
  ROUND(total_cost_usd, 4) AS cost_usd
FROM `project.agent_usage.billing_monthly_user`
WHERE company_id = '12345'
  AND billing_month = '2026-03'
ORDER BY total_tokens DESC
LIMIT 10;
```
## Guarantees

- Zero latency impact: collection takes microseconds; publishing is fire-and-forget after the response has been sent
- No data loss: Pub/Sub guarantees at-least-once delivery to BigQuery (duplicates are possible; record_id allows deduplication)
- Feature flag: USAGE_TRACKING_ENABLED=false by default, enabled gradually
- Silent fallback: if Pub/Sub fails, only a warning is logged — the agent keeps working
- No lock-in: UsageRecord is a plain dataclass; the publisher can be swapped out (e.g. CloudSQL, Redis, Kafka)
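Because at-least-once delivery can replay a message, record_id makes deduplication trivial on the read side. A local sketch of the idea (the rows are illustrative; in BigQuery the equivalent would be a ROW_NUMBER() filter or COUNT(DISTINCT record_id) in the billing views):

```python
def dedupe_by_record_id(rows):
    """Keeps the first occurrence of each record_id, drops replays."""
    seen = set()
    unique = []
    for row in rows:
        if row["record_id"] not in seen:
            seen.add(row["record_id"])
            unique.append(row)
    return unique


rows = [
    {"record_id": "a1", "total_tokens": 9710},
    {"record_id": "a1", "total_tokens": 9710},  # Pub/Sub redelivery
    {"record_id": "b2", "total_tokens": 500},
]
deduped = dedupe_by_record_id(rows)
print(len(deduped))  # 2
```

Since each UsageRecord carries a UUID generated on the hot path, duplicates from redelivery are byte-identical and safe to drop by key.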