🧠 Integration Plan: OpenMemory in ifriend-agents¶
Overview¶
Integrate OpenMemory as a custom memory service, replacing Vertex AI Memory Bank.
Why OpenMemory?¶
- ✅ Open-source (Apache 2.0)
- ✅ Self-hosted (zero vendor lock-in)
- ✅ Multi-sector memory (semantic, episodic, procedural)
- ✅ Temporal knowledge graph
- ✅ 115 ms queries (2-3x faster than Zep)
- ✅ $8-12/month self-hosted vs. $25-100+ cloud
- ✅ Supports local embeddings (Ollama, E5, BGE)
Integration Architecture¶

```
┌─────────────────────────────────┐
│ slack_bot.py (Cloud Run)        │
│  └─ ADK Runner + Agent          │
└───────────────┬─────────────────┘
                │
        ┌───────┴───────┐
        │               │
        ▼               ▼
┌──────────────┐  ┌──────────────────────┐
│ Session      │  │ OpenMemoryService    │ ← NEW
│ (Firestore)  │  │ (custom memory svc)  │
└──────────────┘  └──────────┬───────────┘
                             │
                    ┌────────┴────────┐
                    │                 │
                    ▼                 ▼
            ┌───────────────┐  ┌──────────────┐
            │ OpenMemory    │  │ Embeddings   │
            │ backend (HTTP)│  │ (Ollama)     │
            │ SQLite/PG     │  │ $0-15/month  │
            │ $8-12/month   │  └──────────────┘
            └───────────────┘
```
Phase 1: Initial Setup (Week 1)¶
1.1 Folder Structure¶

```
ifriend_agent/
├── memory/                      ← NEW
│   ├── __init__.py
│   ├── openmemory_service.py    ← main integration
│   ├── models.py                ← data models
│   ├── client.py                ← HTTP client
│   └── schemas.py               ← request/response schemas
├── tools/
├── agents/
└── ...
```
1.2 OpenMemory Backend (Docker)¶
Create a docker-compose.yml at the repository root:
```yaml
version: '3.8'

services:
  openmemory:
    image: openmemory:latest  # or a local build
    ports:
      - "8080:8080"
    environment:
      # Database
      OM_DATABASE_URL: sqlite:///data/openmemory.sqlite
      # OM_DATABASE_URL: postgresql://user:pass@postgres:5432/openmemory
      # Embeddings
      OM_EMBEDDINGS: ollama  # or openai, gemini, aws
      OM_EMBEDDING_MODEL: nomic-embed-text
      OM_EMBEDDING_DIMENSION: 384
      # Ollama (if local)
      OLLAMA_URL: http://ollama:11434
      # Security
      OM_API_KEY: ${OM_API_KEY:-your-secret-key}
      # Optional: telemetry
      OM_TELEMETRY: "true"
    volumes:
      - openmemory_data:/data
    depends_on:
      - ollama  # optional
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  ollama:  # optional: local embeddings
    image: ollama/ollama:latest
    ports:
      - "11434:11434"
    volumes:
      - ollama_data:/root/.ollama
    environment:
      - OLLAMA_HOST=0.0.0.0:11434

volumes:
  openmemory_data:
  ollama_data:
```
1.3 .env Updates¶

```bash
# OpenMemory
OPENMEMORY_URL=http://localhost:8080
OPENMEMORY_API_KEY=your-secret-key
OPENMEMORY_EMBEDDINGS=ollama  # or openai, gemini

# Ollama (if local)
OLLAMA_URL=http://localhost:11434

# Database (optional, when using PostgreSQL in production)
OPENMEMORY_DATABASE_URL=postgresql://user:pass@host:5432/openmemory
```
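At startup these variables can be collected into a small typed config object. A minimal sketch (the `OpenMemoryConfig` name is ours; the defaults mirror the variable names above):

```python
import os
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class OpenMemoryConfig:
    """Typed view over the OpenMemory-related .env variables."""
    url: str
    api_key: Optional[str]
    embeddings: str

    @classmethod
    def from_env(cls) -> "OpenMemoryConfig":
        # Fall back to local-development defaults when a variable is unset
        return cls(
            url=os.getenv("OPENMEMORY_URL", "http://localhost:8080"),
            api_key=os.getenv("OPENMEMORY_API_KEY"),
            embeddings=os.getenv("OPENMEMORY_EMBEDDINGS", "ollama"),
        )
```

A frozen dataclass keeps the config immutable after load, so a stray assignment elsewhere in the bot can't silently repoint the backend URL.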
Phase 2: Python SDK Implementation (Weeks 1-2)¶
2.1 ifriend_agent/memory/schemas.py¶
```python
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class Memory:
    """Single memory object from OpenMemory."""
    id: str
    content: str
    user_id: str
    sector: str  # semantic, episodic, procedural, emotional, reflective
    created_at: datetime
    updated_at: datetime
    salience: float  # 0-1, importance score
    recency: float   # 0-1, how recent
    tags: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class MemoryQuery:
    """Request to query memories."""
    query: str
    user_id: str
    k: int = 5
    filters: Optional[Dict[str, Any]] = None
    sectors: Optional[List[str]] = None  # filter by sectors


@dataclass
class MemoryAdd:
    """Request to add a memory."""
    content: str
    user_id: str
    sector: str = "semantic"
    tags: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class MemorySummary:
    """User memory summary from OpenMemory."""
    user_id: str
    total_memories: int
    sectors: Dict[str, int]  # count per sector
    last_updated: datetime
    summary: str
```
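Raw JSON from the backend still has to be coerced into these dataclasses. A hedged sketch of a parser (the response field names and ISO-8601 timestamps are assumptions about the API; `Memory` here mirrors the dataclass defined above so the snippet is self-contained):

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List


@dataclass
class Memory:  # mirrors ifriend_agent/memory/schemas.py
    id: str
    content: str
    user_id: str
    sector: str
    created_at: datetime
    updated_at: datetime
    salience: float
    recency: float
    tags: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)


def memory_from_api(raw: Dict[str, Any]) -> Memory:
    """Parse one memory object from a (hypothetical) OpenMemory JSON response."""
    return Memory(
        id=raw["id"],
        content=raw["content"],
        user_id=raw["user_id"],
        sector=raw.get("sector", "semantic"),
        # ISO-8601 strings assumed for timestamps
        created_at=datetime.fromisoformat(raw["created_at"]),
        updated_at=datetime.fromisoformat(raw["updated_at"]),
        salience=float(raw.get("salience", 0.0)),
        recency=float(raw.get("recency", 0.0)),
        tags=raw.get("tags") or [],
        metadata=raw.get("metadata") or {},
    )
```

Centralizing the parsing in one helper keeps any future change to the backend's response shape in a single place.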
2.2 ifriend_agent/memory/client.py¶
```python
import os
from typing import Any, Dict, List, Optional

import httpx


class OpenMemoryClient:
    """HTTP client for the OpenMemory backend."""

    def __init__(
        self,
        base_url: Optional[str] = None,
        api_key: Optional[str] = None,
        timeout: float = 30.0,
    ):
        self.base_url = base_url or os.getenv("OPENMEMORY_URL", "http://localhost:8080")
        self.api_key = api_key or os.getenv("OPENMEMORY_API_KEY")
        self.timeout = timeout
        self.client = httpx.AsyncClient(
            base_url=self.base_url,
            timeout=timeout,
            headers=self._get_headers(),
        )

    def _get_headers(self) -> Dict[str, str]:
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"
        return headers

    async def health_check(self) -> bool:
        """Check whether the OpenMemory server is healthy."""
        try:
            resp = await self.client.get("/health")
            return resp.status_code == 200
        except Exception as e:
            print(f"Health check failed: {e}")
            return False

    async def add_memory(
        self,
        content: str,
        user_id: str,
        sector: str = "semantic",
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Optional[str]:
        """Add a new memory and return its id."""
        payload = {
            "content": content,
            "user_id": user_id,
            "sector": sector,
            "tags": tags or [],
            "metadata": metadata or {},
        }
        resp = await self.client.post("/memory/add", json=payload)
        resp.raise_for_status()
        return resp.json().get("id")

    async def query_memories(
        self,
        query: str,
        user_id: str,
        k: int = 5,
        filters: Optional[Dict[str, Any]] = None,
        sectors: Optional[List[str]] = None,
    ) -> List[Dict[str, Any]]:
        """Query memories by semantic similarity."""
        payload = {
            "query": query,
            "user_id": user_id,
            "k": k,
            "filters": filters or {},
        }
        if sectors:
            payload["sectors"] = sectors
        resp = await self.client.post("/memory/query", json=payload)
        resp.raise_for_status()
        return resp.json().get("memories", [])

    async def get_memory(self, memory_id: str, user_id: str) -> Optional[Dict]:
        """Fetch a specific memory."""
        resp = await self.client.get(
            f"/memory/{memory_id}",
            params={"user_id": user_id},
        )
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        return resp.json()

    async def reinforce_memory(
        self,
        memory_id: str,
        user_id: str,
        boost: float = 0.1,
    ) -> bool:
        """Boost a memory's salience (strengthen the memory)."""
        payload = {"user_id": user_id, "boost": boost}
        resp = await self.client.post(f"/memory/{memory_id}/reinforce", json=payload)
        resp.raise_for_status()
        return True

    async def delete_memory(self, memory_id: str, user_id: str) -> bool:
        """Delete a memory."""
        resp = await self.client.delete(
            f"/memory/{memory_id}",
            params={"user_id": user_id},
        )
        resp.raise_for_status()
        return True

    async def list_memories(
        self,
        user_id: str,
        limit: int = 20,
        offset: int = 0,
        sector: Optional[str] = None,
    ) -> List[Dict]:
        """List recent memories for a user."""
        params = {"user_id": user_id, "limit": limit, "offset": offset}
        if sector:
            params["sector"] = sector
        resp = await self.client.get("/memory/list", params=params)
        resp.raise_for_status()
        return resp.json().get("memories", [])

    async def get_user_summary(self, user_id: str) -> Optional[Dict]:
        """Get a user's memory summary."""
        resp = await self.client.get(f"/users/{user_id}/summary")
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        return resp.json()

    async def close(self):
        """Close the HTTP client."""
        await self.client.aclose()
```
2.3 ifriend_agent/memory/openmemory_service.py¶
```python
from typing import Any, Dict, List, Optional

from google.adk.memory import BaseMemoryService

from .client import OpenMemoryClient


class OpenMemoryService(BaseMemoryService):
    """Custom memory service backed by OpenMemory."""

    def __init__(
        self,
        openmemory_url: Optional[str] = None,
        openmemory_api_key: Optional[str] = None,
    ):
        self.client = OpenMemoryClient(
            base_url=openmemory_url,
            api_key=openmemory_api_key,
        )

    async def add_session_to_memory(self, session) -> None:
        """Save the session's interactions to OpenMemory as episodic memories."""
        user_id = session.user_id
        session_id = session.id

        # Extract Q&A pairs from session.events
        for event in session.events or []:
            # Skip events without textual content
            if not event.content or not hasattr(event.content, "parts"):
                continue
            text_parts = [
                part.text for part in event.content.parts
                if hasattr(part, "text") and part.text
            ]
            text = " ".join(text_parts)
            if not text:
                continue

            # Both user questions and agent responses are stored as episodic
            role = "User" if event.author == "user" else "Agent"
            memory_content = f"[{role}] {text[:500]}"  # truncate to 500 chars

            try:
                await self.client.add_memory(
                    content=memory_content,
                    user_id=user_id,
                    sector="episodic",
                    tags=["session", session_id],
                    metadata={
                        "session_id": session_id,
                        "author": event.author,
                        "timestamp": event.created_at.isoformat()
                        if hasattr(event, "created_at") else None,
                    },
                )
            except Exception as e:
                print(f"Error adding memory: {e}")

    async def search_memory(
        self,
        query: str,
        user_id: str,
        k: int = 5,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """Search memories semantically."""
        try:
            return await self.client.query_memories(
                query=query,
                user_id=user_id,
                k=k,
                filters=filters,
            )
        except Exception as e:
            print(f"Error searching memory: {e}")
            return []

    async def get_summary(self, user_id: str) -> str:
        """Get the user's memory summary."""
        try:
            summary_data = await self.client.get_user_summary(user_id)
            return summary_data.get("summary", "") if summary_data else ""
        except Exception as e:
            print(f"Error getting summary: {e}")
            return ""

    async def reinforce_memory(
        self,
        memory_id: str,
        user_id: str,
        boost: float = 0.1,
    ) -> None:
        """Strengthen a memory (increase its salience)."""
        try:
            await self.client.reinforce_memory(memory_id, user_id, boost)
        except Exception as e:
            print(f"Error reinforcing memory: {e}")

    async def delete_memory(self, memory_id: str, user_id: str) -> None:
        """Delete a memory."""
        try:
            await self.client.delete_memory(memory_id, user_id)
        except Exception as e:
            print(f"Error deleting memory: {e}")

    async def health_check(self) -> bool:
        """Check whether the OpenMemory backend is healthy."""
        return await self.client.health_check()

    async def close(self):
        """Release resources."""
        await self.client.close()
```
Phase 3: ADK Agent Integration (Week 2)¶
3.1 Update slack_bot.py¶

```python
# Add at the top
from ifriend_agent.memory.openmemory_service import OpenMemoryService

# Replace the existing memory_service
memory_service = OpenMemoryService(
    openmemory_url=os.getenv("OPENMEMORY_URL"),
    openmemory_api_key=os.getenv("OPENMEMORY_API_KEY"),
)


# Add a callback that saves the session after each conversation
async def auto_save_session_callback(callback_context):
    """Save the session to OpenMemory after each conversation."""
    try:
        session = callback_context._invocation_context.session
        await memory_service.add_session_to_memory(session)
    except Exception as e:
        logger.error(f"Error saving to OpenMemory: {e}")


root_agent.after_agent_callback = auto_save_session_callback
```
3.2 Using Memory in the Agent's Context¶

```python
async def _handle_message(...):
    # ... existing code ...

    # Load memory context BEFORE running the agent
    memory_context = ""
    if user_id:
        try:
            memories = await memory_service.search_memory(
                query=user_content,  # or extract text from the content
                user_id=user_id,
                k=3,
            )
            if memories:
                memory_context = "Context from previous memories:\n"
                for mem in memories:
                    memory_context += f"- {mem.get('content', '')}\n"
        except Exception as e:
            logger.warning(f"Failed to load memories: {e}")

    # Add the memory context to user_content (or the system prompt);
    # exactly how depends on how the Agent accepts extra context.

    # ... continue with runner.run_async() ...
```
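One illustrative way to inject the retrieved memories is to prepend them to the user's message as a plain-text prefix. A minimal sketch (`build_prompt_with_memories` is a hypothetical helper; whether a message prefix or a system-prompt update is better depends on how the Agent accepts context):

```python
from typing import Any, Dict, List


def build_prompt_with_memories(user_text: str, memories: List[Dict[str, Any]]) -> str:
    """Prepend retrieved memories to the user's message as plain-text context."""
    if not memories:
        return user_text
    lines = ["Context from previous memories:"]
    # One bullet per memory, mirroring the loop in _handle_message above
    lines += [f"- {m.get('content', '')}" for m in memories]
    lines.append("")  # blank line before the actual message
    lines.append(user_text)
    return "\n".join(lines)
```

Keeping this as a pure function makes it trivial to unit-test the prompt shape without a running agent.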
Phase 4: Deployment and Monitoring (Weeks 2-3)¶
4.1 Docker Compose for Production¶
```yaml
version: '3.8'

services:
  openmemory:
    image: openmemory/openmemory:latest
    container_name: openmemory-prod
    environment:
      # Production database (PostgreSQL recommended); user matches POSTGRES_USER below
      OM_DATABASE_URL: postgresql://openmemory:${DB_PASSWORD}@postgres:5432/openmemory
      # Embeddings (Ollama in a separate container, or a cloud API)
      OM_EMBEDDINGS: ollama
      OLLAMA_URL: http://ollama:11434
      # Security
      OM_API_KEY: ${OM_API_KEY}
      OM_TELEMETRY: "false"  # disable in production
    ports:
      - "8080:8080"
    volumes:
      - openmemory_data:/data
    depends_on:
      - postgres
      - ollama
    restart: always
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  postgres:
    image: postgres:15-alpine
    container_name: openmemory-db
    environment:
      POSTGRES_USER: openmemory
      POSTGRES_PASSWORD: ${DB_PASSWORD}
      POSTGRES_DB: openmemory
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: always

  ollama:
    image: ollama/ollama:latest
    container_name: openmemory-embeddings
    volumes:
      - ollama_data:/root/.ollama
    environment:
      - OLLAMA_HOST=0.0.0.0:11434
    restart: always

volumes:
  openmemory_data:
  postgres_data:
  ollama_data:
```
4.2 Kubernetes Deployment (Optional)¶
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: openmemory
  namespace: ifriend
spec:
  replicas: 1
  selector:
    matchLabels:
      app: openmemory
  template:
    metadata:
      labels:
        app: openmemory
    spec:
      containers:
        - name: openmemory
          image: openmemory/openmemory:latest
          ports:
            - containerPort: 8080
          env:
            - name: OM_DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: openmemory-secrets
                  key: database-url
            - name: OM_API_KEY
              valueFrom:
                secretKeyRef:
                  name: openmemory-secrets
                  key: api-key
            - name: OLLAMA_URL
              value: "http://ollama:11434"
          resources:
            requests:
              memory: "2Gi"
              cpu: "1000m"
            limits:
              memory: "4Gi"
              cpu: "2000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: openmemory
  namespace: ifriend
spec:
  selector:
    app: openmemory
  ports:
    - protocol: TCP
      port: 8080
      targetPort: 8080
  type: ClusterIP
```
Phase 5: Testing (Week 2)¶
5.1 Unit Tests¶

```python
# tests/test_openmemory_service.py
import pytest

from ifriend_agent.memory.openmemory_service import OpenMemoryService


@pytest.mark.asyncio
async def test_health_check():
    service = OpenMemoryService()
    assert await service.health_check()


@pytest.mark.asyncio
async def test_add_memory():
    service = OpenMemoryService()
    memory_id = await service.client.add_memory(
        content="Test memory content",
        user_id="test_user_123",
        sector="semantic",
    )
    assert memory_id is not None


@pytest.mark.asyncio
async def test_query_memory():
    service = OpenMemoryService()
    user_id = "test_user_123"
    # Add first
    await service.client.add_memory(
        content="The user likes dark mode",
        user_id=user_id,
    )
    # Then query
    results = await service.search_memory(
        query="user preferences",
        user_id=user_id,
        k=1,
    )
    assert len(results) > 0
```
5.2 Integration Tests¶
```python
# tests/test_integration.py
from dataclasses import dataclass, field
from typing import List

import pytest

from ifriend_agent.memory.openmemory_service import OpenMemoryService


# Minimal mocks shaped like ADK session events (content.parts[].text)
@dataclass
class MockPart:
    text: str

@dataclass
class MockContent:
    parts: List[MockPart]

@dataclass
class MockEvent:
    author: str
    content: MockContent

@dataclass
class MockSession:
    user_id: str
    id: str
    events: List[MockEvent] = field(default_factory=list)


def make_event(author: str, text: str) -> MockEvent:
    return MockEvent(author=author, content=MockContent(parts=[MockPart(text=text)]))


@pytest.mark.asyncio
async def test_session_to_memory_flow():
    """Full flow: session → memory storage → query."""
    service = OpenMemoryService()
    # Simulate a session with events
    session = MockSession(
        user_id="user_123",
        id="session_abc",
        events=[
            make_event("user", "How do I integrate OpenMemory?"),
            make_event("agent", "OpenMemory is a memory system..."),
        ],
    )
    # Save to memory
    await service.add_session_to_memory(session)
    # Verify it was stored
    memories = await service.search_memory(
        query="OpenMemory integration",
        user_id="user_123",
    )
    assert len(memories) > 0
```
Phase 6: Monitoring and Optimization (Week 3+)¶
6.1 Metrics¶

```python
# ifriend_agent/memory/metrics.py
import functools
import time
from typing import Callable


class MemoryMetrics:
    def __init__(self):
        self.query_times = []
        self.add_times = []
        self.errors = 0

    def record_query_time(self, duration_ms: float):
        self.query_times.append(duration_ms)

    def get_avg_query_time(self) -> float:
        if not self.query_times:
            return 0
        return sum(self.query_times) / len(self.query_times)

    def record_error(self):
        self.errors += 1


def timed_operation(metrics: MemoryMetrics):
    """Decorator that measures the performance of async memory operations."""
    def decorator(func: Callable):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.time()
            try:
                result = await func(*args, **kwargs)
                duration_ms = (time.time() - start) * 1000
                if 'query' in func.__name__:
                    metrics.record_query_time(duration_ms)
                return result
            except Exception:
                metrics.record_error()
                raise
        return wrapper
    return decorator
```
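Wiring the decorator up looks like this. The sketch repeats a condensed copy of `MemoryMetrics`/`timed_operation` from metrics.py above so it runs standalone; `query_memories_stub` is a hypothetical stand-in for a real OpenMemory call:

```python
import asyncio
import functools
import time
from typing import Callable


class MemoryMetrics:  # condensed from metrics.py above
    def __init__(self):
        self.query_times = []
        self.errors = 0

    def record_query_time(self, duration_ms: float):
        self.query_times.append(duration_ms)

    def get_avg_query_time(self) -> float:
        return sum(self.query_times) / len(self.query_times) if self.query_times else 0.0


def timed_operation(metrics: MemoryMetrics):
    def decorator(func: Callable):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.time()
            try:
                result = await func(*args, **kwargs)
                # record only query-like operations, matching metrics.py
                if "query" in func.__name__:
                    metrics.record_query_time((time.time() - start) * 1000)
                return result
            except Exception:
                metrics.errors += 1
                raise
        return wrapper
    return decorator


metrics = MemoryMetrics()


@timed_operation(metrics)
async def query_memories_stub(query: str):
    """Stand-in for a real OpenMemory query call."""
    await asyncio.sleep(0.01)
    return [{"content": f"memory matching {query!r}"}]


asyncio.run(query_memories_stub("dark mode"))
print(f"avg query time: {metrics.get_avg_query_time():.2f} ms")
```

In the real service the decorator would go on `OpenMemoryClient.query_memories`, keeping timing concerns out of the business logic.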
6.2 Dashboard Monitoring¶
OpenMemory already ships with a dashboard. Access it at:
http://localhost:8080/dashboard
Features:
- View all memories by sector
- See decay over time
- Monitor performance
- Manage users and summaries
Phase 7: Advanced Optimizations (Week 4+)¶
7.1 Multi-Sector Memory¶

```python
# Use different sectors for different kinds of information

# Semantic: facts and knowledge
await service.client.add_memory(
    content="OpenMemory supports Ollama for embeddings",
    user_id="user_123",
    sector="semantic",
)

# Episodic: events and conversations
await service.client.add_memory(
    content="User asked about OpenMemory integration on 2025-11-23",
    user_id="user_123",
    sector="episodic",
)

# Procedural: how to do things
await service.client.add_memory(
    content="To integrate OpenMemory: 1. Set up the backend 2. Create the service 3. Add it to the agent",
    user_id="user_123",
    sector="procedural",
)

# Emotional: feelings and preferences
await service.client.add_memory(
    content="User seems interested in vendor-independent solutions",
    user_id="user_123",
    sector="emotional",
)
```
7.2 Temporal Knowledge Graph¶

```
# Use OpenMemory's temporal features to track fact evolution
POST /api/temporal/fact
{
  "subject": "user_123",
  "predicate": "prefers_embedding_model",
  "object": "nomic-embed-text",
  "valid_from": "2025-11-23",
  "confidence": 0.95
}

# Query: what did the user prefer at a specific date?
GET /api/temporal/fact?subject=user_123&predicate=prefers_embedding_model&at=2025-11-20
```
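From Python, these calls can be made with the existing httpx client. A hedged sketch of two pure helpers that build the request body and query string (the `/api/temporal/fact` endpoint shape is assumed from the HTTP examples above):

```python
from typing import Any, Dict


def temporal_fact_payload(subject: str, predicate: str, obj: str,
                          valid_from: str, confidence: float = 1.0) -> Dict[str, Any]:
    """Body for POST /api/temporal/fact (endpoint shape assumed)."""
    if not 0.0 <= confidence <= 1.0:
        raise ValueError("confidence must be in [0, 1]")
    return {
        "subject": subject,
        "predicate": predicate,
        "object": obj,
        "valid_from": valid_from,
        "confidence": confidence,
    }


def temporal_fact_query(subject: str, predicate: str, at: str) -> Dict[str, str]:
    """Query params for GET /api/temporal/fact — 'what was true at date X?'."""
    return {"subject": subject, "predicate": predicate, "at": at}
```

These would be passed straight to the client, e.g. `await client.post("/api/temporal/fact", json=payload)` and `await client.get("/api/temporal/fact", params=params)`.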
Implementation Timeline¶

```
Week 1:
├─ Days 1-2: Folder setup + structure
├─ Days 3-4: Implement client.py + schemas.py
└─ Days 5-7: Implement openmemory_service.py + basic tests

Week 2:
├─ Days 1-2: Integrate with slack_bot.py
├─ Days 3-4: Integration tests + debugging
└─ Days 5-7: Local deployment + E2E tests

Week 3:
├─ Days 1-2: Production setup (PostgreSQL + K8s)
├─ Days 3-4: Monitoring + dashboards
└─ Days 5-7: Performance optimization

Week 4+:
├─ Multi-sector memory strategies
├─ Temporal knowledge graph
└─ Advanced features
```
Comparison: Before vs. After¶
Before (Vertex AI Memory Bank)¶
- Cost: $50-100/month
- Latency: 300-500 ms
- Vendor lock-in: STRONG
- Data ownership: Google's
- Setup: complex (ADK specifics)
After (OpenMemory + Ollama)¶
- Cost: $8-12/month self-hosted (+ $5-15 for Ollama)
- Latency: 115 ms (2-3x faster)
- Vendor lock-in: NONE
- Data ownership: 100% yours
- Setup: simple (HTTP API)
Total Savings¶
- Cost: ~80% reduction
- Performance: 2-3x faster
- Control: full control over the infrastructure
Next Steps¶
- ✅ Read this plan
- ⏭️ Create the ifriend_agent/memory/ folder
- ⏭️ Implement client.py
- ⏭️ Implement openmemory_service.py
- ⏭️ Integrate with slack_bot.py
- ⏭️ Test locally
- ⏭️ Deploy to production
References¶
- OpenMemory GitHub: https://github.com/CaviraOSS/OpenMemory
- OpenMemory Docs: https://openmemory.cavira.app
- OpenMemory Discord: https://discord.gg/P7HaRayqTh
Status: Ready to implement ✅