
Implementation Plan — Usage Tracking & Billing

Tracking token and message consumption per user/company for the billing system.

Context

The iFriend agent needs to record LLM token consumption and the messages exchanged per authenticated (JWT) user in order to generate charges. The architecture must be non-blocking and must not affect the agent's response latency.

Existing Resources

| Resource         | Status                                                      |
|------------------|-------------------------------------------------------------|
| JWT              | Provides user_id, email, company_id, roles, token_exp       |
| CloudSQL session | MySQL, 60 min TTL, id = {user_id}_{timestamp}               |
| ADK Events       | Each event exposes usage_metadata with token counts         |
| GCP              | BigQuery already in use (busca_produtos), Cloud SQL running |

ADK usage_metadata (already available on every event)

{
  "usage_metadata": {
    "prompt_token_count": 9575,
    "candidates_token_count": 14,
    "cached_content_token_count": 8953,
    "total_token_count": 9710,
    "traffic_type": "ON_DEMAND"
  }
}
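To make the aggregation concrete: these counts can be accumulated defensively across events, which is exactly what collector.py does later. The stubs below are hypothetical stand-ins for ADK events, not the real types:

```python
from types import SimpleNamespace

# Hypothetical stand-ins for ADK events carrying usage_metadata.
events = [
    SimpleNamespace(usage_metadata=SimpleNamespace(
        prompt_token_count=9575, candidates_token_count=14,
        cached_content_token_count=8953)),
    SimpleNamespace(usage_metadata=None),  # events without usage are skipped
]

prompt = completion = cached = 0
for ev in events:
    um = getattr(ev, "usage_metadata", None)
    if um:
        # "or 0" guards against fields that come back as None
        prompt += getattr(um, "prompt_token_count", 0) or 0
        completion += getattr(um, "candidates_token_count", 0) or 0
        cached += getattr(um, "cached_content_token_count", 0) or 0

print(prompt, completion, cached)  # 9575 14 8953
```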

Architecture

┌──────────────────────────────────────────────────────────────────────┐
│                  HOT PATH (synchronous, lightweight)                 │
│                                                                      │
│  processor.py event loop                                             │
│      │                                                               │
│      ├─ Each event with usage_metadata → accumulate local counters   │
│      │  (prompt_tokens, completion_tokens, cached_tokens)            │
│      │                                                               │
│      ├─ Each event with content.role="user" → message_count_in += 1  │
│      ├─ Each event with content.role="model" + text → msg_out += 1   │
│      │                                                               │
│      └─ End of loop → build UsageRecord (dataclass)                  │
│                            │                                         │
│                            ▼                                         │
│                    asyncio.create_task(                              │
│                        publish_usage(record)   ← fire-and-forget     │
│                    )                                                 │
└──────────────────────────────────────────────────────────────────────┘
                             │
                             ▼
┌──────────────────────────────────────────────────────────────────────┐
│                  COLD PATH (asynchronous, decoupled)                 │
│                                                                      │
│  Pub/Sub → BigQuery (streaming insert via BQ Subscription)           │
│                                                                      │
│  ┌─────────────┐    ┌──────────────────┐    ┌──────────────────┐     │
│  │ Cloud       │    │ BigQuery         │    │ SQL views for    │     │
│  │ Pub/Sub     │───▶│ Streaming Insert │───▶│ billing periods  │     │
│  │ topic:      │    │ agent_usage_log  │    │                  │     │
│  │ agent-usage │    └──────────────────┘    └──────────────────┘     │
│  └─────────────┘                                                     │
└──────────────────────────────────────────────────────────────────────┘

Why Pub/Sub + BigQuery (and not CloudSQL directly)

| Criterion           | Pub/Sub + BigQuery                            | Direct CloudSQL              |
|---------------------|-----------------------------------------------|------------------------------|
| Hot-path impact     | Zero (fire-and-forget publish)                | Minimal (async INSERT)       |
| Scalability         | Effectively unlimited                         | Limited by the MySQL pool    |
| Analytical queries  | Native in BQ (window functions, partitioning) | Limited in MySQL 5.7         |
| Cost                | ~$0 (free tier covers the expected volume)    | Uses existing connections    |
| Complexity          | More components                               | Simpler                      |
| Already in the infra| Yes (BigQuery for busca_produtos)             | Yes (CloudSQL for sessions)  |

Components to Create

ifriend_agent/
├── usage/
│   ├── __init__.py
│   ├── models.py              # 1. UsageRecord dataclass
│   ├── collector.py           # 2. Collects tokens + messages from the event loop
│   └── publisher.py           # 3. Publishes to Pub/Sub (fire-and-forget)
│
├── infra/
│   └── bigquery_usage.sql     # 4. BQ schema + billing views
│
messaging/
├── processor.py               # 5. Event-loop integration (accumulate + fire)

Step 1 — models.py (UsageRecord)

from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import List, Optional
import uuid
import json


@dataclass
class UsageRecord:
    """Usage record for a single agent interaction (request/response)."""

    # Identification
    record_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    app_name: str = ""
    user_id: str = ""
    company_id: Optional[str] = None
    session_id: str = ""

    # Token counters (aggregated per request)
    prompt_tokens: int = 0
    completion_tokens: int = 0
    cached_tokens: int = 0
    total_tokens: int = 0

    # Message counters
    user_messages: int = 0        # messages sent by the user in this turn
    agent_messages: int = 0       # messages sent by the agent in this turn
    tool_calls: int = 0           # number of tool calls executed

    # Metadata
    model: str = ""               # e.g. "gemini-2.5-flash"
    agents_used: List[str] = field(default_factory=list)  # ["root_agent", "discovery_agent"]
    platform: str = ""            # e.g. "whatsapp", "webchat", "slack"

    # Estimated cost (computed at publish time)
    estimated_cost_usd: float = 0.0

    # Timestamp (timezone-aware; datetime.utcnow() is deprecated)
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def to_json(self) -> str:
        data = asdict(self)
        data["timestamp"] = self.timestamp.isoformat()
        return json.dumps(data)
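A quick sanity check of the serialization pattern, using a trimmed-down copy of the dataclass (MiniUsageRecord is illustrative only, with just a few of the fields):

```python
import json
import uuid
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone


@dataclass
class MiniUsageRecord:
    # Trimmed-down illustration of UsageRecord's serialization pattern
    record_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    user_id: str = ""
    total_tokens: int = 0
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def to_json(self) -> str:
        data = asdict(self)
        # datetime is not JSON-serializable; convert to ISO-8601 first
        data["timestamp"] = self.timestamp.isoformat()
        return json.dumps(data)


rec = MiniUsageRecord(user_id="u1", total_tokens=9710)
payload = json.loads(rec.to_json())
print(payload["user_id"], payload["total_tokens"])  # u1 9710
```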

Step 2 — collector.py (Collection in the Event Loop)

import os
from .models import UsageRecord


# Cost per 1M tokens (Gemini 2.5 Flash)
COST_PROMPT_PER_1M = float(os.getenv("GEMINI_COST_PROMPT_PER_1M", "0.075"))
COST_OUTPUT_PER_1M = float(os.getenv("GEMINI_COST_OUTPUT_PER_1M", "0.30"))


class UsageCollector:
    """
    Accumulates metrics during the processor's event loop.

    Usage:
        collector = UsageCollector()
        async for event in runner.run_async(...):
            collector.track_event(event)
            # ... rest of the loop ...
        record = collector.build_record(app_name, user_id, ...)
    """

    def __init__(self):
        self.prompt_tokens = 0
        self.completion_tokens = 0
        self.cached_tokens = 0
        self.user_messages = 0
        self.agent_messages = 0
        self.tool_calls = 0
        self.agents_seen = set()
        self.model = None

    def track_event(self, event):
        """Called for every event in the loop. Cost: microseconds."""
        # Token usage
        if hasattr(event, "usage_metadata") and event.usage_metadata:
            um = event.usage_metadata
            self.prompt_tokens += getattr(um, "prompt_token_count", 0) or 0
            self.completion_tokens += getattr(um, "candidates_token_count", 0) or 0
            self.cached_tokens += getattr(um, "cached_content_token_count", 0) or 0

        # Messages, tools and agents (parts may be None on some events)
        if event.content and getattr(event.content, "parts", None):
            for part in event.content.parts:
                if hasattr(part, "function_call") and part.function_call:
                    self.tool_calls += 1
                if hasattr(part, "text") and part.text and event.author:
                    if event.author == "user":
                        self.user_messages += 1
                    else:
                        self.agent_messages += 1

        # Track the agents involved
        if hasattr(event, "author") and event.author and event.author != "user":
            self.agents_seen.add(event.author)

        # Model
        if hasattr(event, "model") and event.model:
            self.model = event.model

    def build_record(
        self, app_name, user_id, company_id, session_id, platform, model_override=None
    ) -> UsageRecord:
        """Builds the final record once the loop has finished."""
        total = self.prompt_tokens + self.completion_tokens
        model = model_override or self.model or os.getenv("MODEL", "gemini-2.5-flash")

        # Estimated cost
        prompt_cost = (self.prompt_tokens / 1_000_000) * COST_PROMPT_PER_1M
        output_cost = (self.completion_tokens / 1_000_000) * COST_OUTPUT_PER_1M
        # Cached tokens cost ~75% less (assumes they are included in prompt_token_count)
        cached_discount = (self.cached_tokens / 1_000_000) * COST_PROMPT_PER_1M * 0.75

        return UsageRecord(
            app_name=app_name,
            user_id=user_id,
            company_id=company_id,
            session_id=session_id,
            prompt_tokens=self.prompt_tokens,
            completion_tokens=self.completion_tokens,
            cached_tokens=self.cached_tokens,
            total_tokens=total,
            user_messages=self.user_messages,
            agent_messages=self.agent_messages,
            tool_calls=self.tool_calls,
            model=model,
            agents_used=sorted(self.agents_seen),
            platform=platform,
            estimated_cost_usd=round(prompt_cost + output_cost - cached_discount, 6),
        )
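Plugging the sample usage_metadata numbers shown earlier into the cost formula (default prices; the discount assumes cached tokens are counted inside prompt_token_count):

```python
# Default prices from collector.py (USD per 1M tokens, Gemini 2.5 Flash)
COST_PROMPT_PER_1M = 0.075
COST_OUTPUT_PER_1M = 0.30

# Sample counts from the usage_metadata example
prompt_tokens, completion_tokens, cached_tokens = 9575, 14, 8953

prompt_cost = (prompt_tokens / 1_000_000) * COST_PROMPT_PER_1M      # full prompt at list price
output_cost = (completion_tokens / 1_000_000) * COST_OUTPUT_PER_1M
# Cached tokens cost ~75% less, so refund 75% of their prompt-rate cost
cached_discount = (cached_tokens / 1_000_000) * COST_PROMPT_PER_1M * 0.75

estimated = round(prompt_cost + output_cost - cached_discount, 6)
print(estimated)  # 0.000219
```

So a turn with a large, mostly cached prompt costs a fraction of what the raw prompt_token_count alone would suggest.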

Step 3 — publisher.py (Pub/Sub Fire-and-Forget)

import os
import json
import logging
from google.cloud import pubsub_v1
from .models import UsageRecord

logger = logging.getLogger(__name__)

PROJECT_ID = os.getenv("GCP_PROJECT", os.getenv("GOOGLE_CLOUD_PROJECT", ""))
TOPIC_NAME = os.getenv("USAGE_PUBSUB_TOPIC", "agent-usage")
ENABLED = os.getenv("USAGE_TRACKING_ENABLED", "false").lower() == "true"


class UsagePublisher:
    """Publishes UsageRecord to Cloud Pub/Sub (async, non-blocking)."""

    def __init__(self):
        self._publisher = None
        self._topic_path = None

    def _ensure_client(self):
        if self._publisher is None:
            self._publisher = pubsub_v1.PublisherClient()
            self._topic_path = self._publisher.topic_path(PROJECT_ID, TOPIC_NAME)

    async def publish(self, record: UsageRecord):
        """
        Fire-and-forget: publishes without waiting for confirmation.
        Fails silently with a warning log; never blocks the agent.
        """
        if not ENABLED:
            logger.debug("Usage tracking disabled, skipping publish")
            return

        try:
            self._ensure_client()
            data = record.to_json().encode("utf-8")
            # publish() returns a Future; we never call .result() (fire-and-forget)
            self._publisher.publish(
                self._topic_path,
                data,
                user_id=record.user_id,
                company_id=record.company_id or "",
                platform=record.platform,
            )
            logger.debug(f"Usage record published: {record.record_id}")
        except Exception as e:
            logger.warning(f"Usage publish failed (non-blocking): {e}")


# Singleton
_publisher = UsagePublisher()


async def publish_usage(record: UsageRecord):
    """Convenience wrapper to publish a usage record."""
    await _publisher.publish(record)
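Pub/Sub message attributes must be strings, which is why the publisher coerces company_id with `or ""`. A self-contained sketch of how that coercion can be unit-tested offline (FakePubSub is a hypothetical test double, not part of google-cloud-pubsub):

```python
class FakePubSub:
    """Captures publish() calls so the attribute logic can be tested offline."""

    def __init__(self):
        self.calls = []

    def publish(self, topic, data, **attributes):
        # The real Pub/Sub client rejects non-string attribute values
        assert all(isinstance(v, str) for v in attributes.values())
        self.calls.append((topic, data, attributes))


fake = FakePubSub()
company_id = None  # user without a company claim in the JWT
fake.publish(
    "projects/demo/topics/agent-usage",  # hypothetical topic path
    b'{"user_id": "u1"}',
    user_id="u1",
    company_id=company_id or "",  # None is coerced to an empty string
    platform="whatsapp",
)
print(fake.calls[0][2]["company_id"] == "")  # True
```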

Step 4 — Integration in processor.py

Minimal change to the existing event loop:

# At the top of the file:
import asyncio
import os
from ifriend_agent.usage.collector import UsageCollector
from ifriend_agent.usage.publisher import publish_usage

# Inside process_message(), BEFORE the existing event loop:
collector = UsageCollector()

# INSIDE the event loop (1 line added):
async for event in self.runner.run_async(...):
    collector.track_event(event)       # ← THE ONLY NEW LINE IN THE LOOP
    # ... all existing code stays the same ...

# AFTER the event loop (after the response has been sent to the user):
if os.getenv("USAGE_TRACKING_ENABLED", "false").lower() == "true":
    company_id = None
    if jwt_context and jwt_context.custom_claims:
        company_id = jwt_context.custom_claims.get("company_id")

    record = collector.build_record(
        app_name=self.runner.app_name,
        user_id=message.user_id,
        company_id=company_id,
        session_id=message.thread_id,
        platform=message.platform,
    )
    asyncio.create_task(publish_usage(record))  # fire-and-forget

Hot-path impact: one collector.track_event() call per event (~microseconds). The publish runs in the background after the response has already been sent.

Step 5 — BigQuery Schema + Views

-- ===========================================================================
-- Main table (append-only, partitioned by day, clustered)
-- ===========================================================================
CREATE TABLE IF NOT EXISTS `{PROJECT}.{DATASET}.agent_usage_log` (
  record_id STRING NOT NULL,
  app_name STRING,
  user_id STRING NOT NULL,
  company_id STRING,
  session_id STRING NOT NULL,

  -- Tokens
  prompt_tokens INT64,
  completion_tokens INT64,
  cached_tokens INT64,
  total_tokens INT64,

  -- Messages
  user_messages INT64,
  agent_messages INT64,
  tool_calls INT64,

  -- Meta
  model STRING,
  agents_used ARRAY<STRING>,
  platform STRING,
  estimated_cost_usd FLOAT64,

  timestamp TIMESTAMP NOT NULL
)
PARTITION BY DATE(timestamp)
CLUSTER BY company_id, user_id;


-- ===========================================================================
-- View: Monthly consumption per company (for billing)
-- ===========================================================================
CREATE OR REPLACE VIEW `{PROJECT}.{DATASET}.billing_monthly_company` AS
SELECT
  company_id,
  FORMAT_TIMESTAMP('%Y-%m', timestamp) AS billing_month,
  COUNT(DISTINCT user_id) AS active_users,
  COUNT(DISTINCT session_id) AS total_sessions,
  SUM(user_messages) AS total_user_messages,
  SUM(agent_messages) AS total_agent_messages,
  SUM(user_messages + agent_messages) AS total_messages,
  SUM(prompt_tokens) AS total_prompt_tokens,
  SUM(completion_tokens) AS total_completion_tokens,
  SUM(cached_tokens) AS total_cached_tokens,
  SUM(total_tokens) AS total_tokens,
  SUM(estimated_cost_usd) AS total_cost_usd
FROM `{PROJECT}.{DATASET}.agent_usage_log`
GROUP BY company_id, billing_month;


-- ===========================================================================
-- View: Monthly consumption per user (detail)
-- ===========================================================================
CREATE OR REPLACE VIEW `{PROJECT}.{DATASET}.billing_monthly_user` AS
SELECT
  company_id,
  user_id,
  FORMAT_TIMESTAMP('%Y-%m', timestamp) AS billing_month,
  COUNT(DISTINCT session_id) AS total_sessions,
  SUM(user_messages) AS total_user_messages,
  SUM(agent_messages) AS total_agent_messages,
  SUM(user_messages + agent_messages) AS total_messages,
  SUM(total_tokens) AS total_tokens,
  SUM(estimated_cost_usd) AS total_cost_usd
FROM `{PROJECT}.{DATASET}.agent_usage_log`
GROUP BY company_id, user_id, billing_month;


-- ===========================================================================
-- View: Daily consumption (for dashboards)
-- ===========================================================================
CREATE OR REPLACE VIEW `{PROJECT}.{DATASET}.usage_daily` AS
SELECT
  DATE(timestamp) AS usage_date,
  company_id,
  COUNT(DISTINCT user_id) AS active_users,
  COUNT(DISTINCT session_id) AS sessions,
  SUM(user_messages + agent_messages) AS messages,
  SUM(total_tokens) AS tokens,
  SUM(estimated_cost_usd) AS cost_usd
FROM `{PROJECT}.{DATASET}.agent_usage_log`
GROUP BY usage_date, company_id;


-- ===========================================================================
-- View: Consumption per platform (segmentation)
-- ===========================================================================
CREATE OR REPLACE VIEW `{PROJECT}.{DATASET}.usage_by_platform` AS
SELECT
  platform,
  FORMAT_TIMESTAMP('%Y-%m', timestamp) AS month,
  COUNT(DISTINCT user_id) AS users,
  SUM(user_messages + agent_messages) AS messages,
  SUM(total_tokens) AS tokens,
  SUM(estimated_cost_usd) AS cost_usd
FROM `{PROJECT}.{DATASET}.agent_usage_log`
GROUP BY platform, month;

Step 6 — GCP Infra (Pub/Sub + BQ Subscription)

# 1. Create the Pub/Sub schema (needed so the BQ subscription can map
#    message fields to table columns)
gcloud pubsub schemas create agent-usage-schema \
  --type=avro \
  --definition-file=infra/usage_schema.avsc

# 2. Create the topic with the schema attached (JSON-encoded messages)
gcloud pubsub topics create agent-usage \
  --schema=agent-usage-schema \
  --message-encoding=json \
  --project=$GCP_PROJECT

# 3. Create the BigQuery dataset (if it does not exist)
bq mk --dataset \
  --description="Agent usage tracking for billing" \
  --location=us-central1 \
  $GCP_PROJECT:agent_usage

# 4. Create the BigQuery table (run the SQL from Step 5)
bq query --use_legacy_sql=false < infra/bigquery_usage.sql

# 5. Create the BQ subscription (Pub/Sub writes straight into BigQuery).
#    Note: --write-metadata also requires subscription_name, message_id,
#    publish_time and attributes columns on the table.
gcloud pubsub subscriptions create agent-usage-to-bq \
  --topic=agent-usage \
  --bigquery-table=$GCP_PROJECT:agent_usage.agent_usage_log \
  --use-topic-schema \
  --write-metadata \
  --project=$GCP_PROJECT

# 6. Enable the feature flag
# In production: USAGE_TRACKING_ENABLED=true

Captured Metrics

| Metric             | Source                                          | Billing Use                      |
|--------------------|-------------------------------------------------|----------------------------------|
| user_messages      | Count of role="user" events with text           | Charge per message sent          |
| agent_messages     | Count of role="model" events with text          | Charge per response              |
| prompt_tokens      | event.usage_metadata.prompt_token_count         | LLM input cost                   |
| completion_tokens  | event.usage_metadata.candidates_token_count     | LLM output cost                  |
| cached_tokens      | event.usage_metadata.cached_content_token_count | Cache discount (~75% cheaper)    |
| total_tokens       | prompt + completion                             | Total LLM consumption            |
| tool_calls         | Count of function_call parts                    | Complexity metric                |
| company_id         | JWT custom_claims.company_id                    | Group billing by company         |
| platform           | message.platform                                | Segment by channel               |
| model              | event.model or MODEL env var                    | Per-model price table            |
| estimated_cost_usd | Calculation: tokens × price/1M                  | Cost estimate                    |

Environment Variables

| Variable                  | Default     | Description                      |
|---------------------------|-------------|----------------------------------|
| USAGE_TRACKING_ENABLED    | false       | Feature flag (roll out gradually)|
| USAGE_PUBSUB_TOPIC        | agent-usage | Cloud Pub/Sub topic              |
| GCP_PROJECT               | -           | GCP project (already exists)     |
| GEMINI_COST_PROMPT_PER_1M | 0.075       | Prompt token cost (USD/1M)       |
| GEMINI_COST_OUTPUT_PER_1M | 0.30        | Output token cost (USD/1M)       |

Execution Order

| Phase | What                                       | Dependencies      | Risk                    |
|-------|--------------------------------------------|-------------------|-------------------------|
| 1     | usage/models.py + usage/collector.py       | None              | Zero                    |
| 2     | usage/publisher.py                         | Phase 1           | Low                     |
| 3     | Integrate the collector into processor.py  | Phases 1+2        | Low (1 line in the loop)|
| 4     | GCP infra: Pub/Sub topic + BigQuery schema | GCP access        | Zero (infra)            |
| 5     | BigQuery Subscription (Pub/Sub → BQ)       | Phase 4           | Zero (config)           |
| 6     | Billing SQL views                          | Phase 4           | Zero (read-only)        |
| 7     | Unit tests (collector + mocked publisher)  | Phases 1-3        | Zero                    |
| 8     | Deploy with USAGE_TRACKING_ENABLED=false   | All               | Zero                    |
| 9     | Enable the feature flag in staging         | Phase 8           | Low                     |
| 10    | Enable in production                       | Phase 9 validated | Low                     |

Billing Query Examples

-- Monthly invoice for a single company
SELECT
  billing_month,
  active_users,
  total_sessions,
  total_messages,
  total_tokens,
  ROUND(total_cost_usd, 2) AS cost_usd
FROM `project.agent_usage.billing_monthly_company`
WHERE company_id = '12345'
  AND billing_month = '2026-03'
ORDER BY billing_month;

-- Top 10 users by consumption in the month
SELECT
  user_id,
  total_messages,
  total_tokens,
  ROUND(total_cost_usd, 4) AS cost_usd
FROM `project.agent_usage.billing_monthly_user`
WHERE company_id = '12345'
  AND billing_month = '2026-03'
ORDER BY total_tokens DESC
LIMIT 10;

Guarantees

  • Zero latency impact: collection takes microseconds; the fire-and-forget publish runs after the response has been sent
  • No loss after publish: Pub/Sub guarantees at-least-once delivery to BigQuery (duplicates are possible, so dedupe by record_id in billing queries); a failed publish is only logged
  • Feature flag: USAGE_TRACKING_ENABLED=false by default; roll out gradually
  • Silent fallback: if Pub/Sub fails, only a warning is logged and the agent keeps working
  • No lock-in: UsageRecord is a plain dataclass; the publisher can be swapped out (e.g. CloudSQL, Redis, Kafka)