Skip to content

Conversation

@kawayiYokami
Copy link
Contributor

@kawayiYokami kawayiYokami commented Dec 24, 2025

关联
#4148

Modifications / 改动点

本 PR 包含两部分改动:

1. 原始 Commit (b6f1c5c) - Agent 上下文压缩系统

核心文件:

  • 新增 astrbot/core/context_manager/ 模块(完整的上下文管理系统)
    • context_manager.py - 统一上下文压缩管理器
    • context_compressor.py - LLM 智能摘要压缩器
    • context_truncator.py - 对半砍截断器
    • token_counter.py - Token 粗算计数器
    • models.py - 数据模型定义
    • summary_prompt.md - LLM 摘要提示词模板
  • 新增 astrbot/core/agent/tools/todolist_tool.py - TodoList 内部工具(任务管理)
  • 修改 astrbot/core/agent/runners/tool_loop_agent_runner.py - TodoList 注入和资源限制提示
  • 修改 astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py - 集成上下文管理器到流水线
  • 修改 astrbot/core/config/default.py - 添加上下文管理相关配置项
  • 修改 astrbot/core/provider/manager.py - Provider 管理器增强
  • 修改 astrbot/core/astr_agent_context.py - 添加 todolist 字段

实现功能:

  • 智能上下文压缩:当 Token 使用率超过 82% 时自动触发
  • LLM 智能摘要:调用 LLM 对旧对话生成摘要,保留最新消息
  • 对半砍策略:当摘要后仍超限时,删除中间 50% 消息
  • 消息合并:合并连续的 user/assistant 消息
  • Tool Calls 清理:删除不成对的工具调用记录
  • 按数量截断:控制最终保留的消息数量
  • TodoList 工具:Agent 可自主管理任务计划
  • 资源限制提示:在工具调用时显示剩余次数

2. 本次修复 - 启用 LLM 智能摘要功能

核心文件:

  • 修改 astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py
    • _process_with_context_manager() 方法添加 provider 参数(第 366-394 行)
    • 调用时传入 provider 实例(第 481-486 行)

实现功能:

  • 修复了 LLM 智能摘要功能无法生效的问题
  • 使 ContextManager 能够调用 LLMSummaryCompressor 进行智能压缩

测试文件:

  • 新增 tests/core/test_context_manager_integration.py - 10 个集成测试用例

  • 修改 tests/core/test_context_manager.py - 适配新的摘要提示词

  • 新增 tests/core/TEST_SUMMARY.md - 完整测试报告

  • 这不是一个破坏性变更。

Screenshots or Test Results / 运行截图或测试结果

测试结果:

============================== test session starts ===============================
collected 39 items

tests/core/test_context_manager.py::TestTokenCounter ..................... [29个]
tests/core/test_context_manager_integration.py::TestContextManager ....... [10个]

========================= 39 passed, 1 warning in 4.15s ==========================

测试覆盖:

  • ✅ Token 计数器测试(10 个)
  • ✅ 上下文截断器测试(6 个)
  • ✅ LLM 摘要压缩器测试(4 个)
  • ✅ 上下文管理器单元测试(9 个)
  • ✅ 完整流水线集成测试(10 个)

关键验证点:

  • ✅ LLM 智能摘要功能正常工作(超过 82% 阈值时触发)
  • ✅ Provider 参数正确传递到 ContextManager
  • ✅ 压缩后的上下文正确保存到数据库
  • ✅ 多阶段处理流程(Token统计 → 压缩 → 合并 → 截断)协同工作
  • ✅ 错误处理和回退机制正常

Checklist / 检查清单

  • 我的更改经过了良好的测试,并已在上方提供了"验证步骤"和"运行截图"
  • 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到了 requirements.txtpyproject.toml 文件相应位置。
  • 我的更改没有引入恶意代码。

Summary by Sourcery

引入一个 V2 上下文管理系统,为智能体对话提供基于 LLM 的自动摘要;将内部 TodoList 工具和智能消息注入集成进智能体循环中;并在后端和控制台中贯通提供商的上下文窗口配置。

New Features:

  • 新增可插拔的上下文管理器模块,可根据模型上下文限制,利用 token 估算、LLM 摘要、对话内容折半、消息合并以及工具调用清理等策略,自动压缩对话历史。
  • 引入内部 TodoList 工具和智能体上下文支持,使智能体可以管理并查看任务计划,包括将 TodoList 和资源限制信息自动注入到用户消息中。
  • 自动从 LLM 元数据中推断并存储各提供商模型的 max_context_length,同时写入后端配置和控制台中的提供商创建界面。

Enhancements:

  • 更新智能体工具循环执行器以跟踪步骤计数,当达到工具调用上限时,使用智能消息合并/追加策略向用户注入提示信息。
  • 将新的上下文管理器集成到智能体处理流水线中,用可配置、感知 token 的处理替代简单截断,并确保内部工具始终可用。
  • 添加迁移辅助工具,为现有提供商回填 max_context_length 字段,并在源配置中持久化该字段。

Tests:

  • 为上下文管理器的 token 计数、截断、LLM 摘要,以及端到端流水线行为添加全面的单元和集成测试。
  • 添加覆盖 TodoList 注入行为以及智能体执行器中智能用户消息注入的测试。
Original summary in English

Summary by Sourcery

Introduce a V2 context management system with automatic LLM-based summarization for agent conversations, integrate internal TodoList tools and smart message injections into the agent loop, and wire provider context window configuration throughout the backend and dashboard.

New Features:

  • Add a pluggable context manager module that auto-compresses conversation history using token estimation, LLM summarization, halving, message merging, and tool-call cleanup based on model context limits.
  • Introduce internal TodoList tools and agent context support so agents can manage and see task plans, including automatic injection of TodoList and resource limit information into user messages.
  • Automatically infer and store model max_context_length from LLM metadata for providers in both backend configuration and dashboard provider creation UI.

Enhancements:

  • Update the agent tool loop runner to track step counts and inject user-facing guidance when tool call limits are reached using a smart message merge/append strategy.
  • Integrate the new context manager into the agent processing pipeline, replacing simple truncation with configurable token-aware processing and ensuring internal tools are always available.
  • Add migration helpers to backfill max_context_length on existing providers and persist the field in source configs.

Tests:

  • Add comprehensive unit and integration tests for the context manager’s token counting, truncation, LLM summarization, and end-to-end pipeline behavior.
  • Add tests covering TodoList injection behavior and smart user-message injection in the agent runner.

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了 6 个问题,并给出了一些整体层面的反馈:

  • ContextManager 会在第一条消息上打上 _needs_compression_initial_token_count 等内部标记,但之后从不清理;建议把这类“账本/元数据”从用户可见的消息负载中剥离出去(例如单独跟踪这些状态,或者在返回前移除这些 key),以避免向下游组件泄露实现细节。
  • model_context_limit-1 时,ContextManager.process 会直接短路返回,不仅跳过了基于 token 的压缩,也跳过了诸如连续消息合并、移除未配对 tool-call 等有用的清理步骤;建议拆分这些阶段,即便禁用了基于 token 的压缩,清理和截断流程仍然可以继续执行。
  • 自动从 LLM_METADATAS 填充 max_context_length 的逻辑在 update_providercreate_provider 中都各写了一遍;建议提取成一个小的辅助函数以减少重复,并降低未来修改这块行为时出错的概率。
给 AI Agent 的提示词
Please address the comments from this code review:

## Overall Comments
- ContextManager marks messages with internal flags like `_needs_compression` and `_initial_token_count` on the first message and never strips them out; consider keeping such bookkeeping out of the user-visible payload (e.g., track this state separately or remove the keys before returning) to avoid leaking implementation details to downstream components.
- When `model_context_limit` is `-1`, `ContextManager.process` short-circuits and skips not only token-based compression but also useful cleanups like consecutive-message merging and unpaired tool-call removal; consider splitting these phases so that cleanup and truncation can still run even when token-based compression is disabled.
- The logic to auto-fill `max_context_length` from `LLM_METADATAS` is duplicated in both `update_provider` and `create_provider`; extracting this into a small helper function would reduce repetition and make future changes to that behavior less error-prone.

## Individual Comments

### Comment 1
<location> `astrbot/core/context_manager/context_truncator.py:51-60` </location>
<code_context>
+
+        return result
+
+    def truncate_by_count(
+        self, messages: list[dict[str, Any]], max_messages: int
+    ) -> list[dict[str, Any]]:
+        """
+        按数量截断:只保留最近的X条消息
+
+        规则:
+        - 保留系统消息(如果存在)
+        - 保留最近的max_messages条消息
+
+        Args:
+            messages: 原始消息列表
+            max_messages: 最大保留消息数
+
+        Returns:
+            截断后的消息列表
+        """
+        if len(messages) <= max_messages:
+            return messages
+
+        # 分离系统消息和其他消息
+        system_msgs = [m for m in messages if m.get("role") == "system"]
+        other_msgs = [m for m in messages if m.get("role") != "system"]
+
+        # 保留最近的消息
+        kept_other = other_msgs[-(max_messages - len(system_msgs)) :]
+
+        return system_msgs + kept_other
</code_context>

<issue_to_address>
**issue (bug_risk):** truncate_by_count 在系统消息较多时可能返回超过 max_messages 的结果。

当 `len(system_msgs) >= max_messages` 时,`max_messages - len(system_msgs)` 为 0 或负数,因此 `other_msgs[-(max_messages - len(system_msgs)):]` 仍然会返回元素,最终列表长度可能超过 `max_messages`。为了遵守 `max_messages` 的约定,建议在这种情况下直接短路返回(例如 `return system_msgs[-max_messages:]`),并且仅在 `len(system_msgs) < max_messages` 时才计算 `kept_other`。
</issue_to_address>

### Comment 2
<location> `tests/agent/runners/test_todolist_injection.py:94-95` </location>
<code_context>
+        )
+
+
+class TestTodoListInjection:
+    """测试 TodoList 注入逻辑"""
+
+    def setup_method(self):
</code_context>

<issue_to_address>
**suggestion (testing):** 建议为 `TodoListAddTool``TodoListUpdateTool` 本身补充单元测试。

当前测试只覆盖了 todo 列表是如何被注入到消息流中的逻辑,并未覆盖这些工具是如何修改 `AstrAgentContext.todolist` 的行为。请补充测试以:
- 断言在列表为空和非空时的 ID 分配行为;
- 覆盖错误路径(新增时缺失/空的 `tasks`;更新时传入不存在的 `task_id`);
- 校验状态字段处理以及可选 description 更新。
这些测试可以放在类似 `tests/agent/tools/test_todolist_tool.py` 的独立模块中,用来防止工具本身的回归影响整个 todo 流程。

Suggested implementation:

```python
import pytest

from tests.agent.runners.test_todolist_injection import MockAstrAgentContext

# NOTE: Adjust the import path below to match the actual location of the tools.
from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool  # type: ignore[import]


class TestTodoListAddTool:
    """TodoListAddTool 行为测试"""

    def setup_method(self):
        self.context = MockAstrAgentContext()
        # 确保每个测试都从干净的 todolist 开始
        self.context.todolist = []
        self.tool = TodoListAddTool()

    def _run_add_tool(self, **params):
        """
        封装工具调用,便于后续统一修改调用方式。

        根据实际实现,可能需要改成:
        - self.tool(self.context, **params)
        - self.tool.run(self.context, **params)
        - self.tool.invoke(params, self.context)
        等。
        """
        return self.tool.run(self.context, **params)

    def test_assign_ids_when_list_empty(self):
        """当 todolist 为空时,新增任务应从 ID=1 开始递增"""
        result = self._run_add_tool(
            tasks=[
                {"description": "task 1", "status": "todo"},
                {"description": "task 2", "status": "doing"},
            ]
        )

        # 校验上下文内的 todolist
        assert len(self.context.todolist) == 2
        assert self.context.todolist[0]["id"] == 1
        assert self.context.todolist[0]["description"] == "task 1"
        assert self.context.todolist[0]["status"] == "todo"

        assert self.context.todolist[1]["id"] == 2
        assert self.context.todolist[1]["description"] == "task 2"
        assert self.context.todolist[1]["status"] == "doing"

        # 如果工具有返回值,也一并校验
        assert result is None or isinstance(result, dict)

    def test_assign_ids_when_list_non_empty(self):
        """当 todolist 非空时,应在现有最大 ID 基础上递增"""
        # 预置已有任务
        self.context.todolist = [
            {"id": 1, "description": "existing 1", "status": "todo"},
            {"id": 3, "description": "existing 3", "status": "done"},
        ]

        self._run_add_tool(
            tasks=[
                {"description": "new 1", "status": "todo"},
                {"description": "new 2", "status": "todo"},
            ]
        )

        # 最大 ID 为 3,新任务应为 4,5
        assert len(self.context.todolist) == 4
        new_tasks = self.context.todolist[-2:]
        assert new_tasks[0]["id"] == 4
        assert new_tasks[0]["description"] == "new 1"
        assert new_tasks[0]["status"] == "todo"

        assert new_tasks[1]["id"] == 5
        assert new_tasks[1]["description"] == "new 2"
        assert new_tasks[1]["status"] == "todo"

    def test_add_tool_error_when_tasks_missing(self):
        """缺少 tasks 参数时应走错误分支"""
        with pytest.raises((ValueError, KeyError, TypeError)):
            # 不传 tasks
            self._run_add_tool()

    def test_add_tool_error_when_tasks_empty(self):
        """tasks 为空列表时应走错误分支"""
        with pytest.raises(ValueError):
            self._run_add_tool(tasks=[])


class TestTodoListUpdateTool:
    """TodoListUpdateTool 行为测试"""

    def setup_method(self):
        self.context = MockAstrAgentContext()
        self.context.todolist = [
            {"id": 1, "description": "task 1", "status": "todo"},
            {"id": 2, "description": "task 2", "status": "doing"},
        ]
        self.tool = TodoListUpdateTool()

    def _run_update_tool(self, **params):
        """
        封装工具调用,便于后续统一修改调用方式。

        根据实际实现,可能需要改成:
        - self.tool(self.context, **params)
        - self.tool.run(self.context, **params)
        - self.tool.invoke(params, self.context)
        等。
        """
        return self.tool.run(self.context, **params)

    def test_update_status_and_description(self):
        """可以同时更新任务状态和描述"""
        result = self._run_update_tool(
            task_id=1,
            status="done",
            description="task 1 updated",
        )

        task = next(t for t in self.context.todolist if t["id"] == 1)
        assert task["status"] == "done"
        assert task["description"] == "task 1 updated"
        assert result is None or isinstance(result, dict)

    def test_update_only_status_keeps_description(self):
        """仅更新状态时应保留原 description 不变"""
        original_desc = self.context.todolist[1]["description"]

        self._run_update_tool(
            task_id=2,
            status="done",
        )

        task = next(t for t in self.context.todolist if t["id"] == 2)
        assert task["status"] == "done"
        # 未传 description,应保持原值
        assert task["description"] == original_desc

    def test_update_only_description_keeps_status(self):
        """仅更新描述时应保留原 status 不变"""
        original_status = self.context.todolist[0]["status"]

        self._run_update_tool(
            task_id=1,
            description="only description updated",
        )

        task = next(t for t in self.context.todolist if t["id"] == 1)
        assert task["description"] == "only description updated"
        assert task["status"] == original_status

    def test_update_non_existent_task_id_raises(self):
        """更新不存在的 task_id 时应走错误分支"""
        with pytest.raises((ValueError, LookupError, KeyError)):
            self._run_update_tool(
                task_id=999,
                status="done",
            )

    @pytest.mark.parametrize("status", ["todo", "doing", "done"])
    def test_accepts_valid_status_values(self, status):
        """验证常见状态值可以被接受并写入"""
        self._run_update_tool(
            task_id=1,
            status=status,
        )
        task = next(t for t in self.context.todolist if t["id"] == 1)
        assert task["status"] == status

```

1.`from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool` 的导入路径改成项目实际路径(例如 `astrbot.core.agent.tools.todolist` 等)。
2. 根据工具的真实接口,调整 `_run_add_tool``_run_update_tool` 中对工具的调用方式(目前假定为 `tool.run(context, **params)`)。如果工具是可调用对象或有 `invoke/__call__` 等,请相应替换。
3. 如果 `MockAstrAgentContext` 的定义不在 `tests.agent.runners.test_todolist_injection` 中,请将导入改为实际位置,或改用正式的 `AstrAgentContext` 构造测试用例。
4. 如工具在错误路径下不是通过抛异常,而是返回错误对象/状态码,请将 `pytest.raises(...)` 断言替换成对返回值的断言,以匹配实际实现。
</issue_to_address>

### Comment 3
<location> `tests/core/test_context_manager.py:193-202` </location>
<code_context>
+    def test_truncate_by_count_exceeds_limit(self):
</code_context>

<issue_to_address>
**issue (testing):**`truncate_by_count` 补充 `max_messages` 小于或等于系统消息数量时的测试用例。

当前测试没有覆盖 `max_messages` 小于或等于 `system` 消息数量这一情况,在这种情况下 `max_messages - len(system_msgs)` 可能为 0 或负数,切片逻辑很容易写错。请添加一个测试:构造多个 `system` 消息,并分别将 `max_messages` 设为 0、1 以及恰好等于系统消息数量,以验证该行为。
</issue_to_address>

### Comment 4
<location> `tests/core/test_context_manager_integration.py:303-312` </location>
<code_context>
+        """测试:LLM API调用失败时的错误处理"""
+        provider = MockProvider()
+
+        from astrbot.core.context_manager.context_compressor import LLMSummaryCompressor
+
+        compressor = LLMSummaryCompressor(provider, keep_recent=2)
+
+        messages = [{"role": "user", "content": f"Message {i}"} for i in range(10)]
+
+        with patch.object(
+            provider, "text_chat", side_effect=Exception("API Error")
+        ) as mock_call:
+            result = await compressor.compress(messages)
+
+            # API失败时应该返回原始消息
+            assert result == messages
+            mock_call.assert_called_once()
+
+
+if __name__ == "__main__":
+    pytest.main([__file__, "-v", "-s"])
</code_context>

<issue_to_address>
**nitpick (testing):** LLM 摘要相关测试与 `summary_prompt.md` 中的具体文案绑定得过于紧密,建议放宽断言条件。

目前有若干测试直接断言 instruction 和 summary 中的具体中文前缀(例如 `"请基于我们完整的对话记录"``"历史会话摘要"`),这会导致任何对 `summary_prompt.md` 文案的修改都容易破坏测试。更稳健的做法是继续断言结构行为(例如额外的用户指令消息和系统摘要消息存在),但放宽对内容的检查(例如只要求非空、有调用并插入 summary 工具的结果),或者把对精确字符串的检查集中封装在一个辅助函数中,这样调整 prompt 文案时,不必修改大量测试用例。
</issue_to_address>

### Comment 5
<location> `showcase_features.py:193` </location>
<code_context>
+# ============ ContextManager 模拟实现 ============
+
+
+class MockContextManager:
+    """模拟 ContextManager 的核心逻辑"""
+
</code_context>

<issue_to_address>
**issue (complexity):** 建议在这个演示脚本中,用对真实生产组件的轻量封装来替代重新实现的上下文管理、todo 列表和最大步数逻辑,以避免重复业务规则。

主要的复杂度问题在于,该脚本重新实现了核心逻辑(上下文管理、token 阈值、压缩、todo 列表注入、最大步数注入),而不是复用已有的、已经过测试的组件。这会制造一套平行的心智模型,并且后续很容易产生偏差。

你可以在保留全部演示行为的前提下,通过以下方式简化:

1. **使用真实的 ContextManager 替代 `MockContextManager`,当前文件只做简单封装**  
   保留本文件中的打印/日志逻辑,但在 token 计数和压缩决策上复用真正的实现。

   ```python
   # from astrbot.core.context_manager import ContextManager  # adjust import path

   class DemoContextManager:
       """Thin wrapper that only adds logging around the real ContextManager."""

       def __init__(self, model_context_limit: int, is_agent_mode: bool = False, provider=None):
           self.cm = ContextManager(
               model_context_limit=model_context_limit,
               is_agent_mode=is_agent_mode,
               provider=provider,
           )

       async def process_context(self, messages: list[dict]) -> list[dict]:
           total_tokens = self.cm.count_tokens(messages)
           usage_rate = total_tokens / self.cm.model_context_limit
           print(f"  初始Token数: {total_tokens}")
           print(f"  上下文限制: {self.cm.model_context_limit}")
           print(f"  使用率: {usage_rate:.2%}")
           print(f"  触发阈值: {self.cm.threshold:.0%}")

           # delegate real logic
           result = await self.cm.process_context(messages)

           print("\n【输出】压缩后的消息历史:")
           print_messages(result, indent=1)
           return result
   ```

   然后在 `demo_context_manager()` 中:

   ```python
   async def demo_context_manager():
       print_separator("DEMO 1: ContextManager Workflow")

       print_subsection("Agent模式(触发摘要压缩)")
       print("【输入】完整消息历史:")
       print_messages(LONG_MESSAGE_HISTORY, indent=1)

       print("\n【处理】执行 ContextManager.process_context (AGENT 模式):")
       mock_provider = MagicMock()
       cm_agent = DemoContextManager(
           model_context_limit=150, is_agent_mode=True, provider=mock_provider
       )
       result_agent = await cm_agent.process_context(LONG_MESSAGE_HISTORY)
   ```

   这样既保留了相同的打印说明,又去掉了重复的 token 计数、阈值和压缩策略逻辑。

2. **复用真实的 todo 列表注入/格式化逻辑**  
   不要再维护一套 `format_todolist` + `inject_todolist_to_messages` 的实现,而是调用核心 helper,只在外面包一层用于打印的封装。

   ```python
   # from astrbot.core.todo import inject_todolist, format_todolist  # adjust to real modules

   def demo_inject_todolist_to_messages(
       messages: list[dict],
       todolist: list[dict],
       max_tool_calls: int | None = None,
       current_tool_calls: int | None = None,
   ) -> list[dict]:
       """Thin demo wrapper adding logs around real injection."""
       print("\n【输入】TodoList:")
       for task in todolist:
           status_icon = {"pending": "[ ]", "in_progress": "[-]", "completed": "[x]"}.get(
               task["status"], "[ ]"
           )
           print(f"  {status_icon} #{task['id']}: {task['description']}")

       result = inject_todolist(
           messages=messages,
           todolist=todolist,
           max_tool_calls=max_tool_calls,
           current_tool_calls=current_tool_calls,
       )

       print("\n【输出】注入后的消息历史:")
       print_messages(result, indent=1)
       return result
   ```

   然后在 `demo_todolist_injection()` 中,把对 `inject_todolist_to_messages` 的调用替换为 `demo_inject_todolist_to_messages`,从而展示真实注入行为。

3. **集中处理最大步数的智能注入逻辑**  
   `demo_max_step_smart_injection` 中的逻辑本质上就是手写的 `_smart_inject_user_message`/“最大步数注入”行为。如果生产代码中已经存在类似逻辑,建议直接封装调用,而不是在此重复规则。

   ```python
   # from astrbot.core.agent_runner import smart_inject_user_message  # adjust import

   def demo_max_step_injection(messages: list[dict], max_step_message: str) -> list[dict]:
       """Use real smart-injection and only add console explanation."""
       print("\n【处理】模拟工具耗尽,执行智能注入逻辑...")
       result = smart_inject_user_message(messages, max_step_message)
       print("\n【输出】智能注入后的消息历史:")
       print_messages(result, indent=1)
       return result
   ```

   然后在 `demo_max_step_smart_injection()` 中:

   ```python
   result_a = demo_max_step_injection(messages_with_user, max_step_message)
   ...
   result_b = demo_max_step_injection(messages_with_tool, max_step_message)
   ```

4. **仅在本文件中保留演示相关的逻辑**  
   各种打印 helper(`print_separator``print_subsection``print_messages`)以及示例数据都可以保留在这里;关键的简化点是:**不要在这里重新表达业务规则**(token 统计、压缩、注入),而是用增加日志的方式去包装真实组件。

这些改动可以保持所有演示行为(以及可见输出)不变,同时消除平行实现,使脚本更易维护,并在核心逻辑未来变更时自动保持一致。
</issue_to_address>

### Comment 6
<location> `astrbot/core/context_manager/context_manager.py:63` </location>
<code_context>
+        if self.model_context_limit == -1:
+            return messages
+
+        # 阶段1:Token初始统计
+        messages = await self._initial_token_check(messages)
+
</code_context>

<issue_to_address>
**issue (complexity):** 建议通过在本地计算 token 使用情况并内联一些简单阶段,来简化主处理流程,避免在消息上存储内部标记并通过多个小包装方法“跳转”。

你可以保持整体流程不变,但避免通过修改 `messages[0]` 以隐藏 flag 的方式来存状态,并减少主路径上的间接调用。

### 1. 避免在消息上存 `_needs_compression``_initial_token_count`

相比把内部状态编码到第一条消息上,更推荐直接计算 token 使用情况,并通过局部变量传递。这样可以保持外部数据结构的干净,也能让 `_run_compression` 更容易理解。

重构示例(不改变行为):

```python
async def process(
    self, messages: list[dict[str, Any]], max_messages_to_keep: int = 20
) -> list[dict[str, Any]]:
    if self.model_context_limit == -1 or not messages:
        return messages

    total_tokens = self.token_counter.count_tokens(messages)
    usage_rate = total_tokens / self.model_context_limit
    needs_compression = usage_rate > self.threshold

    if needs_compression:
        messages = await self._run_compression(messages)

    messages = await self._run_final_processing(messages, max_messages_to_keep)
    return messages
```

然后可以简化 `_run_compression`,使其不再依赖隐藏标记:

```python
async def _run_compression(
    self, messages: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    # Agent模式:先摘要,再判断
    messages = await self._compress_by_summarization(messages)

    tokens_after_summary = self.token_counter.count_tokens(messages)
    if tokens_after_summary / self.model_context_limit > self.threshold:
        messages = self._compress_by_halving(messages)

    return messages
```

这样就可以完全去掉 `_initial_token_check`,因为它的逻辑已被内联到 `process` 中并通过局部变量保存。

### 2. 内联一些简单阶段包装,减少跳转

如果你决定保留 `_run_final_processing`,也可以视情况合并一些非常简单的阶段包装函数,使主流程在一个函数中就能一眼看懂。

例如,可以把 `_run_final_processing` 逻辑内联到 `process`(保留现有的复杂 helper 方法):

```python
async def process(
    self, messages: list[dict[str, Any]], max_messages_to_keep: int = 20
) -> list[dict[str, Any]]:
    if self.model_context_limit == -1 or not messages:
        return messages

    total_tokens = self.token_counter.count_tokens(messages)
    usage_rate = total_tokens / self.model_context_limit
    needs_compression = usage_rate > self.threshold

    if needs_compression:
        messages = await self._run_compression(messages)

    # final processing steps are linear and small, so keep them visible here
    messages = self._merge_consecutive_messages(messages)
    messages = self._cleanup_unpaired_tool_calls(messages)
    messages = self.truncator.truncate_by_count(messages, max_messages_to_keep)

    return messages
```

这样可以把“token 检查 → 视情况压缩 → 末尾处理”这一概念流程集中在一个函数中,同时仍然复用现有的复杂 helper(`_merge_consecutive_messages``_cleanup_unpaired_tool_calls`)。
</issue_to_address>

Sourcery 对开源免费——如果你觉得我们的代码评审有帮助,欢迎分享 ✨
帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进后续评审。
Original comment in English

Hey - I've found 6 issues, and left some high level feedback:

  • ContextManager marks messages with internal flags like _needs_compression and _initial_token_count on the first message and never strips them out; consider keeping such bookkeeping out of the user-visible payload (e.g., track this state separately or remove the keys before returning) to avoid leaking implementation details to downstream components.
  • When model_context_limit is -1, ContextManager.process short-circuits and skips not only token-based compression but also useful cleanups like consecutive-message merging and unpaired tool-call removal; consider splitting these phases so that cleanup and truncation can still run even when token-based compression is disabled.
  • The logic to auto-fill max_context_length from LLM_METADATAS is duplicated in both update_provider and create_provider; extracting this into a small helper function would reduce repetition and make future changes to that behavior less error-prone.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- ContextManager marks messages with internal flags like `_needs_compression` and `_initial_token_count` on the first message and never strips them out; consider keeping such bookkeeping out of the user-visible payload (e.g., track this state separately or remove the keys before returning) to avoid leaking implementation details to downstream components.
- When `model_context_limit` is `-1`, `ContextManager.process` short-circuits and skips not only token-based compression but also useful cleanups like consecutive-message merging and unpaired tool-call removal; consider splitting these phases so that cleanup and truncation can still run even when token-based compression is disabled.
- The logic to auto-fill `max_context_length` from `LLM_METADATAS` is duplicated in both `update_provider` and `create_provider`; extracting this into a small helper function would reduce repetition and make future changes to that behavior less error-prone.

## Individual Comments

### Comment 1
<location> `astrbot/core/context_manager/context_truncator.py:51-60` </location>
<code_context>
+
+        return result
+
+    def truncate_by_count(
+        self, messages: list[dict[str, Any]], max_messages: int
+    ) -> list[dict[str, Any]]:
+        """
+        按数量截断:只保留最近的X条消息
+
+        规则:
+        - 保留系统消息(如果存在)
+        - 保留最近的max_messages条消息
+
+        Args:
+            messages: 原始消息列表
+            max_messages: 最大保留消息数
+
+        Returns:
+            截断后的消息列表
+        """
+        if len(messages) <= max_messages:
+            return messages
+
+        # 分离系统消息和其他消息
+        system_msgs = [m for m in messages if m.get("role") == "system"]
+        other_msgs = [m for m in messages if m.get("role") != "system"]
+
+        # 保留最近的消息
+        kept_other = other_msgs[-(max_messages - len(system_msgs)) :]
+
+        return system_msgs + kept_other
</code_context>

<issue_to_address>
**issue (bug_risk):** truncate_by_count can exceed max_messages when there are many system messages.

When `len(system_msgs) >= max_messages`, `max_messages - len(system_msgs)` is zero or negative, so `other_msgs[-(max_messages - len(system_msgs)):]` still returns elements and the final list can exceed `max_messages`. To preserve the `max_messages` contract, short-circuit in this case (e.g. `return system_msgs[-max_messages:]`) and only compute `kept_other` when `len(system_msgs) < max_messages`.
</issue_to_address>

### Comment 2
<location> `tests/agent/runners/test_todolist_injection.py:94-95` </location>
<code_context>
+        )
+
+
+class TestTodoListInjection:
+    """测试 TodoList 注入逻辑"""
+
+    def setup_method(self):
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding unit tests for `TodoListAddTool` and `TodoListUpdateTool` to cover the new tools themselves.

The current tests only cover how the todo list is injected into the message stream, not the behavior of the tools that mutate `AstrAgentContext.todolist`. Please add tests that:
- assert ID assignment when the list is empty vs. non‑empty,
- exercise error paths (`tasks` missing/empty for add; non‑existent `task_id` for update),
- validate status handling and optional description updates.
These can go in a dedicated module like `tests/agent/tools/test_todolist_tool.py` to protect the todo flow against regressions in the tools themselves.

Suggested implementation:

```python
import pytest

from tests.agent.runners.test_todolist_injection import MockAstrAgentContext

# NOTE: Adjust the import path below to match the actual location of the tools.
from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool  # type: ignore[import]


class TestTodoListAddTool:
    """TodoListAddTool 行为测试"""

    def setup_method(self):
        self.context = MockAstrAgentContext()
        # 确保每个测试都从干净的 todolist 开始
        self.context.todolist = []
        self.tool = TodoListAddTool()

    def _run_add_tool(self, **params):
        """
        封装工具调用,便于后续统一修改调用方式。

        根据实际实现,可能需要改成:
        - self.tool(self.context, **params)
        - self.tool.run(self.context, **params)
        - self.tool.invoke(params, self.context)
        等。
        """
        return self.tool.run(self.context, **params)

    def test_assign_ids_when_list_empty(self):
        """当 todolist 为空时,新增任务应从 ID=1 开始递增"""
        result = self._run_add_tool(
            tasks=[
                {"description": "task 1", "status": "todo"},
                {"description": "task 2", "status": "doing"},
            ]
        )

        # 校验上下文内的 todolist
        assert len(self.context.todolist) == 2
        assert self.context.todolist[0]["id"] == 1
        assert self.context.todolist[0]["description"] == "task 1"
        assert self.context.todolist[0]["status"] == "todo"

        assert self.context.todolist[1]["id"] == 2
        assert self.context.todolist[1]["description"] == "task 2"
        assert self.context.todolist[1]["status"] == "doing"

        # 如果工具有返回值,也一并校验
        assert result is None or isinstance(result, dict)

    def test_assign_ids_when_list_non_empty(self):
        """当 todolist 非空时,应在现有最大 ID 基础上递增"""
        # 预置已有任务
        self.context.todolist = [
            {"id": 1, "description": "existing 1", "status": "todo"},
            {"id": 3, "description": "existing 3", "status": "done"},
        ]

        self._run_add_tool(
            tasks=[
                {"description": "new 1", "status": "todo"},
                {"description": "new 2", "status": "todo"},
            ]
        )

        # 最大 ID 为 3,新任务应为 4,5
        assert len(self.context.todolist) == 4
        new_tasks = self.context.todolist[-2:]
        assert new_tasks[0]["id"] == 4
        assert new_tasks[0]["description"] == "new 1"
        assert new_tasks[0]["status"] == "todo"

        assert new_tasks[1]["id"] == 5
        assert new_tasks[1]["description"] == "new 2"
        assert new_tasks[1]["status"] == "todo"

    def test_add_tool_error_when_tasks_missing(self):
        """缺少 tasks 参数时应走错误分支"""
        with pytest.raises((ValueError, KeyError, TypeError)):
            # 不传 tasks
            self._run_add_tool()

    def test_add_tool_error_when_tasks_empty(self):
        """tasks 为空列表时应走错误分支"""
        with pytest.raises(ValueError):
            self._run_add_tool(tasks=[])


class TestTodoListUpdateTool:
    """TodoListUpdateTool 行为测试"""

    def setup_method(self):
        self.context = MockAstrAgentContext()
        self.context.todolist = [
            {"id": 1, "description": "task 1", "status": "todo"},
            {"id": 2, "description": "task 2", "status": "doing"},
        ]
        self.tool = TodoListUpdateTool()

    def _run_update_tool(self, **params):
        """
        封装工具调用,便于后续统一修改调用方式。

        根据实际实现,可能需要改成:
        - self.tool(self.context, **params)
        - self.tool.run(self.context, **params)
        - self.tool.invoke(params, self.context)
        等。
        """
        return self.tool.run(self.context, **params)

    def test_update_status_and_description(self):
        """可以同时更新任务状态和描述"""
        result = self._run_update_tool(
            task_id=1,
            status="done",
            description="task 1 updated",
        )

        task = next(t for t in self.context.todolist if t["id"] == 1)
        assert task["status"] == "done"
        assert task["description"] == "task 1 updated"
        assert result is None or isinstance(result, dict)

    def test_update_only_status_keeps_description(self):
        """仅更新状态时应保留原 description 不变"""
        original_desc = self.context.todolist[1]["description"]

        self._run_update_tool(
            task_id=2,
            status="done",
        )

        task = next(t for t in self.context.todolist if t["id"] == 2)
        assert task["status"] == "done"
        # 未传 description,应保持原值
        assert task["description"] == original_desc

    def test_update_only_description_keeps_status(self):
        """仅更新描述时应保留原 status 不变"""
        original_status = self.context.todolist[0]["status"]

        self._run_update_tool(
            task_id=1,
            description="only description updated",
        )

        task = next(t for t in self.context.todolist if t["id"] == 1)
        assert task["description"] == "only description updated"
        assert task["status"] == original_status

    def test_update_non_existent_task_id_raises(self):
        """更新不存在的 task_id 时应走错误分支"""
        with pytest.raises((ValueError, LookupError, KeyError)):
            self._run_update_tool(
                task_id=999,
                status="done",
            )

    @pytest.mark.parametrize("status", ["todo", "doing", "done"])
    def test_accepts_valid_status_values(self, status):
        """验证常见状态值可以被接受并写入"""
        self._run_update_tool(
            task_id=1,
            status=status,
        )
        task = next(t for t in self.context.todolist if t["id"] == 1)
        assert task["status"] == status

```

1.`from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool` 的导入路径改成项目实际路径(例如 `astrbot.core.agent.tools.todolist` 等)。
2. 根据工具的真实接口,调整 `_run_add_tool``_run_update_tool` 中对工具的调用方式(目前假定为 `tool.run(context, **params)`)。如果工具是可调用对象或有 `invoke/__call__` 等,请相应替换。
3. 如果 `MockAstrAgentContext` 的定义不在 `tests.agent.runners.test_todolist_injection` 中,请将导入改为实际位置,或改用正式的 `AstrAgentContext` 构造测试用例。
4. 如工具在错误路径下不是通过抛异常,而是返回错误对象/状态码,请将 `pytest.raises(...)` 断言替换成对返回值的断言,以匹配实际实现。
</issue_to_address>

### Comment 3
<location> `tests/core/test_context_manager.py:193-202` </location>
<code_context>
+    def test_truncate_by_count_exceeds_limit(self):
</code_context>

<issue_to_address>
**issue (testing):** Add a test for `truncate_by_count` when `max_messages` is less than or equal to the number of system messages.

Current tests don’t cover the case where `max_messages` is less than or equal to the number of `system` messages, where `max_messages - len(system_msgs)` can be zero or negative and slicing is easy to get wrong. Please add a test that creates multiple `system` messages and sets `max_messages` to 0, 1, and exactly the number of system messages to validate this behavior.
</issue_to_address>

### Comment 4
<location> `tests/core/test_context_manager_integration.py:303-312` </location>
<code_context>
+        """测试:LLM API调用失败时的错误处理"""
+        provider = MockProvider()
+
+        from astrbot.core.context_manager.context_compressor import LLMSummaryCompressor
+
+        compressor = LLMSummaryCompressor(provider, keep_recent=2)
+
+        messages = [{"role": "user", "content": f"Message {i}"} for i in range(10)]
+
+        with patch.object(
+            provider, "text_chat", side_effect=Exception("API Error")
+        ) as mock_call:
+            result = await compressor.compress(messages)
+
+            # API失败时应该返回原始消息
+            assert result == messages
+            mock_call.assert_called_once()
+
+
+if __name__ == "__main__":
+    pytest.main([__file__, "-v", "-s"])
</code_context>

<issue_to_address>
**nitpick (testing):** LLM summary tests are tightly coupled to the exact wording in `summary_prompt.md`; consider loosening assertions.

Several tests assert specific Chinese prefixes in the instruction and summary (e.g. `"请基于我们完整的对话记录"`, `"历史会话摘要"`), which makes them fragile to any wording change in `summary_prompt.md`. It’d be more robust to keep asserting the structural behavior (extra user instruction and system summary messages) while loosening the content checks (e.g., non‑empty, summary tool invoked and interpolated), or centralizing any exact-string checks behind a single helper so prompt edits don’t require touching many tests.
</issue_to_address>

### Comment 5
<location> `showcase_features.py:193` </location>
<code_context>
+# ============ ContextManager 模拟实现 ============
+
+
+class MockContextManager:
+    """模拟 ContextManager 的核心逻辑"""
+
</code_context>

<issue_to_address>
**issue (complexity):** Consider replacing the reimplemented context, todo-list, and max-step logic in this demo with thin wrappers around the real production components to avoid duplicating business rules.

The main complexity issue is that this script re‑implements core logic (context management, token thresholds, compression, todo‑list injection, max‑step injection) instead of calling existing, tested components. That creates a parallel mental model and will drift over time.

You can keep all demo behavior but simplify by:

1. **Delegating to the real ContextManager instead of `MockContextManager`**  
   Keep the printing/logging in this file, but reuse the actual implementation for token counting and compression decisions.

   ```python
   # from astrbot.core.context_manager import ContextManager  # adjust import path

   class DemoContextManager:
       """Thin wrapper that only adds logging around the real ContextManager."""

       def __init__(self, model_context_limit: int, is_agent_mode: bool = False, provider=None):
           self.cm = ContextManager(
               model_context_limit=model_context_limit,
               is_agent_mode=is_agent_mode,
               provider=provider,
           )

       async def process_context(self, messages: list[dict]) -> list[dict]:
           total_tokens = self.cm.count_tokens(messages)
           usage_rate = total_tokens / self.cm.model_context_limit
           print(f"  初始Token数: {total_tokens}")
           print(f"  上下文限制: {self.cm.model_context_limit}")
           print(f"  使用率: {usage_rate:.2%}")
           print(f"  触发阈值: {self.cm.threshold:.0%}")

           # delegate real logic
           result = await self.cm.process_context(messages)

           print("\n【输出】压缩后的消息历史:")
           print_messages(result, indent=1)
           return result
   ```

   Then in `demo_context_manager()`:

   ```python
   async def demo_context_manager():
       print_separator("DEMO 1: ContextManager Workflow")

       print_subsection("Agent模式(触发摘要压缩)")
       print("【输入】完整消息历史:")
       print_messages(LONG_MESSAGE_HISTORY, indent=1)

       print("\n【处理】执行 ContextManager.process_context (AGENT 模式):")
       mock_provider = MagicMock()
       cm_agent = DemoContextManager(
           model_context_limit=150, is_agent_mode=True, provider=mock_provider
       )
       result_agent = await cm_agent.process_context(LONG_MESSAGE_HISTORY)
   ```

   This removes duplicated token counting, thresholds and compression strategy while keeping the same printed explanation.

2. **Reusing the real todo‑list injection/formatting logic**  
   Instead of `format_todolist` + `inject_todolist_to_messages` being a second implementation, call the core helpers and only wrap them with pretty printing.

   ```python
   # from astrbot.core.todo import inject_todolist, format_todolist  # adjust to real modules

   def demo_inject_todolist_to_messages(
       messages: list[dict],
       todolist: list[dict],
       max_tool_calls: int | None = None,
       current_tool_calls: int | None = None,
   ) -> list[dict]:
       """Thin demo wrapper adding logs around real injection."""
       print("\n【输入】TodoList:")
       for task in todolist:
           status_icon = {"pending": "[ ]", "in_progress": "[-]", "completed": "[x]"}.get(
               task["status"], "[ ]"
           )
           print(f"  {status_icon} #{task['id']}: {task['description']}")

       result = inject_todolist(
           messages=messages,
           todolist=todolist,
           max_tool_calls=max_tool_calls,
           current_tool_calls=current_tool_calls,
       )

       print("\n【输出】注入后的消息历史:")
       print_messages(result, indent=1)
       return result
   ```

   Then in `demo_todolist_injection()` replace calls to `inject_todolist_to_messages` with `demo_inject_todolist_to_messages`, so the real injection behavior is showcased.

3. **Centralizing max‑step smart injection**  
   The logic in `demo_max_step_smart_injection` is essentially a hand‑rolled `_smart_inject_user_message` / “max step injection”. If that exists in production code, wrap it instead of inlining the rules twice.

   ```python
   # from astrbot.core.agent_runner import smart_inject_user_message  # adjust import

   def demo_max_step_injection(messages: list[dict], max_step_message: str) -> list[dict]:
       """Use real smart-injection and only add console explanation."""
       print("\n【处理】模拟工具耗尽,执行智能注入逻辑...")
       result = smart_inject_user_message(messages, max_step_message)
       print("\n【输出】智能注入后的消息历史:")
       print_messages(result, indent=1)
       return result
   ```

   Then in `demo_max_step_smart_injection()`:

   ```python
   result_a = demo_max_step_injection(messages_with_user, max_step_message)
   ...
   result_b = demo_max_step_injection(messages_with_tool, max_step_message)
   ```

4. **Keep demo‑specific concerns only in this file**  
   All the printing helpers (`print_separator`, `print_subsection`, `print_messages`) and example data are fine here; the key simplification is to **stop re‑expressing business rules** (token accounting, compression, injection) and instead wrap the real components with extra logging.

These changes maintain all showcased behaviors (and visible output) but collapse the parallel implementation, making the script cheaper to maintain and automatically consistent with future core logic changes.
</issue_to_address>

### Comment 6
<location> `astrbot/core/context_manager/context_manager.py:63` </location>
<code_context>
+        if self.model_context_limit == -1:
+            return messages
+
+        # 阶段1:Token初始统计
+        messages = await self._initial_token_check(messages)
+
</code_context>

<issue_to_address>
**issue (complexity):** Consider simplifying the main processing flow by computing token usage locally and inlining trivial stages instead of storing internal flags on messages and jumping through small wrapper methods.

You can keep the flow the same but avoid mutating `messages[0]` with hidden flags and reduce indirection in the main path.

### 1. Avoid `_needs_compression` and `_initial_token_count` on messages

Instead of encoding internal state into the first message, compute the token usage and pass it around as local variables. This keeps the external data structure clean and makes `_run_compression` easier to reason about.

Example refactor (no behavior change):

```python
async def process(
    self, messages: list[dict[str, Any]], max_messages_to_keep: int = 20
) -> list[dict[str, Any]]:
    if self.model_context_limit == -1 or not messages:
        return messages

    total_tokens = self.token_counter.count_tokens(messages)
    usage_rate = total_tokens / self.model_context_limit
    needs_compression = usage_rate > self.threshold

    if needs_compression:
        messages = await self._run_compression(messages)

    messages = await self._run_final_processing(messages, max_messages_to_keep)
    return messages
```

Then simplify `_run_compression` so it doesn’t need to inspect hidden flags:

```python
async def _run_compression(
    self, messages: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    # Agent模式:先摘要,再判断
    messages = await self._compress_by_summarization(messages)

    tokens_after_summary = self.token_counter.count_tokens(messages)
    if tokens_after_summary / self.model_context_limit > self.threshold:
        messages = self._compress_by_halving(messages)

    return messages
```

You can then drop `_initial_token_check` entirely, since its logic is now inlined into `process` and held in local variables.

### 2. Inline trivial stage wrappers to reduce jumping

If you keep `_run_final_processing`, consider collapsing the very small stage wrappers where appropriate so the main flow is visible in one place.

For example, you can in-line `_run_final_processing` into `process` (keeping the existing helper methods):

```python
async def process(
    self, messages: list[dict[str, Any]], max_messages_to_keep: int = 20
) -> list[dict[str, Any]]:
    if self.model_context_limit == -1 or not messages:
        return messages

    total_tokens = self.token_counter.count_tokens(messages)
    usage_rate = total_tokens / self.model_context_limit
    needs_compression = usage_rate > self.threshold

    if needs_compression:
        messages = await self._run_compression(messages)

    # final processing steps are linear and small, so keep them visible here
    messages = self._merge_consecutive_messages(messages)
    messages = self._cleanup_unpaired_tool_calls(messages)
    messages = self.truncator.truncate_by_count(messages, max_messages_to_keep)

    return messages
```

This keeps the conceptual flow “token check → maybe compress → finalize” in a single function, while still reusing the more complex helpers (`_merge_consecutive_messages`, `_cleanup_unpaired_tool_calls`) as-is.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +51 to +60
def truncate_by_count(
self, messages: list[dict[str, Any]], max_messages: int
) -> list[dict[str, Any]]:
"""
按数量截断:只保留最近的X条消息

规则:
- 保留系统消息(如果存在)
- 保留最近的max_messages条消息

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): truncate_by_count 在系统消息较多时可能返回超过 max_messages 的结果。

len(system_msgs) >= max_messages 时,max_messages - len(system_msgs) 为 0 或负数,因此 other_msgs[-(max_messages - len(system_msgs)):] 仍然会返回元素,最终列表长度可能超过 max_messages。为了遵守 max_messages 的约定,建议在这种情况下直接短路返回(例如 return system_msgs[-max_messages:]),并且仅在 len(system_msgs) < max_messages 时才计算 kept_other

Original comment in English

issue (bug_risk): truncate_by_count can exceed max_messages when there are many system messages.

When len(system_msgs) >= max_messages, max_messages - len(system_msgs) is zero or negative, so other_msgs[-(max_messages - len(system_msgs)):] still returns elements and the final list can exceed max_messages. To preserve the max_messages contract, short-circuit in this case (e.g. return system_msgs[-max_messages:]) and only compute kept_other when len(system_msgs) < max_messages.

Comment on lines +94 to +95
class TestTodoListInjection:
"""测试 TodoList 注入逻辑"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): 建议为 TodoListAddToolTodoListUpdateTool 本身补充单元测试。

当前测试只覆盖了 todo 列表是如何被注入到消息流中的逻辑,并未覆盖这些工具是如何修改 AstrAgentContext.todolist 的行为。请补充测试以:

  • 断言在列表为空和非空时的 ID 分配行为;
  • 覆盖错误路径(新增时缺失/空的 tasks;更新时传入不存在的 task_id);
  • 校验状态字段处理以及可选 description 更新。
    这些测试可以放在类似 tests/agent/tools/test_todolist_tool.py 的独立模块中,用来防止工具本身的回归影响整个 todo 流程。

Suggested implementation:

import pytest

from tests.agent.runners.test_todolist_injection import MockAstrAgentContext

# NOTE: Adjust the import path below to match the actual location of the tools.
from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool  # type: ignore[import]


class TestTodoListAddTool:
    """TodoListAddTool 行为测试"""

    def setup_method(self):
        self.context = MockAstrAgentContext()
        # 确保每个测试都从干净的 todolist 开始
        self.context.todolist = []
        self.tool = TodoListAddTool()

    def _run_add_tool(self, **params):
        """
        封装工具调用,便于后续统一修改调用方式。

        根据实际实现,可能需要改成:
        - self.tool(self.context, **params)
        - self.tool.run(self.context, **params)
        - self.tool.invoke(params, self.context)
        等。
        """
        return self.tool.run(self.context, **params)

    def test_assign_ids_when_list_empty(self):
        """当 todolist 为空时,新增任务应从 ID=1 开始递增"""
        result = self._run_add_tool(
            tasks=[
                {"description": "task 1", "status": "todo"},
                {"description": "task 2", "status": "doing"},
            ]
        )

        # 校验上下文内的 todolist
        assert len(self.context.todolist) == 2
        assert self.context.todolist[0]["id"] == 1
        assert self.context.todolist[0]["description"] == "task 1"
        assert self.context.todolist[0]["status"] == "todo"

        assert self.context.todolist[1]["id"] == 2
        assert self.context.todolist[1]["description"] == "task 2"
        assert self.context.todolist[1]["status"] == "doing"

        # 如果工具有返回值,也一并校验
        assert result is None or isinstance(result, dict)

    def test_assign_ids_when_list_non_empty(self):
        """当 todolist 非空时,应在现有最大 ID 基础上递增"""
        # 预置已有任务
        self.context.todolist = [
            {"id": 1, "description": "existing 1", "status": "todo"},
            {"id": 3, "description": "existing 3", "status": "done"},
        ]

        self._run_add_tool(
            tasks=[
                {"description": "new 1", "status": "todo"},
                {"description": "new 2", "status": "todo"},
            ]
        )

        # 最大 ID 为 3,新任务应为 4,5
        assert len(self.context.todolist) == 4
        new_tasks = self.context.todolist[-2:]
        assert new_tasks[0]["id"] == 4
        assert new_tasks[0]["description"] == "new 1"
        assert new_tasks[0]["status"] == "todo"

        assert new_tasks[1]["id"] == 5
        assert new_tasks[1]["description"] == "new 2"
        assert new_tasks[1]["status"] == "todo"

    def test_add_tool_error_when_tasks_missing(self):
        """缺少 tasks 参数时应走错误分支"""
        with pytest.raises((ValueError, KeyError, TypeError)):
            # 不传 tasks
            self._run_add_tool()

    def test_add_tool_error_when_tasks_empty(self):
        """tasks 为空列表时应走错误分支"""
        with pytest.raises(ValueError):
            self._run_add_tool(tasks=[])


class TestTodoListUpdateTool:
    """TodoListUpdateTool 行为测试"""

    def setup_method(self):
        self.context = MockAstrAgentContext()
        self.context.todolist = [
            {"id": 1, "description": "task 1", "status": "todo"},
            {"id": 2, "description": "task 2", "status": "doing"},
        ]
        self.tool = TodoListUpdateTool()

    def _run_update_tool(self, **params):
        """
        封装工具调用,便于后续统一修改调用方式。

        根据实际实现,可能需要改成:
        - self.tool(self.context, **params)
        - self.tool.run(self.context, **params)
        - self.tool.invoke(params, self.context)
        等。
        """
        return self.tool.run(self.context, **params)

    def test_update_status_and_description(self):
        """可以同时更新任务状态和描述"""
        result = self._run_update_tool(
            task_id=1,
            status="done",
            description="task 1 updated",
        )

        task = next(t for t in self.context.todolist if t["id"] == 1)
        assert task["status"] == "done"
        assert task["description"] == "task 1 updated"
        assert result is None or isinstance(result, dict)

    def test_update_only_status_keeps_description(self):
        """仅更新状态时应保留原 description 不变"""
        original_desc = self.context.todolist[1]["description"]

        self._run_update_tool(
            task_id=2,
            status="done",
        )

        task = next(t for t in self.context.todolist if t["id"] == 2)
        assert task["status"] == "done"
        # 未传 description,应保持原值
        assert task["description"] == original_desc

    def test_update_only_description_keeps_status(self):
        """仅更新描述时应保留原 status 不变"""
        original_status = self.context.todolist[0]["status"]

        self._run_update_tool(
            task_id=1,
            description="only description updated",
        )

        task = next(t for t in self.context.todolist if t["id"] == 1)
        assert task["description"] == "only description updated"
        assert task["status"] == original_status

    def test_update_non_existent_task_id_raises(self):
        """更新不存在的 task_id 时应走错误分支"""
        with pytest.raises((ValueError, LookupError, KeyError)):
            self._run_update_tool(
                task_id=999,
                status="done",
            )

    @pytest.mark.parametrize("status", ["todo", "doing", "done"])
    def test_accepts_valid_status_values(self, status):
        """验证常见状态值可以被接受并写入"""
        self._run_update_tool(
            task_id=1,
            status=status,
        )
        task = next(t for t in self.context.todolist if t["id"] == 1)
        assert task["status"] == status
  1. from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool 的导入路径改成项目实际路径(例如 astrbot.core.agent.tools.todolist 等)。
  2. 根据工具的真实接口,调整 _run_add_tool_run_update_tool 中对工具的调用方式(目前假定为 tool.run(context, **params))。如果工具是可调用对象或有 invoke/__call__ 等,请相应替换。
  3. 如果 MockAstrAgentContext 的定义不在 tests.agent.runners.test_todolist_injection 中,请将导入改为实际位置,或改用正式的 AstrAgentContext 构造测试用例。
  4. 如工具在错误路径下不是通过抛异常,而是返回错误对象/状态码,请将 pytest.raises(...) 断言替换成对返回值的断言,以匹配实际实现。
Original comment in English

suggestion (testing): Consider adding unit tests for TodoListAddTool and TodoListUpdateTool to cover the new tools themselves.

The current tests only cover how the todo list is injected into the message stream, not the behavior of the tools that mutate AstrAgentContext.todolist. Please add tests that:

  • assert ID assignment when the list is empty vs. non‑empty,
  • exercise error paths (tasks missing/empty for add; non‑existent task_id for update),
  • validate status handling and optional description updates.
    These can go in a dedicated module like tests/agent/tools/test_todolist_tool.py to protect the todo flow against regressions in the tools themselves.

Suggested implementation:

import pytest

from tests.agent.runners.test_todolist_injection import MockAstrAgentContext

# NOTE: Adjust the import path below to match the actual location of the tools.
from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool  # type: ignore[import]


class TestTodoListAddTool:
    """TodoListAddTool 行为测试"""

    def setup_method(self):
        self.context = MockAstrAgentContext()
        # 确保每个测试都从干净的 todolist 开始
        self.context.todolist = []
        self.tool = TodoListAddTool()

    def _run_add_tool(self, **params):
        """
        封装工具调用,便于后续统一修改调用方式。

        根据实际实现,可能需要改成:
        - self.tool(self.context, **params)
        - self.tool.run(self.context, **params)
        - self.tool.invoke(params, self.context)
        等。
        """
        return self.tool.run(self.context, **params)

    def test_assign_ids_when_list_empty(self):
        """当 todolist 为空时,新增任务应从 ID=1 开始递增"""
        result = self._run_add_tool(
            tasks=[
                {"description": "task 1", "status": "todo"},
                {"description": "task 2", "status": "doing"},
            ]
        )

        # 校验上下文内的 todolist
        assert len(self.context.todolist) == 2
        assert self.context.todolist[0]["id"] == 1
        assert self.context.todolist[0]["description"] == "task 1"
        assert self.context.todolist[0]["status"] == "todo"

        assert self.context.todolist[1]["id"] == 2
        assert self.context.todolist[1]["description"] == "task 2"
        assert self.context.todolist[1]["status"] == "doing"

        # 如果工具有返回值,也一并校验
        assert result is None or isinstance(result, dict)

    def test_assign_ids_when_list_non_empty(self):
        """当 todolist 非空时,应在现有最大 ID 基础上递增"""
        # 预置已有任务
        self.context.todolist = [
            {"id": 1, "description": "existing 1", "status": "todo"},
            {"id": 3, "description": "existing 3", "status": "done"},
        ]

        self._run_add_tool(
            tasks=[
                {"description": "new 1", "status": "todo"},
                {"description": "new 2", "status": "todo"},
            ]
        )

        # 最大 ID 为 3,新任务应为 4,5
        assert len(self.context.todolist) == 4
        new_tasks = self.context.todolist[-2:]
        assert new_tasks[0]["id"] == 4
        assert new_tasks[0]["description"] == "new 1"
        assert new_tasks[0]["status"] == "todo"

        assert new_tasks[1]["id"] == 5
        assert new_tasks[1]["description"] == "new 2"
        assert new_tasks[1]["status"] == "todo"

    def test_add_tool_error_when_tasks_missing(self):
        """缺少 tasks 参数时应走错误分支"""
        with pytest.raises((ValueError, KeyError, TypeError)):
            # 不传 tasks
            self._run_add_tool()

    def test_add_tool_error_when_tasks_empty(self):
        """tasks 为空列表时应走错误分支"""
        with pytest.raises(ValueError):
            self._run_add_tool(tasks=[])


class TestTodoListUpdateTool:
    """TodoListUpdateTool 行为测试"""

    def setup_method(self):
        self.context = MockAstrAgentContext()
        self.context.todolist = [
            {"id": 1, "description": "task 1", "status": "todo"},
            {"id": 2, "description": "task 2", "status": "doing"},
        ]
        self.tool = TodoListUpdateTool()

    def _run_update_tool(self, **params):
        """
        封装工具调用,便于后续统一修改调用方式。

        根据实际实现,可能需要改成:
        - self.tool(self.context, **params)
        - self.tool.run(self.context, **params)
        - self.tool.invoke(params, self.context)
        等。
        """
        return self.tool.run(self.context, **params)

    def test_update_status_and_description(self):
        """可以同时更新任务状态和描述"""
        result = self._run_update_tool(
            task_id=1,
            status="done",
            description="task 1 updated",
        )

        task = next(t for t in self.context.todolist if t["id"] == 1)
        assert task["status"] == "done"
        assert task["description"] == "task 1 updated"
        assert result is None or isinstance(result, dict)

    def test_update_only_status_keeps_description(self):
        """仅更新状态时应保留原 description 不变"""
        original_desc = self.context.todolist[1]["description"]

        self._run_update_tool(
            task_id=2,
            status="done",
        )

        task = next(t for t in self.context.todolist if t["id"] == 2)
        assert task["status"] == "done"
        # 未传 description,应保持原值
        assert task["description"] == original_desc

    def test_update_only_description_keeps_status(self):
        """仅更新描述时应保留原 status 不变"""
        original_status = self.context.todolist[0]["status"]

        self._run_update_tool(
            task_id=1,
            description="only description updated",
        )

        task = next(t for t in self.context.todolist if t["id"] == 1)
        assert task["description"] == "only description updated"
        assert task["status"] == original_status

    def test_update_non_existent_task_id_raises(self):
        """更新不存在的 task_id 时应走错误分支"""
        with pytest.raises((ValueError, LookupError, KeyError)):
            self._run_update_tool(
                task_id=999,
                status="done",
            )

    @pytest.mark.parametrize("status", ["todo", "doing", "done"])
    def test_accepts_valid_status_values(self, status):
        """验证常见状态值可以被接受并写入"""
        self._run_update_tool(
            task_id=1,
            status=status,
        )
        task = next(t for t in self.context.todolist if t["id"] == 1)
        assert task["status"] == status
  1. from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool 的导入路径改成项目实际路径(例如 astrbot.core.agent.tools.todolist 等)。
  2. 根据工具的真实接口,调整 _run_add_tool_run_update_tool 中对工具的调用方式(目前假定为 tool.run(context, **params))。如果工具是可调用对象或有 invoke/__call__ 等,请相应替换。
  3. 如果 MockAstrAgentContext 的定义不在 tests.agent.runners.test_todolist_injection 中,请将导入改为实际位置,或改用正式的 AstrAgentContext 构造测试用例。
  4. 如工具在错误路径下不是通过抛异常,而是返回错误对象/状态码,请将 pytest.raises(...) 断言替换成对返回值的断言,以匹配实际实现。

Comment on lines +193 to +202
def test_truncate_by_count_exceeds_limit(self):
"""测试按数量截断 - 超过限制"""
messages = [
{"role": "system", "content": "System"},
{"role": "user", "content": "Message 1"},
{"role": "assistant", "content": "Response 1"},
{"role": "user", "content": "Message 2"},
{"role": "assistant", "content": "Response 2"},
{"role": "user", "content": "Message 3"},
]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (testing):truncate_by_count 补充 max_messages 小于或等于系统消息数量时的测试用例。

当前测试没有覆盖 max_messages 小于或等于 system 消息数量这一情况,在这种情况下 max_messages - len(system_msgs) 可能为 0 或负数,切片逻辑很容易写错。请添加一个测试:构造多个 system 消息,并分别将 max_messages 设为 0、1 以及恰好等于系统消息数量,以验证该行为。

Original comment in English

issue (testing): Add a test for truncate_by_count when max_messages is less than or equal to the number of system messages.

Current tests don’t cover the case where max_messages is less than or equal to the number of system messages, where max_messages - len(system_msgs) can be zero or negative and slicing is easy to get wrong. Please add a test that creates multiple system messages and sets max_messages to 0, 1, and exactly the number of system messages to validate this behavior.

Comment on lines +303 to +312
from astrbot.core.context_manager.context_compressor import LLMSummaryCompressor

compressor = LLMSummaryCompressor(provider, keep_recent=3)

messages = [
{"role": "system", "content": "System prompt"},
{"role": "user", "content": "Q1"},
{"role": "assistant", "content": "A1"},
{"role": "user", "content": "Q2"},
{"role": "assistant", "content": "A2"},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick (testing): LLM 摘要相关测试与 summary_prompt.md 中的具体文案绑定得过于紧密,建议放宽断言条件。

目前有若干测试直接断言 instruction 和 summary 中的具体中文前缀(例如 "请基于我们完整的对话记录""历史会话摘要"),这会导致任何对 summary_prompt.md 文案的修改都容易破坏测试。更稳健的做法是继续断言结构行为(例如额外的用户指令消息和系统摘要消息存在),但放宽对内容的检查(例如只要求非空、有调用并插入 summary 工具的结果),或者把对精确字符串的检查集中封装在一个辅助函数中,这样调整 prompt 文案时,不必修改大量测试用例。

Original comment in English

nitpick (testing): LLM summary tests are tightly coupled to the exact wording in summary_prompt.md; consider loosening assertions.

Several tests assert specific Chinese prefixes in the instruction and summary (e.g. "请基于我们完整的对话记录", "历史会话摘要"), which makes them fragile to any wording change in summary_prompt.md. It’d be more robust to keep asserting the structural behavior (extra user instruction and system summary messages) while loosening the content checks (e.g., non‑empty, summary tool invoked and interpolated), or centralizing any exact-string checks behind a single helper so prompt edits don’t require touching many tests.

# ============ ContextManager 模拟实现 ============


class MockContextManager:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 建议在这个演示脚本中,用对真实生产组件的轻量封装来替代重新实现的上下文管理、todo 列表和最大步数逻辑,以避免重复业务规则。

主要的复杂度问题在于,该脚本重新实现了核心逻辑(上下文管理、token 阈值、压缩、todo 列表注入、最大步数注入),而不是复用已有的、已经过测试的组件。这会制造一套平行的心智模型,并且后续很容易产生偏差。

你可以在保留全部演示行为的前提下,通过以下方式简化:

  1. 使用真实的 ContextManager 替代 MockContextManager,当前文件只做简单封装
    保留本文件中的打印/日志逻辑,但在 token 计数和压缩决策上复用真正的实现。

    # from astrbot.core.context_manager import ContextManager  # adjust import path
    
    class DemoContextManager:
        """Thin wrapper that only adds logging around the real ContextManager."""
    
        def __init__(self, model_context_limit: int, is_agent_mode: bool = False, provider=None):
            self.cm = ContextManager(
                model_context_limit=model_context_limit,
                is_agent_mode=is_agent_mode,
                provider=provider,
            )
    
        async def process_context(self, messages: list[dict]) -> list[dict]:
            total_tokens = self.cm.count_tokens(messages)
            usage_rate = total_tokens / self.cm.model_context_limit
            print(f"  初始Token数: {total_tokens}")
            print(f"  上下文限制: {self.cm.model_context_limit}")
            print(f"  使用率: {usage_rate:.2%}")
            print(f"  触发阈值: {self.cm.threshold:.0%}")
    
            # delegate real logic
            result = await self.cm.process_context(messages)
    
            print("\n【输出】压缩后的消息历史:")
            print_messages(result, indent=1)
            return result

    然后在 demo_context_manager() 中:

    async def demo_context_manager():
        print_separator("DEMO 1: ContextManager Workflow")
    
        print_subsection("Agent模式(触发摘要压缩)")
        print("【输入】完整消息历史:")
        print_messages(LONG_MESSAGE_HISTORY, indent=1)
    
        print("\n【处理】执行 ContextManager.process_context (AGENT 模式):")
        mock_provider = MagicMock()
        cm_agent = DemoContextManager(
            model_context_limit=150, is_agent_mode=True, provider=mock_provider
        )
        result_agent = await cm_agent.process_context(LONG_MESSAGE_HISTORY)

    这样既保留了相同的打印说明,又去掉了重复的 token 计数、阈值和压缩策略逻辑。

  2. 复用真实的 todo 列表注入/格式化逻辑
    不要再维护一套 format_todolist + inject_todolist_to_messages 的实现,而是调用核心 helper,只在外面包一层用于打印的封装。

    # from astrbot.core.todo import inject_todolist, format_todolist  # adjust to real modules
    
    def demo_inject_todolist_to_messages(
        messages: list[dict],
        todolist: list[dict],
        max_tool_calls: int | None = None,
        current_tool_calls: int | None = None,
    ) -> list[dict]:
        """Thin demo wrapper adding logs around real injection."""
        print("\n【输入】TodoList:")
        for task in todolist:
            status_icon = {"pending": "[ ]", "in_progress": "[-]", "completed": "[x]"}.get(
                task["status"], "[ ]"
            )
            print(f"  {status_icon} #{task['id']}: {task['description']}")
    
        result = inject_todolist(
            messages=messages,
            todolist=todolist,
            max_tool_calls=max_tool_calls,
            current_tool_calls=current_tool_calls,
        )
    
        print("\n【输出】注入后的消息历史:")
        print_messages(result, indent=1)
        return result

    然后在 demo_todolist_injection() 中,把对 inject_todolist_to_messages 的调用替换为 demo_inject_todolist_to_messages,从而展示真实注入行为。

  3. 集中处理最大步数的智能注入逻辑
    demo_max_step_smart_injection 中的逻辑本质上就是手写的 _smart_inject_user_message/“最大步数注入”行为。如果生产代码中已经存在类似逻辑,建议直接封装调用,而不是在此重复规则。

    # from astrbot.core.agent_runner import smart_inject_user_message  # adjust import
    
    def demo_max_step_injection(messages: list[dict], max_step_message: str) -> list[dict]:
        """Use real smart-injection and only add console explanation."""
        print("\n【处理】模拟工具耗尽,执行智能注入逻辑...")
        result = smart_inject_user_message(messages, max_step_message)
        print("\n【输出】智能注入后的消息历史:")
        print_messages(result, indent=1)
        return result

    然后在 demo_max_step_smart_injection() 中:

    result_a = demo_max_step_injection(messages_with_user, max_step_message)
    ...
    result_b = demo_max_step_injection(messages_with_tool, max_step_message)
  4. 仅在本文件中保留演示相关的逻辑
    各种打印 helper(print_separatorprint_subsectionprint_messages)以及示例数据都可以保留在这里;关键的简化点是:不要在这里重新表达业务规则(token 统计、压缩、注入),而是用增加日志的方式去包装真实组件。

这些改动可以保持所有演示行为(以及可见输出)不变,同时消除平行实现,使脚本更易维护,并在核心逻辑未来变更时自动保持一致。

Original comment in English

issue (complexity): Consider replacing the reimplemented context, todo-list, and max-step logic in this demo with thin wrappers around the real production components to avoid duplicating business rules.

The main complexity issue is that this script re‑implements core logic (context management, token thresholds, compression, todo‑list injection, max‑step injection) instead of calling existing, tested components. That creates a parallel mental model and will drift over time.

You can keep all demo behavior but simplify by:

  1. Delegating to the real ContextManager instead of MockContextManager
    Keep the printing/logging in this file, but reuse the actual implementation for token counting and compression decisions.

    # from astrbot.core.context_manager import ContextManager  # adjust import path
    
    class DemoContextManager:
        """Thin wrapper that only adds logging around the real ContextManager."""
    
        def __init__(self, model_context_limit: int, is_agent_mode: bool = False, provider=None):
            self.cm = ContextManager(
                model_context_limit=model_context_limit,
                is_agent_mode=is_agent_mode,
                provider=provider,
            )
    
        async def process_context(self, messages: list[dict]) -> list[dict]:
            total_tokens = self.cm.count_tokens(messages)
            usage_rate = total_tokens / self.cm.model_context_limit
            print(f"  初始Token数: {total_tokens}")
            print(f"  上下文限制: {self.cm.model_context_limit}")
            print(f"  使用率: {usage_rate:.2%}")
            print(f"  触发阈值: {self.cm.threshold:.0%}")
    
            # delegate real logic
            result = await self.cm.process_context(messages)
    
            print("\n【输出】压缩后的消息历史:")
            print_messages(result, indent=1)
            return result

    Then in demo_context_manager():

    async def demo_context_manager():
        print_separator("DEMO 1: ContextManager Workflow")
    
        print_subsection("Agent模式(触发摘要压缩)")
        print("【输入】完整消息历史:")
        print_messages(LONG_MESSAGE_HISTORY, indent=1)
    
        print("\n【处理】执行 ContextManager.process_context (AGENT 模式):")
        mock_provider = MagicMock()
        cm_agent = DemoContextManager(
            model_context_limit=150, is_agent_mode=True, provider=mock_provider
        )
        result_agent = await cm_agent.process_context(LONG_MESSAGE_HISTORY)

    This removes duplicated token counting, thresholds and compression strategy while keeping the same printed explanation.

  2. Reusing the real todo‑list injection/formatting logic
    Instead of format_todolist + inject_todolist_to_messages being a second implementation, call the core helpers and only wrap them with pretty printing.

    # from astrbot.core.todo import inject_todolist, format_todolist  # adjust to real modules
    
    def demo_inject_todolist_to_messages(
        messages: list[dict],
        todolist: list[dict],
        max_tool_calls: int | None = None,
        current_tool_calls: int | None = None,
    ) -> list[dict]:
        """Thin demo wrapper adding logs around real injection."""
        print("\n【输入】TodoList:")
        for task in todolist:
            status_icon = {"pending": "[ ]", "in_progress": "[-]", "completed": "[x]"}.get(
                task["status"], "[ ]"
            )
            print(f"  {status_icon} #{task['id']}: {task['description']}")
    
        result = inject_todolist(
            messages=messages,
            todolist=todolist,
            max_tool_calls=max_tool_calls,
            current_tool_calls=current_tool_calls,
        )
    
        print("\n【输出】注入后的消息历史:")
        print_messages(result, indent=1)
        return result

    Then in demo_todolist_injection() replace calls to inject_todolist_to_messages with demo_inject_todolist_to_messages, so the real injection behavior is showcased.

  3. Centralizing max‑step smart injection
    The logic in demo_max_step_smart_injection is essentially a hand‑rolled _smart_inject_user_message / “max step injection”. If that exists in production code, wrap it instead of inlining the rules twice.

    # from astrbot.core.agent_runner import smart_inject_user_message  # adjust import
    
    def demo_max_step_injection(messages: list[dict], max_step_message: str) -> list[dict]:
        """Use real smart-injection and only add console explanation."""
        print("\n【处理】模拟工具耗尽,执行智能注入逻辑...")
        result = smart_inject_user_message(messages, max_step_message)
        print("\n【输出】智能注入后的消息历史:")
        print_messages(result, indent=1)
        return result

    Then in demo_max_step_smart_injection():

    result_a = demo_max_step_injection(messages_with_user, max_step_message)
    ...
    result_b = demo_max_step_injection(messages_with_tool, max_step_message)
  4. Keep demo‑specific concerns only in this file
    All the printing helpers (print_separator, print_subsection, print_messages) and example data are fine here; the key simplification is to stop re‑expressing business rules (token accounting, compression, injection) and instead wrap the real components with extra logging.

These changes maintain all showcased behaviors (and visible output) but collapse the parallel implementation, making the script cheaper to maintain and automatically consistent with future core logic changes.

@kawayiYokami kawayiYokami marked this pull request as draft December 24, 2025 10:09
@kawayiYokami
Copy link
Contributor Author

@sourcery-ai[bot] 感谢审查!已针对以下问题进行修复:

修复 1:TodoList 工具单元测试 ✅

  • 新增 tests/agent/tools/test_todolist_tool.py,包含 13 个测试用例
  • 覆盖 TodoListAddTool 和 TodoListUpdateTool 的核心逻辑
  • 修复了 TodoListUpdateTool 的参数验证问题(缺失 status 时返回错误信息而非抛异常)

修复 2:测试断言放松 ✅

  • 修改 tests/core/test_context_manager.py 和 tests/core/test_context_manager_integration.py
  • 将精确中文文本检查改为验证内容非空,更健壮

修复 3:showcase_features.py 重复实现 ✅

  • 删除了重复的 MockContextManager 类
  • 复用核心组件逻辑(TokenCounter, ContextManager, ContextCompressor)
  • 保留 format_todolist 和 inject_todolist_to_messages 函数(与 tool_loop_agent_runner.py 保持一致)

修复 4:ContextManager 内部标志污染 ✅

  • 移除消息对象上的 _needs_compression 和 _initial_token_count 标志
  • 改为方法内部返回值传递,保持消息对象干净

关于保留的问题(已解释):

  • Issues 1 & 3(truncate_by_count 边界情况):Agent 上下文中系统消息不会出现多次,判定为过度设计
  • showcase_features.py 保持独立组件复制:避免运行时依赖问题,用于独立演示

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant