🧠 Integration Plan: OpenMemory in ifriend-agents

Overview

Integrate OpenMemory as a custom memory service, replacing Vertex AI Memory Bank.

Why OpenMemory?

  • ✅ Open-source (Apache 2.0)
  • ✅ Self-hosted (zero vendor lock-in)
  • ✅ Multi-sector memory (semantic, episodic, procedural)
  • ✅ Temporal knowledge graph
  • ✅ 115ms queries (2-3x faster than Zep)
  • ✅ $8-12/month self-hosted vs $25-100+ cloud
  • ✅ Supports local embeddings (Ollama, E5, BGE)

Integration Architecture

┌─────────────────────────────────────────┐
│ slack_bot.py (Cloud Run)                │
│ └─ ADK Runner + Agent                   │
└──────────────┬──────────────────────────┘
               │
        ┌──────┴──────┐
        │             │
        ▼             ▼
   ┌─────────┐  ┌────────────────────┐
   │Session  │  │ OpenMemoryService  │ ← NEW
   │Firestore│  │ (Custom Memory     │
   │         │  │  Service)          │
   └─────────┘  └────────┬───────────┘
                         │
                ┌────────┴────────┐
                │                 │
                ▼                 ▼
           ┌──────────────┐  ┌───────────┐
           │ OpenMemory   │  │ Embeddings│
           │ Backend      │  │ (Ollama)  │
           │ (HTTP)       │  │           │
           │ SQLite/PG    │  │ $0-15/mth │
           │ $8-12/mth    │  └───────────┘
           └──────────────┘

Phase 1: Initial Setup (Week 1)

1.1 Folder Structure

ifriend_agent/
├── memory/                          ← NEW
│   ├── __init__.py
│   ├── openmemory_service.py       ← Main integration
│   ├── models.py                   ← Data models
│   ├── client.py                   ← HTTP client
│   └── schemas.py                  ← Request/Response schemas
├── tools/
├── agents/
└── ...

1.2 OpenMemory Backend (Docker)

Create docker-compose.yml at the repository root:

version: '3.8'

services:
  openmemory:
    image: openmemory:latest  # or build locally
    ports:
      - "8080:8080"
    environment:
      # Database
      OM_DATABASE_URL: sqlite:///data/openmemory.sqlite
      # OM_DATABASE_URL: postgresql://user:pass@postgres:5432/openmemory

      # Embeddings
      OM_EMBEDDINGS: ollama  # or openai, gemini, aws
      OM_EMBEDDING_MODEL: nomic-embed-text
      OM_EMBEDDING_DIMENSION: 384

      # Ollama (if local)
      OLLAMA_URL: http://ollama:11434

      # Security
      OM_API_KEY: ${OM_API_KEY:-your-secret-key}

      # Optional: Telemetry
      OM_TELEMETRY: "true"

    volumes:
      - openmemory_data:/data

    depends_on:
      - ollama  # Optional

    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  ollama:  # Optional: local embeddings
    image: ollama/ollama:latest
    ports:
      - "11434:11434"
    volumes:
      - ollama_data:/root/.ollama
    environment:
      - OLLAMA_HOST=0.0.0.0:11434

volumes:
  openmemory_data:
  ollama_data:

1.3 .env Updates

# OpenMemory
OPENMEMORY_URL=http://localhost:8080
OPENMEMORY_API_KEY=your-secret-key
OPENMEMORY_EMBEDDINGS=ollama  # or openai, gemini

# Ollama (if local)
OLLAMA_URL=http://localhost:11434

# Database (optional, when using PostgreSQL in production)
OPENMEMORY_DATABASE_URL=postgresql://user:pass@host:5432/openmemory

Phase 2: Python SDK Implementation (Weeks 1-2)

2.1 ifriend_agent/memory/schemas.py

from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Memory:
    """Single memory object from OpenMemory"""
    id: str
    content: str
    user_id: str
    sector: str  # semantic, episodic, procedural, emotional, reflective
    created_at: datetime
    updated_at: datetime
    salience: float  # 0-1, importance score
    recency: float  # 0-1, how recent
    tags: Optional[List[str]] = None
    metadata: Optional[Dict[str, Any]] = None

@dataclass
class MemoryQuery:
    """Request to query memories"""
    query: str
    user_id: str
    k: int = 5
    filters: Optional[Dict[str, Any]] = None
    sectors: Optional[List[str]] = None  # Filter by sectors

@dataclass
class MemoryAdd:
    """Request to add memory"""
    content: str
    user_id: str
    sector: str = "semantic"
    tags: Optional[List[str]] = None
    metadata: Optional[Dict[str, Any]] = None

@dataclass
class MemorySummary:
    """User memory summary from OpenMemory"""
    user_id: str
    total_memories: int
    sectors: Dict[str, int]  # count per sector
    last_updated: datetime
    summary: str
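
Because these are plain dataclasses, `dataclasses.asdict` turns them directly into the JSON payloads the HTTP client sends. A quick sketch (the field names are this plan's assumed OpenMemory API shape, not a confirmed schema; `default_factory` is used so `asdict()` yields empty containers instead of None):

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List

@dataclass
class MemoryAdd:
    """Mirror of the MemoryAdd schema above, with default_factory
    so asdict() produces empty containers instead of None."""
    content: str
    user_id: str
    sector: str = "semantic"
    tags: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

req = MemoryAdd(content="User prefers dark mode",
                user_id="user_123", tags=["preference"])
payload = asdict(req)  # ready to pass as httpx's json= argument
print(payload["sector"])
```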

2.2 ifriend_agent/memory/client.py

import httpx
import os
from typing import List, Dict, Any, Optional

class OpenMemoryClient:
    """HTTP client for OpenMemory backend"""

    def __init__(
        self,
        base_url: Optional[str] = None,
        api_key: Optional[str] = None,
        timeout: float = 30.0
    ):
        self.base_url = base_url or os.getenv("OPENMEMORY_URL", "http://localhost:8080")
        self.api_key = api_key or os.getenv("OPENMEMORY_API_KEY")
        self.timeout = timeout
        self.client = httpx.AsyncClient(
            base_url=self.base_url,
            timeout=timeout,
            headers=self._get_headers()
        )

    def _get_headers(self) -> Dict[str, str]:
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"
        return headers

    async def health_check(self) -> bool:
        """Check if OpenMemory server is healthy"""
        try:
            resp = await self.client.get("/health")
            return resp.status_code == 200
        except Exception as e:
            print(f"Health check failed: {e}")
            return False

    async def add_memory(
        self,
        content: str,
        user_id: str,
        sector: str = "semantic",
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """Add a new memory"""
        payload = {
            "content": content,
            "user_id": user_id,
            "sector": sector,
            "tags": tags or [],
            "metadata": metadata or {}
        }
        resp = await self.client.post("/memory/add", json=payload)
        resp.raise_for_status()
        data = resp.json()
        return data.get("id")

    async def query_memories(
        self,
        query: str,
        user_id: str,
        k: int = 5,
        filters: Optional[Dict[str, Any]] = None,
        sectors: Optional[List[str]] = None
    ) -> List[Dict[str, Any]]:
        """Query memories by semantic similarity"""
        payload = {
            "query": query,
            "user_id": user_id,
            "k": k,
            "filters": filters or {},
        }
        if sectors:
            payload["sectors"] = sectors

        resp = await self.client.post("/memory/query", json=payload)
        resp.raise_for_status()
        data = resp.json()
        return data.get("memories", [])

    async def get_memory(self, memory_id: str, user_id: str) -> Optional[Dict]:
        """Get a specific memory"""
        resp = await self.client.get(
            f"/memory/{memory_id}",
            params={"user_id": user_id}
        )
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        return resp.json()

    async def reinforce_memory(
        self,
        memory_id: str,
        user_id: str,
        boost: float = 0.1
    ) -> bool:
        """Boost memory salience (strengthen memory)"""
        payload = {
            "user_id": user_id,
            "boost": boost
        }
        resp = await self.client.post(f"/memory/{memory_id}/reinforce", json=payload)
        resp.raise_for_status()
        return True

    async def delete_memory(self, memory_id: str, user_id: str) -> bool:
        """Delete a memory"""
        resp = await self.client.delete(
            f"/memory/{memory_id}",
            params={"user_id": user_id}
        )
        resp.raise_for_status()
        return True

    async def list_memories(
        self,
        user_id: str,
        limit: int = 20,
        offset: int = 0,
        sector: Optional[str] = None
    ) -> List[Dict]:
        """List recent memories for user"""
        params = {
            "user_id": user_id,
            "limit": limit,
            "offset": offset
        }
        if sector:
            params["sector"] = sector

        resp = await self.client.get("/memory/list", params=params)
        resp.raise_for_status()
        data = resp.json()
        return data.get("memories", [])

    async def get_user_summary(self, user_id: str) -> Optional[Dict]:
        """Get user memory summary"""
        resp = await self.client.get(f"/users/{user_id}/summary")
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        return resp.json()

    async def close(self):
        """Close HTTP client"""
        await self.client.aclose()

2.3 ifriend_agent/memory/openmemory_service.py

from typing import Any, Dict, List, Optional
from google.adk.memory import BaseMemoryService

from .client import OpenMemoryClient

class OpenMemoryService(BaseMemoryService):
    """Custom Memory Service integrating OpenMemory"""

    def __init__(
        self,
        openmemory_url: Optional[str] = None,
        openmemory_api_key: Optional[str] = None,
    ):
        self.client = OpenMemoryClient(
            base_url=openmemory_url,
            api_key=openmemory_api_key
        )

    async def add_session_to_memory(self, session) -> None:
        """Save session interactions to OpenMemory as episodic memories"""
        user_id = session.user_id
        session_id = session.id

        # Extract Q&A from session.events
        for event in session.events or []:
            # Skip events without content
            if not event.content or not hasattr(event.content, 'parts'):
                continue

            # Extract text from event
            text_parts = [
                part.text for part in event.content.parts
                if hasattr(part, 'text') and part.text
            ]
            text = " ".join(text_parts)

            # Session turns are stored as episodic memories; keep the author role
            sector = "episodic"
            role = "User" if event.author == "user" else "Agent"

            # Create memory content
            memory_content = f"[{role}] {text[:500]}"  # Truncate to 500 chars

            # Store in OpenMemory
            try:
                await self.client.add_memory(
                    content=memory_content,
                    user_id=user_id,
                    sector=sector,
                    tags=["session", session_id],
                    metadata={
                        "session_id": session_id,
                        "author": event.author,
                        "timestamp": event.created_at.isoformat() if hasattr(event, 'created_at') else None
                    }
                )
            except Exception as e:
                print(f"Error adding memory: {e}")

    async def search_memory(
        self,
        query: str,
        user_id: str,
        k: int = 5,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """Search memories semantically"""
        try:
            results = await self.client.query_memories(
                query=query,
                user_id=user_id,
                k=k,
                filters=filters
            )
            return results
        except Exception as e:
            print(f"Error searching memory: {e}")
            return []

    async def get_summary(self, user_id: str) -> str:
        """Get user memory summary"""
        try:
            summary_data = await self.client.get_user_summary(user_id)
            if summary_data:
                return summary_data.get("summary", "")
            return ""
        except Exception as e:
            print(f"Error getting summary: {e}")
            return ""

    async def reinforce_memory(
        self,
        memory_id: str,
        user_id: str,
        boost: float = 0.1
    ) -> None:
        """Strengthen a memory (increase salience)"""
        try:
            await self.client.reinforce_memory(memory_id, user_id, boost)
        except Exception as e:
            print(f"Error reinforcing memory: {e}")

    async def delete_memory(self, memory_id: str, user_id: str) -> None:
        """Delete a memory"""
        try:
            await self.client.delete_memory(memory_id, user_id)
        except Exception as e:
            print(f"Error deleting memory: {e}")

    async def health_check(self) -> bool:
        """Check if OpenMemory backend is healthy"""
        return await self.client.health_check()

    async def close(self):
        """Cleanup resources"""
        await self.client.close()
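
Since the service degrades gracefully (query errors return empty results), the one hard dependency is startup: the app container may come up before the OpenMemory backend does. A small retry gate covers this; the sketch below uses a stub check, but in slack_bot.py you would pass `memory_service.health_check` instead:

```python
import asyncio

async def wait_for_backend(check, retries: int = 5, base_delay: float = 0.1) -> bool:
    """Poll an async health check until it passes, with exponential backoff."""
    for attempt in range(retries):
        if await check():
            return True
        await asyncio.sleep(base_delay * (2 ** attempt))
    return False

# Demo: a stub that becomes healthy on the third call;
# in production, pass memory_service.health_check here.
calls = {"count": 0}

async def stub_check() -> bool:
    calls["count"] += 1
    return calls["count"] >= 3

ok = asyncio.run(wait_for_backend(stub_check, retries=5, base_delay=0.01))
print(ok)
```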

Phase 3: ADK Agent Integration (Week 2)

3.1 Update slack_bot.py

# Add at the top of the file
from ifriend_agent.memory.openmemory_service import OpenMemoryService

# Replace the existing memory_service
memory_service = OpenMemoryService(
    openmemory_url=os.getenv("OPENMEMORY_URL"),
    openmemory_api_key=os.getenv("OPENMEMORY_API_KEY")
)

# Add a callback to persist the session after each conversation
async def auto_save_session_callback(callback_context):
    """Save the session to OpenMemory after each conversation"""
    try:
        # Note: _invocation_context is a private ADK attribute and may change
        session = callback_context._invocation_context.session
        await memory_service.add_session_to_memory(session)
    except Exception as e:
        logger.error(f"Error saving to OpenMemory: {e}")

root_agent.after_agent_callback = auto_save_session_callback

3.2 Use Memory in the Agent Context

async def _handle_message(...):
    # ... existing code ...

    # Load the memory context BEFORE running the agent
    memory_context = ""
    if user_id:
        try:
            memories = await memory_service.search_memory(
                query=user_content,  # or extract the text from the content
                user_id=user_id,
                k=3
            )
            if memories:
                memory_context = "Context from previous memories:\n"
                for mem in memories:
                    memory_context += f"- {mem.get('content', '')}\n"
        except Exception as e:
            logger.warning(f"Failed to load memories: {e}")

    # Prepend memory_context to user_content (or to the system prompt).
    # Exactly how depends on how the Agent accepts extra context.

    # ... continue with runner.run_async() ...
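
The context-building loop above can be factored into a small pure helper, which also makes it unit-testable without a running backend (memory dicts use the `content` key, matching `search_memory` results):

```python
from typing import Any, Dict, List

def format_memory_context(memories: List[Dict[str, Any]], max_items: int = 3) -> str:
    """Render retrieved memories as a bullet list to prepend to the prompt."""
    if not memories:
        return ""
    lines = ["Context from previous memories:"]
    for mem in memories[:max_items]:
        content = mem.get("content", "").strip()
        if content:  # skip empty or whitespace-only memories
            lines.append(f"- {content}")
    return "\n".join(lines) + "\n"

ctx = format_memory_context([{"content": "User likes dark mode"}, {"content": ""}])
print(ctx)
```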

Phase 4: Deployment and Monitoring (Weeks 2-3)

4.1 Production Docker Compose

version: '3.8'

services:
  openmemory:
    image: openmemory/openmemory:latest
    container_name: openmemory-prod
    environment:
      # Production database (PostgreSQL recommended)
      OM_DATABASE_URL: postgresql://user:${DB_PASSWORD}@postgres:5432/openmemory

      # Embeddings (Ollama in separate container or cloud API)
      OM_EMBEDDINGS: ollama
      OLLAMA_URL: http://ollama:11434

      # Security
      OM_API_KEY: ${OM_API_KEY}
      OM_TELEMETRY: "false"  # Disable in production

    ports:
      - "8080:8080"

    volumes:
      - openmemory_data:/data

    depends_on:
      - postgres
      - ollama

    restart: always

    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  postgres:
    image: postgres:15-alpine
    container_name: openmemory-db
    environment:
      POSTGRES_USER: openmemory
      POSTGRES_PASSWORD: ${DB_PASSWORD}
      POSTGRES_DB: openmemory

    volumes:
      - postgres_data:/var/lib/postgresql/data

    restart: always

  ollama:
    image: ollama/ollama:latest
    container_name: openmemory-embeddings
    volumes:
      - ollama_data:/root/.ollama

    environment:
      - OLLAMA_HOST=0.0.0.0:11434

    restart: always

volumes:
  openmemory_data:
  postgres_data:
  ollama_data:

4.2 Kubernetes Deployment (Optional)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: openmemory
  namespace: ifriend
spec:
  replicas: 1
  selector:
    matchLabels:
      app: openmemory
  template:
    metadata:
      labels:
        app: openmemory
    spec:
      containers:
      - name: openmemory
        image: openmemory/openmemory:latest
        ports:
        - containerPort: 8080
        env:
        - name: OM_DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: openmemory-secrets
              key: database-url
        - name: OM_API_KEY
          valueFrom:
            secretKeyRef:
              name: openmemory-secrets
              key: api-key
        - name: OLLAMA_URL
          value: "http://ollama:11434"
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 10
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: openmemory
  namespace: ifriend
spec:
  selector:
    app: openmemory
  ports:
  - protocol: TCP
    port: 8080
    targetPort: 8080
  type: ClusterIP
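
The Deployment above reads its credentials from a Secret named openmemory-secrets; a matching manifest (values are placeholders to replace) looks like:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: openmemory-secrets
  namespace: ifriend
type: Opaque
stringData:
  database-url: postgresql://openmemory:CHANGE_ME@postgres:5432/openmemory
  api-key: CHANGE_ME
```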

Phase 5: Testing (Week 2)

5.1 Unit Tests

# tests/test_openmemory_service.py

import pytest
from ifriend_agent.memory.openmemory_service import OpenMemoryService

@pytest.mark.asyncio
async def test_health_check():
    service = OpenMemoryService()
    assert await service.health_check()

@pytest.mark.asyncio
async def test_add_memory():
    service = OpenMemoryService()
    user_id = "test_user_123"

    memory_id = await service.client.add_memory(
        content="Test memory content",
        user_id=user_id,
        sector="semantic"
    )

    assert memory_id is not None

@pytest.mark.asyncio
async def test_query_memory():
    service = OpenMemoryService()
    user_id = "test_user_123"

    # Add first
    await service.client.add_memory(
        content="The user likes dark mode",
        user_id=user_id
    )

    # Query
    results = await service.search_memory(
        query="user preferences",
        user_id=user_id,
        k=1
    )

    assert len(results) > 0

5.2 Integration Tests

# tests/test_integration.py

import pytest
from dataclasses import dataclass, field
from typing import List

from ifriend_agent.memory.openmemory_service import OpenMemoryService

# Minimal stand-ins for ADK session/event objects
@dataclass
class MockPart:
    text: str

@dataclass
class MockContent:
    parts: List[MockPart]

@dataclass
class MockEvent:
    author: str
    content: MockContent

@dataclass
class MockSession:
    user_id: str
    id: str
    events: List[MockEvent] = field(default_factory=list)

@pytest.mark.asyncio
async def test_session_to_memory_flow():
    """Test the full flow: session → memory storage → query"""
    service = OpenMemoryService()

    # Simulate a session with events
    session = MockSession(
        user_id="user_123",
        id="session_abc",
        events=[
            MockEvent(author="user", content=MockContent(
                parts=[MockPart(text="How do I integrate OpenMemory?")])),
            MockEvent(author="agent", content=MockContent(
                parts=[MockPart(text="OpenMemory is a memory system...")])),
        ]
    )

    # Save to memory
    await service.add_session_to_memory(session)

    # Verify it was saved
    memories = await service.search_memory(
        query="OpenMemory integration",
        user_id="user_123"
    )

    assert len(memories) > 0
Phase 6: Monitoring and Optimization (Week 3+)

6.1 Metrics

# ifriend_agent/memory/metrics.py

import time
from typing import Callable
import functools

class MemoryMetrics:
    def __init__(self):
        self.query_times = []
        self.add_times = []
        self.errors = 0

    def record_query_time(self, duration_ms: float):
        self.query_times.append(duration_ms)

    def get_avg_query_time(self) -> float:
        if not self.query_times:
            return 0
        return sum(self.query_times) / len(self.query_times)

    def record_error(self):
        self.errors += 1

def timed_operation(metrics: MemoryMetrics):
    """Decorator para medir performance"""
    def decorator(func: Callable):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.time()
            try:
                result = await func(*args, **kwargs)
                duration_ms = (time.time() - start) * 1000

                if 'query' in func.__name__:
                    metrics.record_query_time(duration_ms)

                return result
            except Exception:
                metrics.record_error()
                raise
        return wrapper
    return decorator
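
Wired up, the decorator works as below; this demo is self-contained, with trimmed copies of the classes above and a stub query standing in for the real client call:

```python
import asyncio
import functools
import time

# Trimmed copies of MemoryMetrics / timed_operation from metrics.py
# so the demo runs standalone.
class MemoryMetrics:
    def __init__(self):
        self.query_times = []
        self.errors = 0

    def record_query_time(self, duration_ms):
        self.query_times.append(duration_ms)

    def record_error(self):
        self.errors += 1

def timed_operation(metrics):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.time()
            try:
                result = await func(*args, **kwargs)
                if "query" in func.__name__:
                    metrics.record_query_time((time.time() - start) * 1000)
                return result
            except Exception:
                metrics.record_error()
                raise
        return wrapper
    return decorator

metrics = MemoryMetrics()

@timed_operation(metrics)
async def query_memories_stub(query: str):
    await asyncio.sleep(0.01)  # simulate network latency
    return [{"content": "stub memory"}]

result = asyncio.run(query_memories_stub("user preferences"))
print(len(metrics.query_times), metrics.errors)
```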

6.2 Dashboard Monitoring

OpenMemory already ships with a dashboard. Access it at:

http://localhost:8080/dashboard

Features:
  • View all memories by sector
  • See memory decay over time
  • Monitor performance
  • Manage users and summaries


Phase 7: Advanced Optimizations (Week 4+)

7.1 Multi-Sector Memory

# Use different sectors for different kinds of content

# Semantic: facts and knowledge
await service.client.add_memory(
    content="OpenMemory supports Ollama for embeddings",
    user_id="user_123",
    sector="semantic"
)

# Episodic: events and conversations
await service.client.add_memory(
    content="User asked about OpenMemory integration on 2025-11-23",
    user_id="user_123",
    sector="episodic"
)

# Procedural: how to do things
await service.client.add_memory(
    content="To integrate OpenMemory: 1. Setup backend 2. Create service 3. Add to agent",
    user_id="user_123",
    sector="procedural"
)

# Emotional: feelings and preferences
await service.client.add_memory(
    content="User seems interested in vendor-independent solutions",
    user_id="user_123",
    sector="emotional"
)
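
If memories are extracted automatically, content can be routed to a sector with a simple heuristic. The keyword router below is a hypothetical illustration, not part of OpenMemory; an LLM classification call is the obvious upgrade:

```python
def pick_sector(text: str) -> str:
    """Very rough keyword routing to the four sectors used above."""
    lowered = text.lower()
    if any(w in lowered for w in ("feel", "likes", "prefers", "frustrated")):
        return "emotional"   # feelings and preferences
    if any(w in lowered for w in ("how to", "steps", "first", "then")):
        return "procedural"  # how to do things
    if any(w in lowered for w in ("asked", "said", "yesterday", "today")):
        return "episodic"    # events and conversations
    return "semantic"        # default: facts and knowledge

print(pick_sector("User asked about pricing yesterday"))
```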

7.2 Temporal Knowledge Graph

# Use OpenMemory's temporal features to track fact evolution

POST /api/temporal/fact
{
  "subject": "user_123",
  "predicate": "prefers_embedding_model",
  "object": "nomic-embed-text",
  "valid_from": "2025-11-23",
  "confidence": 0.95
}

# Query: what did the user prefer on a specific date?
GET /api/temporal/fact?subject=user_123&predicate=prefers_embedding_model&at=2025-11-20
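
The two calls above can be wrapped in small helpers. Only the request construction is shown here, so it stays pure and testable; the endpoint paths and field names are taken from the sketch above and should be verified against the OpenMemory docs:

```python
from typing import Any, Dict
from urllib.parse import urlencode

def build_fact_payload(subject: str, predicate: str, obj: str,
                       valid_from: str, confidence: float = 0.95) -> Dict[str, Any]:
    """Body for POST /api/temporal/fact (field names assumed from the sketch)."""
    return {"subject": subject, "predicate": predicate, "object": obj,
            "valid_from": valid_from, "confidence": confidence}

def build_fact_query(subject: str, predicate: str, at: str) -> str:
    """Path + query string for GET /api/temporal/fact."""
    return "/api/temporal/fact?" + urlencode(
        {"subject": subject, "predicate": predicate, "at": at})

url = build_fact_query("user_123", "prefers_embedding_model", "2025-11-20")
print(url)
```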

Implementation Timeline

Week 1:
├─ Days 1-2: Folder setup + structure
├─ Days 3-4: Implement client.py + schemas.py
├─ Days 5-7: Implement openmemory_service.py + basic tests

Week 2:
├─ Days 1-2: Integrate with slack_bot.py
├─ Days 3-4: Integration tests + debugging
├─ Days 5-7: Local deployment + E2E tests

Week 3:
├─ Days 1-2: Production setup (PostgreSQL + K8s)
├─ Days 3-4: Monitoring + dashboards
├─ Days 5-7: Performance optimization

Week 4+:
├─ Multi-sector memory strategies
├─ Temporal knowledge graph
└─ Advanced features

Comparison: Before vs After

Before (Vertex AI Memory Bank)

Cost: $50-100/month
Latency: 300-500ms
Vendor lock-in: STRONG
Data ownership: Google's
Setup: Complex (ADK specifics)

After (OpenMemory + Ollama)

Cost: $8-12/month self-hosted (+ $5-15 Ollama)
Latency: 115ms (2-3x faster)
Vendor lock-in: NONE
Data ownership: 100% yours
Setup: Simple (HTTP API)

Total Savings

  • Cost: ~80% reduction
  • Performance: 2-3x faster
  • Control: full ownership of the infrastructure

Next Actions

  1. ✅ Read this plan
  2. ⏭️ Create the ifriend_agent/memory/ folder
  3. ⏭️ Implement client.py
  4. ⏭️ Implement openmemory_service.py
  5. ⏭️ Integrate with slack_bot.py
  6. ⏭️ Test locally
  7. ⏭️ Deploy to production

References

  • OpenMemory GitHub: https://github.com/CaviraOSS/OpenMemory
  • OpenMemory Docs: https://openmemory.cavira.app
  • OpenMemory Discord: https://discord.gg/P7HaRayqTh

Status: Ready to implement ✅