From 34129545ec5ec85d535b56d5e7d426de77b7d28b Mon Sep 17 00:00:00 2001 From: Andrew Brookins Date: Tue, 16 Dec 2025 17:09:19 -0800 Subject: [PATCH 1/5] WIP on summary views --- agent_memory_server/api.py | 247 +++++++++++++++ agent_memory_server/docket_tasks.py | 6 + agent_memory_server/models.py | 223 ++++++++++++- agent_memory_server/summary_views.py | 450 +++++++++++++++++++++++++++ agent_memory_server/tasks.py | 90 ++++++ tests/test_summary_views.py | 106 +++++++ 6 files changed, 1120 insertions(+), 2 deletions(-) create mode 100644 agent_memory_server/summary_views.py create mode 100644 agent_memory_server/tasks.py create mode 100644 tests/test_summary_views.py diff --git a/agent_memory_server/api.py b/agent_memory_server/api.py index 7d5f8ab..0960b03 100644 --- a/agent_memory_server/api.py +++ b/agent_memory_server/api.py @@ -5,6 +5,7 @@ from fastapi import APIRouter, Depends, Header, HTTPException, Query from mcp.server.fastmcp.prompts import base from mcp.types import TextContent +from ulid import ULID from agent_memory_server import long_term_memory, working_memory from agent_memory_server.auth import UserInfo, get_current_user @@ -16,6 +17,7 @@ from agent_memory_server.models import ( AckResponse, CreateMemoryRecordRequest, + CreateSummaryViewRequest, EditMemoryRecordRequest, GetSessionsQuery, MemoryMessage, @@ -24,14 +26,30 @@ MemoryRecord, MemoryRecordResultsResponse, ModelNameLiteral, + RunSummaryViewPartitionRequest, + RunSummaryViewRequest, SearchRequest, SessionListResponse, + SummaryView, + SummaryViewPartitionResult, SystemMessage, + Task, + TaskStatusEnum, + TaskTypeEnum, UpdateWorkingMemory, WorkingMemory, WorkingMemoryResponse, ) from agent_memory_server.summarization import _incremental_summary +from agent_memory_server.summary_views import ( + get_summary_view as get_summary_view_config, + list_partition_results, + list_summary_views, + save_partition_result, + save_summary_view, + summarize_partition_for_view, +) +from agent_memory_server.tasks import create_task, get_task from agent_memory_server.utils.redis import get_redis_conn @@ -1030,3 +1048,232 @@ async def memory_prompt( ) return MemoryPromptResponse(messages=_messages) + + +def _validate_summary_view_keys(payload: CreateSummaryViewRequest) -> None: + """Validate group_by and filter keys for a SummaryView. + + For v1 we explicitly restrict these keys to a small, known set so we can + implement execution safely. We also currently only support long-term + memory as the source for SummaryViews. + """ + + if payload.source != "long_term": + raise HTTPException( + status_code=400, + detail=( + "SummaryView.source must be 'long_term' for now; " + "'working_memory' is not yet supported." + ), + ) + + allowed_group_by = {"user_id", "namespace", "session_id", "memory_type"} + allowed_filters = { + "user_id", + "namespace", + "session_id", + "memory_type", + } + + invalid_group = [k for k in payload.group_by if k not in allowed_group_by] + if invalid_group: + raise HTTPException( + status_code=400, + detail=("Unsupported group_by fields: " + ", ".join(sorted(invalid_group))), + ) + + invalid_filters = [k for k in payload.filters if k not in allowed_filters] + if invalid_filters: + raise HTTPException( + status_code=400, + detail=("Unsupported filter fields: " + ", ".join(sorted(invalid_filters))), + ) + + +@router.post("/v1/summary-views", response_model=SummaryView) +async def create_summary_view( + payload: CreateSummaryViewRequest, + current_user: UserInfo = Depends(get_current_user), +): + """Create a new SummaryView configuration. + + The server assigns an ID; the configuration can then be run on-demand or + by background workers. + """ + + _validate_summary_view_keys(payload) + + view = SummaryView( + id=str(ULID()), + name=payload.name, + source=payload.source, + group_by=payload.group_by, + filters=payload.filters, + time_window_days=payload.time_window_days, + continuous=payload.continuous, + prompt=payload.prompt, + model_name=payload.model_name, + ) + + await save_summary_view(view) + return view + + +@router.get("/v1/summary-views", response_model=list[SummaryView]) +async def list_summary_views_endpoint( + current_user: UserInfo = Depends(get_current_user), +): + """List all registered SummaryViews. + + Filtering by source/continuous can be added later if needed. + """ + + return await list_summary_views() + + +@router.get("/v1/summary-views/{view_id}", response_model=SummaryView) +async def get_summary_view( + view_id: str, + current_user: UserInfo = Depends(get_current_user), +): + """Get a SummaryView configuration by ID.""" + + view = await get_summary_view_config(view_id) + if view is None: + raise HTTPException(status_code=404, detail=f"SummaryView {view_id} not found") + return view + + +@router.delete("/v1/summary-views/{view_id}", response_model=AckResponse) +async def delete_summary_view_endpoint( + view_id: str, + current_user: UserInfo = Depends(get_current_user), +): + """Delete a SummaryView configuration. + + Stored partition summaries are left as-is for now. + """ + + from agent_memory_server.summary_views import delete_summary_view + + await delete_summary_view(view_id) + return AckResponse(status="ok") + + +@router.post( + "/v1/summary-views/{view_id}/partitions/run", + response_model=SummaryViewPartitionResult, +) +async def run_summary_view_partition( + view_id: str, + payload: RunSummaryViewPartitionRequest, + current_user: UserInfo = Depends(get_current_user), +): + """Synchronously compute a summary for a single partition of a view. + + For long-term memory views this will query the underlying memories + and run a real summarization. For other sources it currently returns + a placeholder summary. + """ + + view = await get_summary_view_config(view_id) + if view is None: + raise HTTPException(status_code=404, detail=f"SummaryView {view_id} not found") + + # Ensure the provided group keys match the view's group_by definition. + group_keys = set(payload.group.keys()) + expected_keys = set(view.group_by) + if group_keys != expected_keys: + raise HTTPException( + status_code=400, + detail=( + f"group keys {sorted(group_keys)} must exactly match " + f"view.group_by {sorted(expected_keys)}" + ), + ) + + result = await summarize_partition_for_view(view, payload.group) + # Persist the result so it appears in materialized listings. + await save_partition_result(result) + return result + + +@router.get( + "/v1/summary-views/{view_id}/partitions", + response_model=list[SummaryViewPartitionResult], +) +async def list_summary_view_partitions( + view_id: str, + user_id: str | None = None, + namespace: str | None = None, + session_id: str | None = None, + memory_type: str | None = None, + current_user: UserInfo = Depends(get_current_user), +): + """List materialized partition summaries for a SummaryView. + + This does not trigger recomputation; it simply reads stored + SummaryViewPartitionResult entries from Redis. Optional query + parameters filter by group fields when present. + """ + + view = await get_summary_view_config(view_id) + if view is None: + raise HTTPException(status_code=404, detail=f"SummaryView {view_id} not found") + + group_filter: dict[str, str] = {} + if user_id is not None: + group_filter["user_id"] = user_id + if namespace is not None: + group_filter["namespace"] = namespace + if session_id is not None: + group_filter["session_id"] = session_id + if memory_type is not None: + group_filter["memory_type"] = memory_type + + return await list_partition_results(view_id, group_filter or None) + + +@router.post("/v1/summary-views/{view_id}/run", response_model=Task) +async def run_summary_view_full( + view_id: str, + payload: RunSummaryViewRequest, + background_tasks: HybridBackgroundTasks, + current_user: UserInfo = Depends(get_current_user), +): + """Trigger an asynchronous full recompute of all partitions for a view. + + Returns a Task that can be polled for status. The actual work is + performed by a Docket worker running refresh_summary_view. + """ + + view = await get_summary_view_config(view_id) + if view is None: + raise HTTPException(status_code=404, detail=f"SummaryView {view_id} not found") + + task_id = payload.task_id or str(ULID()) + task = Task( + id=task_id, + type=TaskTypeEnum.SUMMARY_VIEW_FULL_RUN, + status=TaskStatusEnum.PENDING, + view_id=view_id, + ) + await create_task(task) + + from agent_memory_server.summary_views import refresh_summary_view + + background_tasks.add_task(refresh_summary_view, view_id=view_id, task_id=task_id) + return task + + +@router.get("/v1/tasks/{task_id}", response_model=Task) +async def get_task_status( + task_id: str, + current_user: UserInfo = Depends(get_current_user), +): + """Get the status of a background Task by ID.""" + + task = await get_task(task_id) + if task is None: + raise HTTPException(status_code=404, detail=f"Task {task_id} not found") + return task diff --git a/agent_memory_server/docket_tasks.py b/agent_memory_server/docket_tasks.py index e3b8cb0..6a14531 100644 --- a/agent_memory_server/docket_tasks.py +++ b/agent_memory_server/docket_tasks.py @@ -21,6 +21,10 @@ update_last_accessed, ) from agent_memory_server.summarization import summarize_session +from agent_memory_server.summary_views import ( + periodic_refresh_summary_views, + refresh_summary_view, +) logger = logging.getLogger(__name__) @@ -38,6 +42,8 @@ forget_long_term_memories, periodic_forget_long_term_memories, update_last_accessed, + refresh_summary_view, + periodic_refresh_summary_views, ] diff --git a/agent_memory_server/models.py b/agent_memory_server/models.py index b2395fd..ddf29c3 100644 --- a/agent_memory_server/models.py +++ b/agent_memory_server/models.py @@ -6,7 +6,7 @@ from agent_memory_client.models import ClientMemoryRecord from mcp.server.fastmcp.prompts import base -from mcp.types import AudioContent, EmbeddedResource, ImageContent, TextContent +from mcp.types import TextContent from pydantic import BaseModel, Field from ulid import ULID @@ -676,7 +676,7 @@ class SystemMessage(BaseModel): """A system message""" role: Literal["system"] = "system" - content: str | TextContent | ImageContent | AudioContent | EmbeddedResource + content: str | TextContent class UserMessage(base.Message): @@ -733,3 +733,222 @@ class EditMemoryRecordRequest(BaseModel): event_date: datetime | None = Field( default=None, description="Updated event date for episodic memories" ) + + +class TaskStatusEnum(str, Enum): + """Status values for background tasks exposed to clients.""" + + PENDING = "pending" + RUNNING = "running" + SUCCESS = "success" + FAILED = "failed" + + +class TaskTypeEnum(str, Enum): + """Type of background task. + + We start with summary view refreshes but keep this extensible. + """ + + SUMMARY_VIEW_FULL_RUN = "summary_view_full_run" + + +class Task(BaseModel): + """Client-visible background task tracked in Redis as JSON. + + These tasks represent long-running operations such as a full recompute + of all partitions for a SummaryView. + """ + + id: str = Field(description="Unique task identifier (client or server generated)") + type: TaskTypeEnum = Field( + description="Type of task, e.g. summary_view_full_run", + ) + status: TaskStatusEnum = Field( + default=TaskStatusEnum.PENDING, + description="Current task status", + ) + view_id: str | None = Field( + default=None, + description="Associated SummaryView ID, if applicable", + ) + created_at: datetime = Field( + default_factory=lambda: datetime.now(UTC), + description="When the task record was created", + ) + started_at: datetime | None = Field( + default=None, + description="When execution of the task actually started", + ) + completed_at: datetime | None = Field( + default=None, + description="When execution of the task finished (success or failure)", + ) + error_message: str | None = Field( + default=None, + description="Error message if the task failed", + ) + + +class SummaryView(BaseModel): + """Configuration for a summary view over memories. + + A SummaryView fully specifies what pool of memories to summarize and how + to partition and filter them, so it can be run on-demand or by a + background worker without additional runtime parameters. + """ + + id: str = Field(description="Unique identifier for the summary view") + name: str | None = Field( + default=None, + description="Optional human-readable name for the view", + ) + source: Literal["long_term", "working_memory"] = Field( + description=( + "Memory source to summarize. Currently only 'long_term' is " + "supported; 'working_memory' is reserved for future use." + ), + ) + group_by: list[str] = Field( + default_factory=list, + description=( + "Fields used to partition summaries (e.g. ['user_id'], " + "['user_id', 'namespace'])." + ), + ) + filters: dict[str, Any] = Field( + default_factory=dict, + description=( + "Static filters applied to every run (e.g. memory_type, namespace). " + "Only a small, known set of keys is supported in v1." + ), + ) + time_window_days: int | None = Field( + default=None, + ge=1, + description=( + "If set, each run uses now() - time_window_days as a cutoff " + "for eligible memories." + ), + ) + continuous: bool = Field( + default=False, + description=( + "If true, background workers periodically refresh all partitions " + "for this view." + ), + ) + prompt: str | None = Field( + default=None, + description=( + "Optional custom summarization instructions. If omitted, a " + "server-defined default prompt is used." + ), + ) + model_name: str | None = Field( + default=None, + description=( + "Optional model override for summarization. Defaults to a fast " + "model from settings when not provided." + ), + ) + + +class SummaryViewPartitionResult(BaseModel): + """Result of summarizing one partition of a SummaryView. + + A partition is defined by a concrete combination of the view's + group_by fields, e.g. {"user_id": "alice"} or + {"user_id": "alice", "namespace": "chat"}. + """ + + view_id: str = Field(description="ID of the SummaryView that produced this result") + group: dict[str, str] = Field( + description="Concrete values for the view's group_by fields", + ) + summary: str = Field(description="Summarized text for this partition") + memory_count: int = Field( + ge=0, + description="Number of memories that contributed to this summary", + ) + computed_at: datetime = Field( + default_factory=lambda: datetime.now(UTC), + description="When this summary was computed", + ) + + +class CreateSummaryViewRequest(BaseModel): + """Payload for creating a new SummaryView. + + Same fields as SummaryView except for the server-assigned id. + """ + + name: str | None = Field( + default=None, + description="Optional human-readable name for the view", + ) + source: Literal["long_term", "working_memory"] = Field( + description="Memory source to summarize: long-term or working memory", + ) + group_by: list[str] = Field( + default_factory=list, + description=( + "Fields used to partition summaries (e.g. ['user_id'], " + "['user_id', 'namespace'])." + ), + ) + filters: dict[str, Any] = Field( + default_factory=dict, + description=( + "Static filters applied to every run (e.g. memory_type, namespace). " + "Only a small, known set of keys is supported in v1." + ), + ) + time_window_days: int | None = Field( + default=None, + ge=1, + description=( + "If set, each run uses now() - time_window_days as a cutoff " + "for eligible memories." + ), + ) + continuous: bool = Field( + default=False, + description=( + "If true, background workers periodically refresh all partitions " + "for this view." + ), + ) + prompt: str | None = Field( + default=None, + description=( + "Optional custom summarization instructions. If omitted, a " + "server-defined default prompt is used." + ), + ) + model_name: str | None = Field( + default=None, + description=( + "Optional model override for summarization. Defaults to a fast " + "model from settings when not provided." + ), + ) + + +class RunSummaryViewPartitionRequest(BaseModel): + """Request body for running a single partition of a SummaryView.""" + + group: dict[str, str] = Field( + description="Concrete values for this view's group_by fields", + ) + + +class RunSummaryViewRequest(BaseModel): + """Request body for triggering a full SummaryView run as a Task.""" + + task_id: str | None = Field( + default=None, + description=( + "Optional client-provided task ID. If omitted, the server " "generates one." + ), + ) diff --git a/agent_memory_server/summary_views.py b/agent_memory_server/summary_views.py new file mode 100644 index 0000000..c77c03b --- /dev/null +++ b/agent_memory_server/summary_views.py @@ -0,0 +1,450 @@ +"""Helpers for SummaryView configs, stored results, and execution stubs. + +This module currently focuses on Redis JSON storage and key conventions. +Execution logic for summarizing memories will be expanded in follow-up +changes; for now, we provide minimal placeholder behavior so the API +surface is wired end-to-end. +""" + +from __future__ import annotations + +import json +import logging +from collections.abc import Iterable +from datetime import UTC, datetime, timedelta +from typing import Any + +from docket import Perpetual + +from agent_memory_server import long_term_memory +from agent_memory_server.config import settings +from agent_memory_server.filters import ( + CreatedAt, + MemoryType, + Namespace, + SessionId, + UserId, +) +from agent_memory_server.models import ( + MemoryRecord, + SummaryView, + SummaryViewPartitionResult, + TaskStatusEnum, +) +from agent_memory_server.tasks import update_task_status +from agent_memory_server.utils.redis import get_redis_conn + + +logger = logging.getLogger(__name__) + + +_SUMMARY_VIEW_INDEX_KEY = "summary_view:index" + + +def _config_key(view_id: str) -> str: + return f"summary_view:{view_id}:config" + + +def _summary_key(view_id: str, partition_key: str) -> str: + return f"summary_view:{view_id}:summary:{partition_key}" + + +def encode_partition_key(group: dict[str, str]) -> str: + """Create a stable key representation from group_by values. + + Keys are sorted alphabetically so the same group always produces the + same identifier. + """ + + parts: list[str] = [] + for key in sorted(group.keys()): + value = group[key] + parts.append(f"{key}={value}") + return "|".join(parts) + + +def _matches_group_filter(group: dict[str, str], group_filter: dict[str, str]) -> bool: + return all(group.get(key) == value for key, value in group_filter.items()) + + +async def save_summary_view(view: SummaryView) -> None: + """Persist a SummaryView definition in Redis as JSON and index it.""" + + redis = await get_redis_conn() + await redis.set(_config_key(view.id), view.model_dump_json()) + await redis.sadd(_SUMMARY_VIEW_INDEX_KEY, view.id) + + +async def get_summary_view(view_id: str) -> SummaryView | None: + """Load a SummaryView by ID from Redis JSON storage.""" + + redis = await get_redis_conn() + raw = await redis.get(_config_key(view_id)) + if raw is None: + return None + + if isinstance(raw, bytes): + raw = raw.decode("utf-8") + + try: + return SummaryView.model_validate_json(raw) + except Exception: + logger.exception("Failed to decode SummaryView JSON for %s", view_id) + return None + + +async def list_summary_views() -> list[SummaryView]: + """Return all SummaryViews registered in the index. + + This performs one GET per view ID; acceptable for the current scale. + """ + + redis = await get_redis_conn() + ids: Iterable[bytes] = await redis.smembers(_SUMMARY_VIEW_INDEX_KEY) + views: list[SummaryView] = [] + + for raw_id in ids: + view_id = raw_id.decode("utf-8") if isinstance(raw_id, bytes) else str(raw_id) + view = await get_summary_view(view_id) + if view is not None: + views.append(view) + + return views + + +async def delete_summary_view(view_id: str) -> None: + """Delete a SummaryView config and remove it from the index. + + Stored partition summaries are left as-is for now; they can be cleaned + up in a later pass if needed. + """ + + redis = await get_redis_conn() + await redis.delete(_config_key(view_id)) + await redis.srem(_SUMMARY_VIEW_INDEX_KEY, view_id) + + +async def save_partition_result(result: SummaryViewPartitionResult) -> None: + """Persist a single partition result for a SummaryView.""" + + redis = await get_redis_conn() + partition_key = encode_partition_key(result.group) + await redis.set( + _summary_key(result.view_id, partition_key), result.model_dump_json() + ) + + +async def list_partition_results( + view_id: str, group_filter: dict[str, str] | None = None +) -> list[SummaryViewPartitionResult]: + """List stored partition results for a view, optionally filtered by group. + + This reads whatever has been materialized so far; it does not trigger + recomputation. + """ + + redis = await get_redis_conn() + pattern = _summary_key(view_id, "*") + results: list[SummaryViewPartitionResult] = [] + + async for key in redis.scan_iter(match=pattern): + raw = await redis.get(key) + if raw is None: + continue + if isinstance(raw, bytes): + raw = raw.decode("utf-8") + try: + result = SummaryViewPartitionResult.model_validate_json(raw) + except Exception: + logger.exception( + "Failed to decode SummaryViewPartitionResult for key %s", key + ) + continue + + if group_filter and not _matches_group_filter(result.group, group_filter): + continue + + results.append(result) + + return results + + +def _build_long_term_filters_for_view( + view: SummaryView, + extra_group: dict[str, str] | None = None, +) -> dict[str, Any]: + """Build keyword arguments for search_long_term_memories. + + Maps SummaryView.filters and optionally a concrete group dict into + typed filter objects used by long_term_memory.search_long_term_memories. + """ + + filters: dict[str, Any] = {} + + # Static filters from the view config + for key, value in view.filters.items(): + if key == "user_id": + filters["user_id"] = UserId(eq=str(value)) + elif key == "namespace": + filters["namespace"] = Namespace(eq=str(value)) + elif key == "session_id": + filters["session_id"] = SessionId(eq=str(value)) + elif key == "memory_type": + filters["memory_type"] = MemoryType(eq=str(value)) + + # Group-specific filters + if extra_group: + for key, value in extra_group.items(): + if key == "user_id": + filters["user_id"] = UserId(eq=value) + elif key == "namespace": + filters["namespace"] = Namespace(eq=value) + elif key == "session_id": + filters["session_id"] = SessionId(eq=value) + elif key == "memory_type": + filters["memory_type"] = MemoryType(eq=value) + + # Time window: apply to created_at for now + if view.time_window_days is not None and view.time_window_days > 0: + cutoff = datetime.now(UTC) - timedelta(days=view.time_window_days) + filters["created_at"] = CreatedAt(gte=cutoff) + + return filters + + +async def _fetch_long_term_memories_for_view( + view: SummaryView, + extra_group: dict[str, str] | None = None, + limit: int = 1000, +) -> list[MemoryRecord]: + """Fetch long-term memories matching a SummaryView and optional group. + + Uses the filter-only listing path of search_long_term_memories by + providing an empty text query. + """ + + filters = _build_long_term_filters_for_view(view, extra_group) + + results = await long_term_memory.search_long_term_memories( + text="", + limit=limit, + offset=0, + **filters, + ) + return list(results.memories) + + +def _partition_memories_by_group( + view: SummaryView, memories: list[MemoryRecord] +) -> dict[tuple[tuple[str, str], ...], list[MemoryRecord]]: + """Group memories into partitions based on view.group_by fields. + + Returns a mapping from a stable tuple key to a list of MemoryRecord. + The key is a sorted tuple of (field, value) pairs. + """ + + partitions: dict[tuple[tuple[str, str], ...], list[MemoryRecord]] = {} + for mem in memories: + group_dict: dict[str, str] = {} + for field in view.group_by: + value = getattr(mem, field, None) + if value is None: + # If a grouping field is missing for this memory, skip it + break + group_dict[field] = str(value) + else: + # Only executed if the inner loop did not break + key = tuple(sorted(group_dict.items())) + partitions.setdefault(key, []).append(mem) + + return partitions + + +async def summarize_partition_long_term( + view: SummaryView, + group: dict[str, str], + memories: list[MemoryRecord], +) -> SummaryViewPartitionResult: + """Summarize a partition of long-term memories. + + For now we keep the prompt simple and use a single chat completion + call with a textual join of memory texts. + """ + + if not memories: + summary_text = f"No memories found for group {group!r}." + return SummaryViewPartitionResult( + view_id=view.id, + group=group, + summary=summary_text, + memory_count=0, + computed_at=datetime.now(UTC), + ) + + # If no LLM credentials are configured, fall back to a simple + # deterministic summary that just concatenates memory texts. + if not ( + settings.openai_api_key + or settings.anthropic_api_key + or settings.aws_access_key_id + ): + joined = "\n".join(f"- {m.text}" for m in memories[:50]) + summary_text = ( + "LLM summarization disabled (no API keys configured). " + "Concatenated up to 50 memories:\n" + joined + ) + else: + from agent_memory_server.llms import get_model_client + + model_name = view.model_name or settings.fast_model + client = await get_model_client(model_name) + + # Build a simple prompt using either the view's prompt or a default. + default_instructions = ( + "You are a summarization assistant. Given a set of long-term " + "memories, produce a concise summary that highlights key facts, " + "stable preferences, and important events relevant to the group." + ) + instructions = view.prompt or default_instructions + + memories_text = "\n".join(f"- {m.text}" for m in memories) + prompt = ( + f"{instructions}\n\n" + f"GROUP: {json.dumps(group, sort_keys=True)}\n\n" + f"MEMORIES:\n{memories_text}\n\nSUMMARY:" + ) + + # We use the same interface pattern as other summarization helpers. + response = await client.create_chat_completion(model_name, prompt) + summary_text = response.choices[0].message.content + + return SummaryViewPartitionResult( + view_id=view.id, + group=group, + summary=summary_text, + memory_count=len(memories), + computed_at=datetime.now(UTC), + ) + + +async def summarize_partition_placeholder( + view: SummaryView, group: dict[str, str] +) -> SummaryViewPartitionResult: + """Fallback placeholder summary for unsupported sources. + + Used currently for SummaryViews whose source is not yet implemented. + """ + + summary_text = f"Placeholder summary for view {view.id} with group {group!r}." + return SummaryViewPartitionResult( + view_id=view.id, + group=group, + summary=summary_text, + memory_count=0, + computed_at=datetime.now(UTC), + ) + + +async def summarize_partition_for_view( + view: SummaryView, group: dict[str, str] +) -> SummaryViewPartitionResult: + """High-level entry point to summarize a single partition for a view. + + Dispatches to the appropriate implementation based on view.source. + """ + + if view.source == "long_term": + memories = await _fetch_long_term_memories_for_view(view, extra_group=group) + return await summarize_partition_long_term(view, group, memories) + + # Fallback for sources we haven't implemented yet. + return await summarize_partition_placeholder(view, group) + + +async def refresh_summary_view(view_id: str, task_id: str | None = None) -> None: + """Docket task to recompute all partitions for a SummaryView. + + For long-term memory sources, this will fetch memories matching the + view filters, partition them by group_by, summarize each partition, + and store SummaryViewPartitionResult entries. + """ + + view = await get_summary_view(view_id) + now = datetime.now(UTC) + + if task_id is not None: + if view is None: + await update_task_status( + task_id, + status=TaskStatusEnum.FAILED, + completed_at=now, + error_message=f"SummaryView {view_id} not found", + ) + return + + await update_task_status( + task_id, + status=TaskStatusEnum.RUNNING, + started_at=now, + ) + + if view is None: + # Nothing to do; already handled task status above if needed. + return + + try: + if view.source == "long_term": + # Fetch all relevant memories and partition them. + memories = await _fetch_long_term_memories_for_view(view) + partitions = _partition_memories_by_group(view, memories) + + for key, mems in partitions.items(): + group = dict(key) + result = await summarize_partition_long_term(view, group, mems) + await save_partition_result(result) + else: + # For unsupported sources, we currently do nothing. + logger.info( + "refresh_summary_view: source %s not yet implemented for view %s", + view.source, + view.id, + ) + + if task_id is not None: + await update_task_status( + task_id, + status=TaskStatusEnum.SUCCESS, + completed_at=datetime.now(UTC), + ) + except Exception as exc: # noqa: BLE001 + logger.exception("Error refreshing SummaryView %s", view_id) + if task_id is not None: + await update_task_status( + task_id, + status=TaskStatusEnum.FAILED, + completed_at=datetime.now(UTC), + error_message=str(exc), + ) + + +async def periodic_refresh_summary_views( + perpetual: Perpetual = Perpetual( + every=timedelta(minutes=60), + automatic=True, + ), +) -> None: + """Periodic Docket task to refresh all continuous SummaryViews. + + Uses the same refresh_summary_view helper but without task tracking. + """ + + if not settings.long_term_memory: + # If long-term memory is entirely disabled, there may still be + # working-memory backed views later, but for now we bail out. + return + + views = await list_summary_views() + for view in views: + if not view.continuous: + continue + await refresh_summary_view(view.id, task_id=None) diff --git a/agent_memory_server/tasks.py b/agent_memory_server/tasks.py new file mode 100644 index 0000000..bb9bd21 --- /dev/null +++ b/agent_memory_server/tasks.py @@ -0,0 +1,90 @@ +import logging +from datetime import UTC, datetime + +from agent_memory_server.models import Task, TaskStatusEnum +from agent_memory_server.utils.redis import get_redis_conn + + +logger = logging.getLogger(__name__) + + +def _task_key(task_id: str) -> str: + """Return the Redis key for a task JSON payload.""" + + return f"task:{task_id}" + + +async def create_task(task: Task) -> None: + """Persist a new Task as JSON in Redis. + + This overwrites any existing task with the same ID. + """ + + redis = await get_redis_conn() + await redis.set(_task_key(task.id), task.model_dump_json()) + + +async def get_task(task_id: str) -> Task | None: + """Load a Task from Redis JSON storage. + + Returns None if the task does not exist. + """ + + redis = await get_redis_conn() + raw = await redis.get(_task_key(task_id)) + if raw is None: + return None + + if isinstance(raw, bytes): + raw = raw.decode("utf-8") + + try: + return Task.model_validate_json(raw) + except Exception: + logger.exception("Failed to decode task JSON for %s", task_id) + return None + + +async def update_task_status( + task_id: str, + *, + status: TaskStatusEnum | None = None, + started_at: datetime | None = None, + completed_at: datetime | None = None, + error_message: str | None = None, +) -> None: + """Update status and timestamps for an existing Task. + + If the task does not exist, this is a no-op. + """ + + redis = await get_redis_conn() + key = _task_key(task_id) + raw = await redis.get(key) + if raw is None: + logger.warning("Attempted to update missing task %s", task_id) + return + + if isinstance(raw, bytes): + raw = raw.decode("utf-8") + + try: + task = Task.model_validate_json(raw) + except Exception: + logger.exception("Failed to decode task JSON for %s during update", task_id) + return + + if status is not None: + task.status = status + if started_at is not None: + task.started_at = started_at + if completed_at is not None: + task.completed_at = completed_at + if error_message is not None: + task.error_message = error_message + + # Ensure created_at is always set + if task.created_at is None: + task.created_at = datetime.now(UTC) + + await redis.set(key, task.model_dump_json()) diff --git a/tests/test_summary_views.py b/tests/test_summary_views.py new file mode 100644 index 0000000..3e1d8ff --- /dev/null +++ b/tests/test_summary_views.py @@ -0,0 +1,106 @@ +import pytest + +from agent_memory_server.models import TaskStatusEnum +from agent_memory_server.utils.redis import get_redis_conn + + +@pytest.mark.asyncio +async def test_create_and_get_summary_view(client): + # Create a summary view + payload = { + "name": "ltm_by_user_30d", + "source": "long_term", + "group_by": ["user_id"], + "filters": {"memory_type": "semantic"}, + "time_window_days": 30, + "continuous": False, + "prompt": None, + "model_name": None, + } + resp = await client.post("/v1/summary-views", json=payload) + assert resp.status_code == 200, resp.text + view = resp.json() + view_id = view["id"] + + # Fetch it back + resp_get = await client.get(f"/v1/summary-views/{view_id}") + assert resp_get.status_code == 200 + fetched = resp_get.json() + assert fetched["id"] == view_id + assert fetched["group_by"] == ["user_id"] + + +@pytest.mark.asyncio +async def test_run_single_partition_and_list_partitions(client): + # Create a simple view grouped by user_id + payload = { + "name": "ltm_by_user", + "source": "long_term", + "group_by": ["user_id"], + "filters": {}, + "time_window_days": None, + "continuous": False, + "prompt": None, + "model_name": None, + } + resp = await client.post("/v1/summary-views", json=payload) + assert resp.status_code == 200, resp.text + view_id = resp.json()["id"] + + # Run a single partition synchronously + run_payload = {"group": {"user_id": "alice"}} + resp_run = await client.post( + f"/v1/summary-views/{view_id}/partitions/run", json=run_payload + ) + assert resp_run.status_code == 200, resp_run.text + result = resp_run.json() + assert result["group"] == {"user_id": "alice"} + assert "summary" in result + + # List materialized partitions + resp_list = await client.get( + f"/v1/summary-views/{view_id}/partitions", params={"user_id": "alice"} + ) + assert resp_list.status_code == 200 + partitions = resp_list.json() + assert len(partitions) == 1 + assert partitions[0]["group"]["user_id"] == "alice" + + +@pytest.mark.asyncio +async def test_run_full_view_creates_task_and_updates_status(client): + # Create a summary view + payload = { + "name": "ltm_full_run", + "source": "long_term", + "group_by": ["user_id"], + "filters": {}, + "time_window_days": None, + "continuous": False, + "prompt": None, + "model_name": None, + } + resp = await client.post("/v1/summary-views", json=payload) + assert resp.status_code == 200, resp.text + view_id = resp.json()["id"] + + # Trigger a full run + resp_run = await client.post(f"/v1/summary-views/{view_id}/run", json={}) + assert resp_run.status_code == 200, resp_run.text + task = resp_run.json() + task_id = task["id"] + + # Task should exist in Redis and eventually transition to success + redis = await get_redis_conn() + raw = await redis.get(f"task:{task_id}") + assert raw is not None + + # Poll the task status via the API + resp_task = await client.get(f"/v1/tasks/{task_id}") + assert resp_task.status_code == 200 + polled = resp_task.json() + assert polled["status"] in { + TaskStatusEnum.PENDING, + TaskStatusEnum.RUNNING, + TaskStatusEnum.SUCCESS, + } From 3b250f14a83f17cc698159fe704e8eeb9a388c35 Mon Sep 17 00:00:00 2001 From: Andrew Brookins Date: Wed, 17 Dec 2025 09:04:59 -0800 Subject: [PATCH 2/5] fix redis in tests --- tests/conftest.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index b4cf706..b97cde0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -332,13 +332,19 @@ def patched_docket_init(self, name, url=None, *args, **kwargs): import agent_memory_server.vectorstore_factory with ( + # Core Redis helper patch("agent_memory_server.utils.redis.get_redis_conn", mock_get_redis_conn), - patch("docket.docket.Docket.__init__", patched_docket_init), - patch("agent_memory_server.working_memory.get_redis_conn", mock_get_redis_conn), + # Modules that imported get_redis_conn directly must also be patched patch("agent_memory_server.api.get_redis_conn", mock_get_redis_conn), + patch("agent_memory_server.working_memory.get_redis_conn", mock_get_redis_conn), patch( "agent_memory_server.long_term_memory.get_redis_conn", mock_get_redis_conn ), + patch("agent_memory_server.summary_views.get_redis_conn", mock_get_redis_conn), + patch("agent_memory_server.tasks.get_redis_conn", mock_get_redis_conn), + # Ensure Docket uses the test Redis URL + patch("docket.docket.Docket.__init__", patched_docket_init), + # Point settings.redis_url at the testcontainer Redis patch.object(settings, "redis_url", redis_url), ): # Reset global state to force recreation with test Redis From ec0edc123f7c1acbc74a08df531fb6a847686b39 Mon Sep 17 00:00:00 2001 From: Andrew Brookins Date: Fri, 19 Dec 2025 11:58:46 -0800 Subject: [PATCH 3/5] Add GPT-5 model support, improve summary views, and update docs --- .../agent_memory_client/models.py | 4 + agent_memory_server/config.py | 77 +++++++- agent_memory_server/models.py | 16 ++ agent_memory_server/summary_views.py | 164 ++++++++++++++---- agent_memory_server/tasks.py | 13 +- docs/configuration.md | 8 +- docs/getting-started.md | 2 +- docs/query-optimization.md | 9 +- tests/test_summary_views.py | 67 ++++++- 9 files changed, 316 insertions(+), 44 deletions(-) diff --git a/agent-memory-client/agent_memory_client/models.py b/agent-memory-client/agent_memory_client/models.py index 4575ba3..9710e25 100644 --- a/agent-memory-client/agent_memory_client/models.py +++ b/agent-memory-client/agent_memory_client/models.py @@ -27,6 +27,10 @@ "o1", "o1-mini", "o3-mini", + "gpt-5-mini", + "gpt-5-nano", + "gpt-5.1-chat-latest", + "gpt-5.2-chat-latest", "text-embedding-ada-002", "text-embedding-3-small", "text-embedding-3-large", diff --git a/agent_memory_server/config.py b/agent_memory_server/config.py index 83ffcba..933ce4f 100644 --- a/agent_memory_server/config.py +++ b/agent_memory_server/config.py @@ -35,6 +35,18 @@ class ModelConfig(BaseModel): # Model configuration mapping MODEL_CONFIGS = { # OpenAI Models + "gpt-4.1": ModelConfig( + provider=ModelProvider.OPENAI, + name="gpt-4.1", + max_tokens=128000, + embedding_dimensions=1536, + ), + "gpt-4.1-mini": ModelConfig( + provider=ModelProvider.OPENAI, + name="gpt-4.1-mini", + max_tokens=128000, + embedding_dimensions=1536, + ), "gpt-3.5-turbo": ModelConfig( provider=ModelProvider.OPENAI, name="gpt-3.5-turbo", @@ -90,6 +102,31 @@ class ModelConfig(BaseModel): max_tokens=200000, embedding_dimensions=1536, ), + # GPT-5 family + "gpt-5-mini": ModelConfig( + provider=ModelProvider.OPENAI, + name="gpt-5-mini", + max_tokens=400000, + embedding_dimensions=1536, + ), + "gpt-5-nano": ModelConfig( + provider=ModelProvider.OPENAI, + name="gpt-5-nano", + max_tokens=400000, + embedding_dimensions=1536, + ), + "gpt-5.1-chat-latest": ModelConfig( + provider=ModelProvider.OPENAI, + name="gpt-5.1-chat-latest", + max_tokens=128000, + embedding_dimensions=1536, + ), + "gpt-5.2-chat-latest": ModelConfig( + provider=ModelProvider.OPENAI, + name="gpt-5.2-chat-latest", + max_tokens=128000, + embedding_dimensions=1536, + ), # Embedding models "text-embedding-ada-002": ModelConfig( provider=ModelProvider.OPENAI, @@ -134,7 +171,26 @@ class ModelConfig(BaseModel): max_tokens=200000, embedding_dimensions=1536, ), - # Latest Anthropic Models + # Claude 4.5 family (direct Anthropic API IDs) + "claude-sonnet-4-5-20250929": ModelConfig( + provider=ModelProvider.ANTHROPIC, + name="claude-sonnet-4-5-20250929", + max_tokens=200000, + embedding_dimensions=1536, + ), + "claude-haiku-4-5-20251001": ModelConfig( + provider=ModelProvider.ANTHROPIC, + name="claude-haiku-4-5-20251001", + max_tokens=200000, + embedding_dimensions=1536, + ), + "claude-opus-4-5-20251101": ModelConfig( + provider=ModelProvider.ANTHROPIC, + name="claude-opus-4-5-20251101", + max_tokens=200000, + embedding_dimensions=1536, + ), + # Latest Anthropic Models (Claude 3.x family) "claude-3-7-sonnet-20250219": ModelConfig( provider=ModelProvider.ANTHROPIC, name="claude-3-7-sonnet-20250219", @@ -178,6 +234,25 @@ class ModelConfig(BaseModel): max_tokens=200000, embedding_dimensions=1536, ), + # Aliases for Claude 4.5 family + "claude-sonnet-4-5": ModelConfig( + provider=ModelProvider.ANTHROPIC, + name="claude-sonnet-4-5-20250929", + max_tokens=200000, + embedding_dimensions=1536, + ), + "claude-haiku-4-5": ModelConfig( + provider=ModelProvider.ANTHROPIC, + name="claude-haiku-4-5-20251001", + max_tokens=200000, + embedding_dimensions=1536, + ), + "claude-opus-4-5": ModelConfig( + provider=ModelProvider.ANTHROPIC, + name="claude-opus-4-5-20251101", + max_tokens=200000, + embedding_dimensions=1536, + ), # AWS Bedrock Embedding Models "amazon.titan-embed-text-v2:0": ModelConfig( provider=ModelProvider.AWS_BEDROCK, diff --git a/agent_memory_server/models.py b/agent_memory_server/models.py index 7beb963..f36d06e 100644 --- a/agent_memory_server/models.py +++ b/agent_memory_server/models.py @@ -40,6 +40,9 @@ class MemoryTypeEnum(str, Enum): # These should match the keys in MODEL_CONFIGS ModelNameLiteral = Literal[ + # OpenAI chat and reasoning models + "gpt-4.1", + "gpt-4.1-mini", "gpt-3.5-turbo", "gpt-3.5-turbo-16k", "gpt-4", @@ -49,9 +52,15 @@ class MemoryTypeEnum(str, Enum): "o1", "o1-mini", "o3-mini", + "gpt-5-mini", + "gpt-5-nano", + "gpt-5.1-chat-latest", + "gpt-5.2-chat-latest", + # OpenAI embedding models "text-embedding-ada-002", "text-embedding-3-small", "text-embedding-3-large", + # Anthropic Claude 3.x family "claude-3-opus-20240229", "claude-3-sonnet-20240229", "claude-3-haiku-20240307", @@ -63,6 +72,13 @@ class MemoryTypeEnum(str, Enum): "claude-3-5-sonnet-latest", "claude-3-5-haiku-latest", "claude-3-opus-latest", + # Anthropic Claude 4.5 family (direct API IDs and aliases) + "claude-sonnet-4-5-20250929", + "claude-haiku-4-5-20251001", + "claude-opus-4-5-20251101", + "claude-sonnet-4-5", + "claude-haiku-4-5", + "claude-opus-4-5", ] diff --git a/agent_memory_server/summary_views.py b/agent_memory_server/summary_views.py index 4a8ac2d..0fac2c7 100644 --- a/agent_memory_server/summary_views.py +++ b/agent_memory_server/summary_views.py @@ -1,9 +1,8 @@ -"""Helpers for SummaryView configs, stored results, and execution stubs. +"""Helpers for SummaryView configs, stored results, and summarization logic. -This module currently focuses on Redis JSON storage and key conventions. -Execution logic for summarizing memories will be expanded in follow-up -changes; for now, we provide minimal placeholder behavior so the API -surface is wired end-to-end. +This module implements the execution logic for summarizing long-term memory +sources using LLMs, including Redis JSON storage, key conventions, and +partitioned summary management so the API surface is wired end-to-end. """ from __future__ import annotations @@ -14,6 +13,7 @@ from datetime import UTC, datetime, timedelta from typing import Any +import tiktoken from docket import Perpetual from agent_memory_server import long_term_memory @@ -57,7 +57,11 @@ def encode_partition_key(group: dict[str, str]) -> str: """Create a stable key representation from group_by values. Keys are sorted alphabetically so the same group always produces the - same identifier. + same identifier. The resulting string is treated as an opaque key – + we never parse it back into a dict. Because the allowed group_by + fields are a small fixed set without the '|' or '=' characters, + this encoding is stable and effectively collision-free for our use + case even if values contain those delimiters. """ parts: list[str] = [] @@ -185,8 +189,13 @@ def _build_long_term_filters_for_view( filters: dict[str, Any] = {} - # Static filters from the view config - for key, value in view.filters.items(): + def _apply_filter(key: str, value: str | Any) -> None: + """Apply a single filter mapping from a raw key/value pair. + + Both static view.filters and extra_group values are coerced to str + for consistency. + """ + if key == "user_id": filters["user_id"] = UserId(eq=str(value)) elif key == "namespace": @@ -196,17 +205,14 @@ def _build_long_term_filters_for_view( elif key == "memory_type": filters["memory_type"] = MemoryType(eq=str(value)) + # Static filters from the view config + for key, value in view.filters.items(): + _apply_filter(key, value) + # Group-specific filters if extra_group: for key, value in extra_group.items(): - if key == "user_id": - filters["user_id"] = UserId(eq=value) - elif key == "namespace": - filters["namespace"] = Namespace(eq=value) - elif key == "session_id": - filters["session_id"] = SessionId(eq=value) - elif key == "memory_type": - filters["memory_type"] = MemoryType(eq=value) + _apply_filter(key, value) # Time window: apply to created_at for now if view.time_window_days is not None and view.time_window_days > 0: @@ -235,7 +241,21 @@ async def _fetch_long_term_memories_for_view( offset=0, **filters, ) - return list(results.memories) + memories = list(results.memories) + + # If we hit the page limit, log a warning so potential truncation does + # not go unnoticed. We keep the single-page behavior for now to avoid + # unbounded scans but make it observable. + if len(memories) >= limit: + logger.warning( + "_fetch_long_term_memories_for_view fetched %d memories for view %s; " + "results may be truncated due to the current limit=%d.", + len(memories), + view.id, + limit, + ) + + return memories def _partition_memories_by_group( @@ -264,6 +284,88 @@ def _partition_memories_by_group( return partitions +def _build_long_term_summary_prompt( + view: SummaryView, + group: dict[str, str], + memories: list[MemoryRecord], + model_name: str, + instructions: str, +) -> str: + """Build a token-aware prompt for long-term memory summarization. + + Uses tiktoken and the model's configured context window to truncate the + inlined memories so the prompt stays within a safe fraction of the + context limit while still leaving room for the model's response. + """ + + # Import here to avoid circular imports at module load time. + from agent_memory_server.llms import get_model_config + + encoding = tiktoken.get_encoding("cl100k_base") + + model_config = get_model_config(model_name) + full_context_tokens = max(model_config.max_tokens, 1) + + # Use the same summarization_threshold knob as working-memory + # summarization to control how much of the context window we devote + # to the prompt itself. + prompt_budget = int(full_context_tokens * settings.summarization_threshold) + + # Reserve some space for the model's response and any overhead. + reserved_completion_tokens = min(4096, full_context_tokens // 10) + max_prompt_tokens = max(prompt_budget - reserved_completion_tokens, 1024) + + base_prefix = ( + f"{instructions}\n\n" + f"GROUP: {json.dumps(group, sort_keys=True)}\n\n" + "MEMORIES:\n" + ) + base_tokens = len(encoding.encode(base_prefix)) + + remaining_tokens = max_prompt_tokens - base_tokens + if remaining_tokens <= 0: + return ( + base_prefix + + "[Memories omitted due to token budget constraints.]\n\nSUMMARY:" + ) + + # Cap the size of each individual memory text we inline so that a + # single extremely long memory cannot dominate the prompt. + max_bullet_tokens = min(1024, full_context_tokens // 20) + + bullet_lines: list[str] = [] + for mem in memories[:_MAX_MEMORIES_FOR_LLM_PROMPT]: + text = mem.text or "" + bullet = f"- {text}" + bullet_tokens = len(encoding.encode(bullet)) + + if bullet_tokens > max_bullet_tokens: + # Roughly truncate very long memories by characters, then + # recompute tokens. This mirrors the approach used in + # agent_memory_server.summarization. + approx_chars = max_bullet_tokens * 4 + text = text[:approx_chars] + bullet = f"- {text}" + bullet_tokens = len(encoding.encode(bullet)) + + if bullet_tokens > remaining_tokens: + break + + bullet_lines.append(bullet) + remaining_tokens -= bullet_tokens + + memories_text = "\n".join(bullet_lines) + total_memories = len(memories) + used_memories = len(bullet_lines) + if total_memories > used_memories: + memories_text += ( + f"\n\n[Memories truncated to fit token budget: used {used_memories} " + f"of {total_memories} entries]" + ) + + return f"{base_prefix}{memories_text}\n\nSUMMARY:" + + async def summarize_partition_long_term( view: SummaryView, group: dict[str, str], @@ -303,7 +405,9 @@ async def summarize_partition_long_term( model_name = view.model_name or settings.fast_model client = await get_model_client(model_name) - # Build a simple prompt using either the view's prompt or a default. + # Build a prompt using either the view's prompt or a default, then + # construct a token-aware memories section based on the model's + # configured context window. default_instructions = ( "You are a summarization assistant. Given a set of long-term " "memories, produce a concise summary that highlights key facts, " @@ -311,21 +415,12 @@ async def summarize_partition_long_term( ) instructions = view.prompt or default_instructions - # Avoid constructing an excessively large prompt when many memories - # are present in a partition. We cap the memories used for the prompt - # but still report the full memory_count below. - memories_for_prompt = memories[:_MAX_MEMORIES_FOR_LLM_PROMPT] - memories_text = "\n".join(f"- {m.text}" for m in memories_for_prompt) - if len(memories) > _MAX_MEMORIES_FOR_LLM_PROMPT: - memories_text += ( - f"\n\n[Truncated to first {_MAX_MEMORIES_FOR_LLM_PROMPT} " - f"memories out of {len(memories)}]" - ) - - prompt = ( - f"{instructions}\n\n" - f"GROUP: {json.dumps(group, sort_keys=True)}\n\n" - f"MEMORIES:\n{memories_text}\n\nSUMMARY:" + prompt = _build_long_term_summary_prompt( + view=view, + group=group, + memories=memories, + model_name=model_name, + instructions=instructions, ) # We use the same interface pattern as other summarization helpers, @@ -457,6 +552,9 @@ async def refresh_summary_view(view_id: str, task_id: str | None = None) -> None completed_at=datetime.now(UTC), ) except Exception as exc: # noqa: BLE001 + # We deliberately catch all exceptions here so that background workers + # never crash silently and any failure is reflected in the Task record + # as FAILED. The original error is logged with traceback above. logger.exception("Error refreshing SummaryView %s", view_id) if task_id is not None: await update_task_status( diff --git a/agent_memory_server/tasks.py b/agent_memory_server/tasks.py index bb9bd21..eb5b29d 100644 --- a/agent_memory_server/tasks.py +++ b/agent_memory_server/tasks.py @@ -8,6 +8,11 @@ logger = logging.getLogger(__name__) +# Tasks are operational metadata; we don't need to retain them forever. +# Use a conservative TTL so Redis state cannot grow without bound. +_TASK_TTL_SECONDS = 7 * 24 * 60 * 60 # 7 days + + def _task_key(task_id: str) -> str: """Return the Redis key for a task JSON payload.""" @@ -21,7 +26,11 @@ async def create_task(task: Task) -> None: """ redis = await get_redis_conn() - await redis.set(_task_key(task.id), task.model_dump_json()) + await redis.set( + _task_key(task.id), + task.model_dump_json(), + ex=_TASK_TTL_SECONDS, + ) async def get_task(task_id: str) -> Task | None: @@ -87,4 +96,4 @@ async def update_task_status( if task.created_at is None: task.created_at = datetime.now(UTC) - await redis.set(key, task.model_dump_json()) + await redis.set(key, task.model_dump_json(), ex=_TASK_TTL_SECONDS) diff --git a/docs/configuration.md b/docs/configuration.md index 4b2524f..352fe6c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -207,8 +207,12 @@ uv run agent-memory task-worker --concurrency 5 --redelivery-timeout 60 ## Supported Models ### Generation Models (OpenAI) -- `gpt-4o` - Latest GPT-4 Optimized (recommended) -- `gpt-4o-mini` - Faster, smaller GPT-4 (good for fast_model) +- `gpt-5.2-chat-latest` - Latest GPT-5.2 Chat snapshot used in ChatGPT (recommended when available) +- `gpt-5.1-chat-latest` - GPT-5.1 Chat snapshot (fast, chat-optimized) +- `gpt-5-mini` - Smaller GPT-5 model (good candidate for `FAST_MODEL`) +- `gpt-5-nano` - Smallest GPT-5 model (ultra fast, cost efficient) +- `gpt-4o` - GPT-4 Optimized (default in this project) +- `gpt-4o-mini` - Faster, smaller GPT-4 (good for `FAST_MODEL`) - `gpt-4` - Previous GPT-4 version - `gpt-3.5-turbo` - Older, faster model - `o1` - OpenAI o1 reasoning model diff --git a/docs/getting-started.md b/docs/getting-started.md index 0ad7be1..ba20f5d 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -69,7 +69,7 @@ When configuring MCP-enabled apps (e.g., Claude Desktop), prefer `uvx` so the ap ``` Notes: -- API keys: Default models use OpenAI. Set `OPENAI_API_KEY`. To use Anthropic instead, set `ANTHROPIC_API_KEY` and also `GENERATION_MODEL` to an Anthropic model (e.g., `claude-3-5-haiku-20241022`). +- API keys: Default models use OpenAI. Set `OPENAI_API_KEY`. To use Anthropic instead, set `ANTHROPIC_API_KEY` and also `GENERATION_MODEL` to an Anthropic model (e.g., `claude-3-5-haiku-20241022`). If you have access to GPT-5 models, you can instead set `GENERATION_MODEL` to `gpt-5.2-chat-latest`, `gpt-5.1-chat-latest`, `gpt-5-mini`, or `gpt-5-nano`. - Make sure your MCP host can find `uvx` (on its PATH or by using an absolute command path). macOS: `brew install uv`. If not on PATH, set `"command"` to an absolute path (e.g., `/opt/homebrew/bin/uvx` on Apple Silicon, `/usr/local/bin/uvx` on Intel macOS). - For production, remove `DISABLE_AUTH` and configure auth. diff --git a/docs/query-optimization.md b/docs/query-optimization.md index 3bac790..1514463 100644 --- a/docs/query-optimization.md +++ b/docs/query-optimization.md @@ -64,10 +64,11 @@ QUERY_OPTIMIZATION_MODEL=gpt-4o-mini # Use a more powerful model for memory extraction and other tasks GENERATION_MODEL=gpt-4o -# Supported models include: -# - gpt-4o, gpt-4o-mini -# - claude-3-5-sonnet-20241022, claude-3-haiku-20240307 -# - Any model supported by your LLM provider + # Supported models include: + # - gpt-5.2-chat-latest, gpt-5.1-chat-latest, gpt-5-mini, gpt-5-nano + # - gpt-4o, gpt-4o-mini + # - claude-3-5-sonnet-20241022, claude-3-haiku-20240307 + # - Any model supported by your LLM provider ``` ## Usage Examples diff --git a/tests/test_summary_views.py b/tests/test_summary_views.py index 07f1f9c..f248c09 100644 --- a/tests/test_summary_views.py +++ b/tests/test_summary_views.py @@ -29,6 +29,28 @@ async def test_create_and_get_summary_view(client): assert fetched["group_by"] == ["user_id"] +@pytest.mark.asyncio +async def test_create_summary_view_rejects_invalid_keys(client): + """SummaryView creation should reject unsupported group_by / filter keys.""" + + payload = { + "name": "invalid_keys_view", + "source": "long_term", + # "invalid" is not in the allowed group_by set + "group_by": ["user_id", "invalid"], + "filters": {"memory_type": "semantic"}, + "time_window_days": 30, + "continuous": False, + "prompt": None, + "model_name": None, + } + + resp = await client.post("/v1/summary-views", json=payload) + assert resp.status_code == 400 + data = resp.json() + assert "Unsupported group_by fields" in data["detail"] + + @pytest.mark.asyncio async def test_run_single_partition_and_list_partitions(client): # Create a simple view grouped by user_id @@ -66,6 +88,46 @@ async def test_run_single_partition_and_list_partitions(client): assert partitions[0]["group"]["user_id"] == "alice" +@pytest.mark.asyncio +async def test_delete_summary_view_removes_it_from_get_and_list(client): + """Deleting a SummaryView should remove it from retrieval and listings.""" + + # Create a view we can delete + payload = { + "name": "ltm_to_delete", + "source": "long_term", + "group_by": ["user_id"], + "filters": {}, + "time_window_days": None, + "continuous": False, + "prompt": None, + "model_name": None, + } + resp = await client.post("/v1/summary-views", json=payload) + assert resp.status_code == 200, resp.text + view_id = resp.json()["id"] + + # Ensure it appears in the list + list_before = await client.get("/v1/summary-views") + assert list_before.status_code == 200 + ids_before = {v["id"] for v in list_before.json()} + assert view_id in ids_before + + # Delete the view + resp_delete = await client.delete(f"/v1/summary-views/{view_id}") + assert resp_delete.status_code == 200, resp_delete.text + + # GET should now return 404 + resp_get = await client.get(f"/v1/summary-views/{view_id}") + assert resp_get.status_code == 404 + + # And it should no longer appear in the list + list_after = await client.get("/v1/summary-views") + assert list_after.status_code == 200 + ids_after = {v["id"] for v in list_after.json()} + assert view_id not in ids_after + + @pytest.mark.asyncio async def test_run_full_view_creates_task_and_updates_status(client): # Create a summary view @@ -89,7 +151,10 @@ async def test_run_full_view_creates_task_and_updates_status(client): task = resp_run.json() task_id = task["id"] - # Poll the task status via the API + # Poll the task status via the API. We intentionally do not wait for the + # background Docket worker here; the goal is to verify that the Task is + # created and visible through the status endpoint, not that the worker + # has actually completed the refresh. resp_task = await client.get(f"/v1/tasks/{task_id}") assert resp_task.status_code == 200 polled = resp_task.json() From 598ffd9a104607dde96dc699a17fc8af22f08336 Mon Sep 17 00:00:00 2001 From: Andrew Brookins Date: Fri, 19 Dec 2025 12:41:31 -0800 Subject: [PATCH 4/5] Tidy RunSummaryViewRequest task_id description --- agent_memory_server/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent_memory_server/models.py b/agent_memory_server/models.py index f36d06e..1f6564e 100644 --- a/agent_memory_server/models.py +++ b/agent_memory_server/models.py @@ -1070,6 +1070,6 @@ class RunSummaryViewRequest(BaseModel): task_id: str | None = Field( default=None, description=( - "Optional client-provided task ID. If omitted, the server " "generates one." + "Optional client-provided task ID. If omitted, the server generates one." ), ) From 65e779bf1689aa9e1e3c6f53051deeb2d656c6a5 Mon Sep 17 00:00:00 2001 From: Andrew Brookins Date: Fri, 19 Dec 2025 12:59:02 -0800 Subject: [PATCH 5/5] Paginate SummaryView long-term memory fetches --- agent_memory_server/summary_views.py | 63 +++++++++++++++------- tests/test_summary_views.py | 81 +++++++++++++++++++++++++++- 2 files changed, 123 insertions(+), 21 deletions(-) diff --git a/agent_memory_server/summary_views.py b/agent_memory_server/summary_views.py index 0fac2c7..8131872 100644 --- a/agent_memory_server/summary_views.py +++ b/agent_memory_server/summary_views.py @@ -225,35 +225,58 @@ def _apply_filter(key: str, value: str | Any) -> None: async def _fetch_long_term_memories_for_view( view: SummaryView, extra_group: dict[str, str] | None = None, - limit: int = 1000, + page_size: int = 1000, + overall_limit: int | None = None, ) -> list[MemoryRecord]: """Fetch long-term memories matching a SummaryView and optional group. Uses the filter-only listing path of search_long_term_memories by - providing an empty text query. + providing an empty text query and paginating through results. + + If overall_limit is provided, it serves as an upper bound on the total + number of memories returned; otherwise, all available pages are fetched. """ + if page_size <= 0: + raise ValueError("page_size must be positive") + filters = _build_long_term_filters_for_view(view, extra_group) - results = await long_term_memory.search_long_term_memories( - text="", - limit=limit, - offset=0, - **filters, - ) - memories = list(results.memories) - - # If we hit the page limit, log a warning so potential truncation does - # not go unnoticed. We keep the single-page behavior for now to avoid - # unbounded scans but make it observable. - if len(memories) >= limit: - logger.warning( - "_fetch_long_term_memories_for_view fetched %d memories for view %s; " - "results may be truncated due to the current limit=%d.", - len(memories), - view.id, - limit, + memories: list[MemoryRecord] = [] + offset = 0 + + while True: + # Respect an overall cap if provided + if overall_limit is not None: + remaining = overall_limit - len(memories) + if remaining <= 0: + break + current_limit = min(page_size, remaining) + else: + current_limit = page_size + + results = await long_term_memory.search_long_term_memories( + text="", + limit=current_limit, + offset=offset, + **filters, ) + batch = list(results.memories) + if not batch: + break + + memories.extend(batch) + + # If fewer results than requested were returned, we've reached the + # end of the result set. + if len(batch) < current_limit: + break + + offset += len(batch) + + # If we applied an overall limit, enforce it defensively here too. + if overall_limit is not None and len(memories) > overall_limit: + memories = memories[:overall_limit] return memories diff --git a/tests/test_summary_views.py b/tests/test_summary_views.py index f248c09..48c462d 100644 --- a/tests/test_summary_views.py +++ b/tests/test_summary_views.py @@ -1,6 +1,6 @@ import pytest -from agent_memory_server.models import TaskStatusEnum +from agent_memory_server.models import MemoryRecord, SummaryView, TaskStatusEnum @pytest.mark.asyncio @@ -163,3 +163,82 @@ async def test_run_full_view_creates_task_and_updates_status(client): TaskStatusEnum.RUNNING, TaskStatusEnum.SUCCESS, } + + +@pytest.mark.asyncio +async def test_fetch_long_term_memories_for_view_paginates(monkeypatch): + """_fetch_long_term_memories_for_view should paginate through results. + + We monkeypatch long_term_memory.search_long_term_memories to return + deterministic pages and verify that multiple calls are made when the + number of results exceeds the configured page_size. + """ + + from agent_memory_server import summary_views + + calls: list[tuple[int, int]] = [] + + class FakeResults: + def __init__(self, memories: list[MemoryRecord]): + self.memories = memories + + async def fake_search_long_term_memories( + *, text: str, limit: int, offset: int, **_: object + ): # type: ignore[override] + # Record the (limit, offset) pair for assertions. + calls.append((limit, offset)) + + # Pretend we have 2500 total memories; each page returns `limit` + # until we reach that total. + total = 2500 + remaining = max(total - offset, 0) + batch_size = min(limit, remaining) + + memories = [ + MemoryRecord( + id=f"mem-{offset + i}", + text=f"memory {offset + i}", + session_id=None, + user_id=None, + namespace=None, + ) + for i in range(batch_size) + ] + return FakeResults(memories) + + monkeypatch.setattr( + summary_views.long_term_memory, + "search_long_term_memories", + fake_search_long_term_memories, + ) + + view = SummaryView( + id="view-1", + name="test", + source="long_term", + group_by=["user_id"], + filters={}, + time_window_days=None, + continuous=False, + prompt=None, + model_name=None, + ) + + # Use a small page_size so multiple pages are required; also set + # an overall_limit below the total so we exercise that branch. + memories = await summary_views._fetch_long_term_memories_for_view( + view, + extra_group=None, + page_size=1000, + overall_limit=2100, + ) + + # We should have respected the overall_limit. + assert len(memories) == 2100 + + # And we should have made at least two paginated calls with advancing + # offsets. + assert calls[0] == (1000, 0) + assert calls[1] == (1000, 1000) + # The final page only needs 100 records to reach 2100. + assert calls[2] == (100, 2000)