@abrookins abrookins commented Dec 17, 2025

What are Summary Views?

Summary Views let you define named, reusable, continuously refreshed summaries over your long-term memory store. In effect, they're materialized summaries.

Instead of re-querying and re-summarizing raw memories every time, you:

  • Describe what to summarize (filters + time window)
  • Describe how to group those memories (e.g. per user_id)
  • Retrieve a pre-computed summary within the Summary View

The server stores the resulting summaries, so you can fetch them quickly after they're refreshed.


When should I use this?

Use Summary Views when you want:

  • A stable, periodically refreshed summary of:
    • Each user's interactions with an agent
    • All memories in a namespace
    • Or other simple groupings (e.g. combinations of user_id and memory_type)

Example use cases (all continually refreshed):

  • Store a user profile summarizing all preferences, behavior, and major life events of each user
  • Store the last 30 days of action items extracted from a team's interactions

How do I use it?

1. Create a Summary View

Call:

  • POST /v1/summary-views

Body (example):

{
  "name": "ltm_by_user_30d",
  "source": "long_term",
  "group_by": ["user_id"],
  "filters": { "memory_type": "semantic" },
  "time_window_days": 30,
  "continuous": false
}

Key ideas:

  • group_by says how to slice the data (here: one partition per user_id).
  • filters narrow the data (here: only memory_type = semantic).
  • time_window_days limits to a recent window (e.g. last 30 days).
  • continuous controls whether the server refreshes this view periodically in the background.

The response includes an id, which you’ll use for everything else.

NOTE: For now, summary views will be refreshed once per hour, and no control is exposed to change when or how often the refresh happens.


2. Refresh a single group

To refresh the summary for a single group that the Summary View represents, use:

  • POST /v1/summary-views/{view_id}/partitions/run

Body (example):

{ "group": { "user_id": "alice" } }

You get back a SummaryViewPartitionResult:

  • The group (e.g. { "user_id": "alice" }),
  • The summary text,
  • memory_count and computed_at.

This is useful if you need to generate a summary right now for one user.
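The request shape for this call can be sketched as a small helper. Only the endpoint path and body format come from the API above; the helper itself is illustrative:

```python
def partition_run_request(view_id: str, group: dict[str, str]) -> tuple[str, dict]:
    """Return the (path, JSON body) pair for a single-partition refresh."""
    return (f"/v1/summary-views/{view_id}/partitions/run", {"group": group})

path, body = partition_run_request("view-123", {"user_id": "alice"})
# POST `body` to `path`; the response is a SummaryViewPartitionResult with
# group, summary, memory_count, and computed_at fields.
```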


3. Run the full view (async)

If you want to refresh all groups (e.g. all users) in a view:

  • POST /v1/summary-views/{view_id}/run

Body can be {}; the server returns a Task:

  • id: the task ID to poll
  • status: pending → running → success (or failed)

Then:

  • GET /v1/tasks/{task_id} to check status.

When it’s success, the view’s partition summaries are updated.
This is what you’d use for batch refreshes (or let the periodic job run if continuous=true).
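A polling loop for the returned Task can be sketched as pure logic. The status strings follow the pending → running → success/failed lifecycle above; the helper and its parameters are illustrative:

```python
import time


def wait_for_task(fetch_status, interval: float = 1.0, max_polls: int = 60) -> str:
    """Poll `fetch_status` (e.g. a wrapper around GET /v1/tasks/{task_id})
    until the task reaches a terminal state, then return that state."""
    for _ in range(max_polls):
        status = fetch_status()
        if status in ("success", "failed"):
            return status
        time.sleep(interval)
    raise TimeoutError("task did not reach a terminal state in time")


# Example with a stubbed status sequence instead of a live server:
statuses = iter(["pending", "running", "success"])
final = wait_for_task(lambda: next(statuses), interval=0.0)
```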


4. Read the summaries

To read the stored summaries (without recomputing them):

  • GET /v1/summary-views/{view_id}/partitions

Optional query params to filter by group fields:

  • user_id
  • namespace
  • session_id
  • memory_type

Example:

  • GET /v1/summary-views/{view_id}/partitions?user_id=alice
    → returns any partition results for user_id=alice.

This is the endpoint you’d typically call from your app at request time: fast, read-only access to the latest materialized summaries.
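Building the read URL with optional group filters might look like this (an illustrative helper using the query parameters listed above):

```python
from urllib.parse import urlencode


def partitions_url(view_id: str, **group_filters: str) -> str:
    """URL path for reading stored partition summaries, optionally filtered
    by group fields such as user_id or namespace."""
    base = f"/v1/summary-views/{view_id}/partitions"
    qs = urlencode(group_filters)
    return f"{base}?{qs}" if qs else base


url = partitions_url("view-123", user_id="alice")
# url == "/v1/summary-views/view-123/partitions?user_id=alice"
```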


5. Managing views

Basic management:

  • GET /v1/summary-views – list all views.
  • GET /v1/summary-views/{view_id} – fetch one view’s definition.
  • DELETE /v1/summary-views/{view_id} – delete a view definition
    (existing summaries are not currently cleaned up automatically).

Mental model

You can think of a Summary View as:

A named “materialized summary table” over long-term memory, grouped by some key (like user), that you can:

  • define once,
  • periodically refresh,
  • and read from quickly.

You decide:

  • The slice of memory (filters + time window),
  • The partitioning (group_by),
  • And optionally the prompt/model.

The server handles:

  • Querying memories,
  • Summarizing per group,
  • Storing results,
  • And exposing simple HTTP endpoints to create, run, and read views.

Copilot AI review requested due to automatic review settings (December 17, 2025 01:36).
@abrookins changed the title from "Summary views" to "New Feature: Summary Views" (Dec 17, 2025).
Copilot AI (Contributor) left a comment:

Pull request overview

This PR introduces Summary Views, a new feature for creating named, reusable, continuous summaries over long-term memory. Summary Views allow users to define how to filter, group, and periodically refresh summaries of their memory data, providing pre-computed summaries that can be retrieved quickly without re-querying and re-summarizing raw memories each time.

Key Changes

  • Adds Task and SummaryView models with supporting request/response types
  • Implements Redis-based storage for summary view configurations and partition results
  • Provides API endpoints for CRUD operations, partition execution, and task status tracking
  • Integrates periodic background refresh for continuous summary views via Docket

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
agent_memory_server/models.py Adds Task, SummaryView, and related model definitions; removes unused MCP imports
agent_memory_server/tasks.py Implements task management (create, get, update status) with Redis storage
agent_memory_server/summary_views.py Core summary view logic: storage, retrieval, partitioning, summarization, and periodic refresh
agent_memory_server/api.py REST API endpoints for summary view CRUD, partition execution, and task status
agent_memory_server/docket_tasks.py Registers summary view tasks with Docket for background execution
tests/test_summary_views.py Basic integration tests covering view creation, partition execution, and task tracking


Comment on lines 310 to 315
memories_text = "\n".join(f"- {m.text}" for m in memories)
prompt = (
    f"{instructions}\n\n"
    f"GROUP: {json.dumps(group, sort_keys=True)}\n\n"
    f"MEMORIES:\n{memories_text}\n\nSUMMARY:"
)
Copilot AI commented (Dec 17, 2025):

All memories are joined without limit into a single prompt string. If a partition contains many memories (up to the 1000 limit from _fetch_long_term_memories_for_view), this could result in an extremely large prompt that exceeds LLM token limits. Consider truncating the memories list or implementing chunked summarization for large memory sets.


# We use the same interface pattern as other summarization helpers.
response = await client.create_chat_completion(model_name, prompt)
summary_text = response.choices[0].message.content
Copilot AI commented (Dec 17, 2025):

The code directly accesses response.choices[0].message.content without checking if choices is non-empty or if content is not None. While the LLM client wrappers generally ensure choices is populated, add defensive checks to handle edge cases where the response might be empty or malformed to prevent IndexError or AttributeError.

Comment on lines 52 to 63
def encode_partition_key(group: dict[str, str]) -> str:
    """Create a stable key representation from group_by values.

    Keys are sorted alphabetically so the same group always produces the
    same identifier.
    """
    parts: list[str] = []
    for key in sorted(group.keys()):
        value = group[key]
        parts.append(f"{key}={value}")
    return "|".join(parts)
Copilot AI commented (Dec 17, 2025):

The encode_partition_key function does not escape special characters in group values. If a group value contains the delimiter characters '=' or '|', it could cause ambiguous keys or collisions. Consider URL-encoding or escaping these special characters in the values to ensure uniqueness.
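One way to address this (a sketch of the reviewer's suggestion, not the PR's code) is to percent-encode the delimiter characters in both keys and values:

```python
from urllib.parse import quote


def encode_partition_key_escaped(group: dict[str, str]) -> str:
    """Stable partition key that percent-encodes '=' and '|' so values
    containing the delimiters cannot produce ambiguous or colliding keys."""
    return "|".join(
        f"{quote(key, safe='')}={quote(group[key], safe='')}"
        for key in sorted(group)
    )


# A value containing both delimiters stays unambiguous:
key = encode_partition_key_escaped({"user_id": "a|b=c"})
# key == "user_id=a%7Cb%3Dc"
```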

Copilot AI (Contributor) left a comment:

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 27 comments.



Comment on lines 1 to 100
import pytest

from agent_memory_server.models import TaskStatusEnum


@pytest.mark.asyncio
async def test_create_and_get_summary_view(client):
    # Create a summary view
    payload = {
        "name": "ltm_by_user_30d",
        "source": "long_term",
        "group_by": ["user_id"],
        "filters": {"memory_type": "semantic"},
        "time_window_days": 30,
        "continuous": False,
        "prompt": None,
        "model_name": None,
    }
    resp = await client.post("/v1/summary-views", json=payload)
    assert resp.status_code == 200, resp.text
    view = resp.json()
    view_id = view["id"]

    # Fetch it back
    resp_get = await client.get(f"/v1/summary-views/{view_id}")
    assert resp_get.status_code == 200
    fetched = resp_get.json()
    assert fetched["id"] == view_id
    assert fetched["group_by"] == ["user_id"]


@pytest.mark.asyncio
async def test_run_single_partition_and_list_partitions(client):
    # Create a simple view grouped by user_id
    payload = {
        "name": "ltm_by_user",
        "source": "long_term",
        "group_by": ["user_id"],
        "filters": {},
        "time_window_days": None,
        "continuous": False,
        "prompt": None,
        "model_name": None,
    }
    resp = await client.post("/v1/summary-views", json=payload)
    assert resp.status_code == 200, resp.text
    view_id = resp.json()["id"]

    # Run a single partition synchronously
    run_payload = {"group": {"user_id": "alice"}}
    resp_run = await client.post(
        f"/v1/summary-views/{view_id}/partitions/run", json=run_payload
    )
    assert resp_run.status_code == 200, resp_run.text
    result = resp_run.json()
    assert result["group"] == {"user_id": "alice"}
    assert "summary" in result

    # List materialized partitions
    resp_list = await client.get(
        f"/v1/summary-views/{view_id}/partitions", params={"user_id": "alice"}
    )
    assert resp_list.status_code == 200
    partitions = resp_list.json()
    assert len(partitions) == 1
    assert partitions[0]["group"]["user_id"] == "alice"


@pytest.mark.asyncio
async def test_run_full_view_creates_task_and_updates_status(client):
    # Create a summary view
    payload = {
        "name": "ltm_full_run",
        "source": "long_term",
        "group_by": ["user_id"],
        "filters": {},
        "time_window_days": None,
        "continuous": False,
        "prompt": None,
        "model_name": None,
    }
    resp = await client.post("/v1/summary-views", json=payload)
    assert resp.status_code == 200, resp.text
    view_id = resp.json()["id"]

    # Trigger a full run
    resp_run = await client.post(f"/v1/summary-views/{view_id}/run", json={})
    assert resp_run.status_code == 200, resp_run.text
    task = resp_run.json()
    task_id = task["id"]

    # Poll the task status via the API
    resp_task = await client.get(f"/v1/tasks/{task_id}")
    assert resp_task.status_code == 200
    polled = resp_task.json()
    assert polled["status"] in {
        TaskStatusEnum.PENDING,
        TaskStatusEnum.RUNNING,
        TaskStatusEnum.SUCCESS,
    }
Copilot AI commented (Dec 17, 2025):

No tests verify that invalid group_by or filter keys are rejected. The validation logic in _validate_summary_view_keys restricts keys to a specific set, but there are no tests verifying that invalid keys (e.g., "invalid_field") are properly rejected with appropriate error messages. Add tests for these validation scenarios.

Comment on lines 188 to 209
# Static filters from the view config
for key, value in view.filters.items():
    if key == "user_id":
        filters["user_id"] = UserId(eq=str(value))
    elif key == "namespace":
        filters["namespace"] = Namespace(eq=str(value))
    elif key == "session_id":
        filters["session_id"] = SessionId(eq=str(value))
    elif key == "memory_type":
        filters["memory_type"] = MemoryType(eq=str(value))

# Group-specific filters
if extra_group:
    for key, value in extra_group.items():
        if key == "user_id":
            filters["user_id"] = UserId(eq=value)
        elif key == "namespace":
            filters["namespace"] = Namespace(eq=value)
        elif key == "session_id":
            filters["session_id"] = SessionId(eq=value)
        elif key == "memory_type":
            filters["memory_type"] = MemoryType(eq=value)
Copilot AI commented (Dec 17, 2025):

The same filter key processing logic is duplicated for view.filters and extra_group. This code duplication makes it harder to maintain and increases the risk of bugs if the logic needs to change. Consider extracting this into a helper function that processes a single dict of key-value pairs into filter objects.

Comment on lines 459 to 467
except Exception as exc:  # noqa: BLE001
    logger.exception("Error refreshing SummaryView %s", view_id)
    if task_id is not None:
        await update_task_status(
            task_id,
            status=TaskStatusEnum.FAILED,
            completed_at=datetime.now(UTC),
            error_message=str(exc),
        )
Copilot AI commented (Dec 17, 2025):

Using a bare except Exception clause with noqa comment suppresses linting warnings without clear justification. While the exception is logged, this broad exception handling could hide unexpected errors that should be allowed to propagate. Consider catching more specific exceptions (e.g., Redis errors, model errors) or documenting why broad exception handling is necessary here.

Comment on lines 17 to 24
async def create_task(task: Task) -> None:
    """Persist a new Task as JSON in Redis.

    This overwrites any existing task with the same ID.
    """
    redis = await get_redis_conn()
    await redis.set(_task_key(task.id), task.model_dump_json())
Copilot AI commented (Dec 17, 2025):

Tasks are stored in Redis indefinitely with no TTL or cleanup mechanism. Over time, this could lead to unbounded growth of task records in Redis. Consider adding a TTL when creating tasks (e.g., expire after 7 days) or implementing a periodic cleanup job to remove old completed/failed tasks.
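A minimal sketch of the suggested fix is to pass an expiry when writing the task. The key prefix and the 7-day window here are assumptions, not values from the PR:

```python
# Assumed retention window for finished tasks.
TASK_TTL_SECONDS = 7 * 24 * 3600


def task_set_args(task_id: str, task_json: str) -> tuple[str, str, int]:
    """Arguments for redis.set(key, value, ex=ttl) when persisting a task,
    so task records expire instead of accumulating forever."""
    return (f"task:{task_id}", task_json, TASK_TTL_SECONDS)


key, value, ttl = task_set_args("abc123", "{}")
# Then: await redis.set(key, value, ex=ttl)
```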

Comment on lines 222 to 240
    limit: int = 1000,
) -> list[MemoryRecord]:
    """Fetch long-term memories matching a SummaryView and optional group.

    Uses the filter-only listing path of search_long_term_memories by
    providing an empty text query.
    """
    filters = _build_long_term_filters_for_view(view, extra_group)

    results = await long_term_memory.search_long_term_memories(
        text="",
        limit=limit,
        offset=0,
        **filters,
    )
    return list(results.memories)


Copilot AI commented (Dec 17, 2025):

The default limit of 1000 memories might be insufficient for comprehensive summaries. When a partition contains more than 1000 memories, only the first 1000 will be included, potentially skipping recent or important memories. This could lead to incomplete or stale summaries, especially for long-running views. Consider adding pagination logic to process all memories, or make the limit configurable via the SummaryView configuration.

Suggested change

Original:

    limit: int = 1000,
) -> list[MemoryRecord]:
    """Fetch long-term memories matching a SummaryView and optional group.

    Uses the filter-only listing path of search_long_term_memories by
    providing an empty text query.
    """
    filters = _build_long_term_filters_for_view(view, extra_group)
    results = await long_term_memory.search_long_term_memories(
        text="",
        limit=limit,
        offset=0,
        **filters,
    )
    return list(results.memories)

Replacement:

    overall_limit: int | None = None,
) -> list[MemoryRecord]:
    """Fetch long-term memories matching a SummaryView and optional group.

    Uses the filter-only listing path of search_long_term_memories by
    providing an empty text query.

    This function paginates through all matching memories to avoid silently
    truncating results when more than a single page exists. If overall_limit
    is provided, it serves as an upper bound on the total number of memories
    returned; otherwise, all available memories are fetched.
    """
    filters = _build_long_term_filters_for_view(view, extra_group)
    page_size = 1000
    memories: list[MemoryRecord] = []
    offset = 0
    while True:
        if overall_limit is not None:
            remaining = overall_limit - len(memories)
            if remaining <= 0:
                break
            current_limit = remaining if remaining < page_size else page_size
        else:
            current_limit = page_size
        results = await long_term_memory.search_long_term_memories(
            text="",
            limit=current_limit,
            offset=offset,
            **filters,
        )
        batch = list(results.memories)
        if not batch:
            break
        memories.extend(batch)
        # If fewer results than requested were returned, we've reached the end.
        if len(batch) < current_limit:
            break
        offset += len(batch)
    if overall_limit is not None:
        return memories[:overall_limit]
    return memories

Comment on lines 314 to 329
# Avoid constructing an excessively large prompt when many memories
# are present in a partition. We cap the memories used for the prompt
# but still report the full memory_count below.
memories_for_prompt = memories[:_MAX_MEMORIES_FOR_LLM_PROMPT]
memories_text = "\n".join(f"- {m.text}" for m in memories_for_prompt)
if len(memories) > _MAX_MEMORIES_FOR_LLM_PROMPT:
    memories_text += (
        f"\n\n[Truncated to first {_MAX_MEMORIES_FOR_LLM_PROMPT} "
        f"memories out of {len(memories)}]"
    )

prompt = (
    f"{instructions}\n\n"
    f"GROUP: {json.dumps(group, sort_keys=True)}\n\n"
    f"MEMORIES:\n{memories_text}\n\nSUMMARY:"
)
Copilot AI commented (Dec 17, 2025):

The prompt construction doesn't account for total token limits. While memories are capped at 200 items, each memory's text field could be arbitrarily long. If many memories have very long text content, the prompt could exceed model context limits, causing the API call to fail. Consider adding token counting and truncating individual memory texts or the overall memories_text string to stay within safe limits.
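A rough character-budget guard along these lines could truncate the joined memory text. This is a sketch only: a real fix would count model tokens, and the budget constant is an assumption:

```python
MAX_PROMPT_CHARS = 32_000  # rough stand-in for a real token budget


def join_memories_within_budget(
    texts: list[str], budget: int = MAX_PROMPT_CHARS
) -> str:
    """Join '- text' lines until the character budget is hit, then append a
    marker so the truncation is visible in the prompt."""
    lines: list[str] = []
    used = 0
    for text in texts:
        line = f"- {text}"
        if used + len(line) + 1 > budget:
            lines.append("[... remaining memories truncated ...]")
            break
        lines.append(line)
        used += len(line) + 1
    return "\n".join(lines)


short = join_memories_within_budget(["likes jazz", "lives in Oslo"])
clipped = join_memories_within_budget(["a" * 10, "b" * 10], budget=13)
# `clipped` keeps the first line and appends the truncation marker.
```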

Comment on lines 1 to 8
"""Helpers for SummaryView configs, stored results, and execution stubs.
This module currently focuses on Redis JSON storage and key conventions.
Execution logic for summarizing memories will be expanded in follow-up
changes; for now, we provide minimal placeholder behavior so the API
surface is wired end-to-end.
"""

Copilot AI commented (Dec 17, 2025):

The module docstring states "Execution logic for summarizing memories will be expanded in follow-up changes; for now, we provide minimal placeholder behavior" but the implementation actually includes full LLM-based summarization logic. This docstring is outdated and misleading. Update it to accurately reflect that the module contains complete summarization logic for long-term memory sources.

Suggested change

Original:

"""Helpers for SummaryView configs, stored results, and execution stubs.

This module currently focuses on Redis JSON storage and key conventions.
Execution logic for summarizing memories will be expanded in follow-up
changes; for now, we provide minimal placeholder behavior so the API
surface is wired end-to-end.
"""

Replacement:

"""Helpers for SummaryView configs, stored results, and summarization logic.

This module implements the execution logic for summarizing long-term memory
sources using LLMs, including Redis JSON storage, key conventions, and
partitioned summary management so the API surface is wired end-to-end.
"""

try:
    if view.source == "long_term":
        # Fetch all relevant memories and partition them.
        memories = await _fetch_long_term_memories_for_view(view)
Copilot AI commented (Dec 17, 2025):

The limit is hardcoded to 1000 but may be insufficient for comprehensive view refresh. When fetching all memories for view refresh, more than 1000 memories might exist across all partitions. This silently truncates results without logging or warning, potentially causing incomplete partitions to be computed. Consider adding pagination to fetch all memories, or at least log a warning when hitting the limit.

Suggested change

Original:

        memories = await _fetch_long_term_memories_for_view(view)

Replacement:

        memories = await _fetch_long_term_memories_for_view(view)
        # If the underlying fetch uses a fixed limit (e.g., 1000), we may not
        # have retrieved all matching memories. Log a warning so potential
        # truncation does not go unnoticed.
        if len(memories) >= 1000:
            logger.warning(
                "refresh_summary_view: fetched %d memories for view %s; "
                "results may be truncated due to an underlying limit.",
                len(memories),
                view.id,
            )

Comment on lines 92 to 100
    # Poll the task status via the API
    resp_task = await client.get(f"/v1/tasks/{task_id}")
    assert resp_task.status_code == 200
    polled = resp_task.json()
    assert polled["status"] in {
        TaskStatusEnum.PENDING,
        TaskStatusEnum.RUNNING,
        TaskStatusEnum.SUCCESS,
    }
Copilot AI commented (Dec 17, 2025):

The test only validates that the status is one of pending, running, or success, but doesn't verify the task actually completes or check the final state. This means the test could pass even if the background task never actually runs or fails silently. Consider adding a wait/poll loop to ensure the task reaches a terminal state (SUCCESS or FAILED), or at minimum, add a comment explaining why we only check the initial status.

Comment on lines 1 to 100
(Same excerpt of tests/test_summary_views.py, lines 1 to 100, as quoted earlier in this review.)
Copilot AI commented (Dec 17, 2025):

No tests verify the delete_summary_view endpoint works correctly. This is a critical management operation that should be tested to ensure it properly removes the view configuration and updates the index. Add a test that creates a view, deletes it, and verifies it no longer appears in the list or can be retrieved.
