@Micro66 (Collaborator) commented Dec 3, 2025

Summary

  • Add new Chat Shell type supporting direct LLM API calls (Claude and OpenAI-compatible)
  • Implement async streaming chat services for Chat and Dify shell types
  • Enable direct execution in Backend without requiring Docker Executor containers
  • Add session management for multi-turn conversations via Redis

Changes

New Files

  • backend/app/services/chat/ - Direct chat service module
    • base.py - Base classes, HTTP client manager, shared utilities
    • chat_service.py - Chat type async streaming implementation
    • dify_service.py - Dify type async streaming implementation
    • session_manager.py - Redis-based session state management
  • backend/app/api/endpoints/adapter/chat.py - SSE streaming API endpoints

Modified Files

  • backend/app/api/api.py - Register new chat routes
  • backend/app/core/config.py - Add MAX_CONCURRENT_CHATS config
  • backend/app/main.py - Add HTTP client lifecycle management
  • backend/app/services/adapters/task_kinds.py - Add is_direct_chat_team() helper
  • backend/init_data/02-public-shells.yaml - Add Chat Shell definition

Technical Details

  • Full async/await implementation for optimal uvicorn worker utilization
  • Shared httpx.AsyncClient with connection pooling
  • Session state stored in Redis for multi-worker compatibility
  • Support for conversation history (Chat) and conversation_id (Dify)
  • Proper error handling with subtask status updates
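The Redis-backed session state described above can be sketched roughly as follows. The class and method names (`SessionManager`, `get_chat_history`, `save_chat_history`) mirror the files in this PR, but the body is an illustrative stand-in, shown against an in-memory dict rather than a real Redis client:

```python
import json
from typing import Any, Dict, List, Optional


class DictStore:
    """In-memory stand-in for Redis, used only for illustration."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)

    def set(self, key: str, value: str) -> None:
        self._data[key] = value


class SessionManager:
    """Sketch of multi-turn chat history keyed by task id."""

    def __init__(self, store: DictStore, prefix: str = "chat:history:") -> None:
        self._store = store
        self._prefix = prefix

    def get_chat_history(self, task_id: int) -> List[Dict[str, Any]]:
        raw = self._store.get(f"{self._prefix}{task_id}")
        if raw is None:
            return []
        data = json.loads(raw)
        # Guard against corrupted cache entries that are not lists.
        return data if isinstance(data, list) else []

    def save_chat_history(self, task_id: int, messages: List[Dict[str, Any]]) -> None:
        self._store.set(
            f"{self._prefix}{task_id}", json.dumps(messages, ensure_ascii=False)
        )


manager = SessionManager(DictStore())
manager.save_chat_history(1, [{"role": "user", "content": "hi"}])
history = manager.get_chat_history(1)
```

Because every worker reads and writes through the shared store rather than process memory, any uvicorn worker can serve the next turn of a conversation.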

Test Plan

  • Verify Chat Shell appears in public shells list
  • Create a Bot using Chat Shell with Claude/OpenAI model
  • Test direct chat streaming via /api/chat/stream endpoint
  • Verify multi-turn conversation maintains context
  • Test cancellation via /api/chat/cancel endpoint
  • Verify Dify Shell works with direct chat mode
  • Confirm mixed teams (Chat + ClaudeCode) fall back to Executor
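When exercising the `/api/chat/stream` endpoint by hand, the client side reduces to reading `data:` lines off the SSE response. A minimal parser, assuming the event format used in this PR (JSON payloads carrying an `event` field; the `[DONE]` sentinel handling is for OpenAI-style streams):

```python
import json
from typing import Iterable, List


def parse_sse_events(lines: Iterable[str]) -> List[dict]:
    """Collect JSON payloads from the 'data:' lines of an SSE stream."""
    events = []
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # skip comments, blank keep-alives, other fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            break
        events.append(json.loads(payload))
    return events


# Sample chunks in the shape this PR's services are described as emitting.
stream = [
    'data: {"event": "message", "content": "Hel"}',
    'data: {"event": "message", "content": "lo"}',
    'data: {"event": "done"}',
]
events = parse_sse_events(stream)
text = "".join(e.get("content", "") for e in events if e["event"] == "message")
```

Accumulating the `message` contents this way is also how a test can verify that multi-turn context survives across requests.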

Summary by CodeRabbit

Release Notes

  • New Features

    • Added direct chat streaming API with support for Claude, OpenAI, and Dify language models
    • Chat history persistence and session management
    • Ability to cancel ongoing chat operations
  • Configuration

    • New setting for maximum concurrent chat sessions (default: 50)
    • New public Chat shell interface with support for Claude and OpenAI models

✏️ Tip: You can customize this high-level summary in your review settings.

This commit introduces direct chat capabilities that allow Chat and Dify
shell types to execute directly in Backend without requiring Docker
Executor containers, improving response speed for lightweight chat
scenarios.

Key changes:
- Add new Chat Shell type supporting Claude and OpenAI compatible APIs
- Implement async streaming chat services using httpx.AsyncClient
- Add session management for multi-turn conversations via Redis
- Create /api/chat/stream SSE endpoint for real-time responses
- Add helper function to determine if a team supports direct chat mode
- Configure shared HTTP client lifecycle in app startup/shutdown

Technical highlights:
- Full async/await implementation for optimal uvicorn worker utilization
- Session state stored in Redis for multi-worker compatibility
- Support for conversation history (Chat) and conversation_id (Dify)
- Proper error handling with subtask status updates
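The helper that decides direct-chat eligibility is an all-members check: a team qualifies only if every bot's shell type is in the direct-chat set, which is why a mixed Chat + ClaudeCode team falls back to the Executor. A hedged sketch (the real `is_direct_chat_team` in `task_kinds.py` resolves bots and shells from the database; the set contents here are assumptions):

```python
from typing import List

# Hypothetical shell-type constants; the real set lives in
# backend/app/services/chat/base.py (ChatShellTypes).
DIRECT_CHAT_SHELL_TYPES = {"Chat", "Dify"}


def is_direct_chat_team(bot_shell_types: List[str]) -> bool:
    """A team runs in direct-chat mode only if every bot's shell qualifies."""
    return bool(bot_shell_types) and all(
        shell_type in DIRECT_CHAT_SHELL_TYPES for shell_type in bot_shell_types
    )
```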
@coderabbitai bot commented Dec 3, 2025

Walkthrough

A new Direct Chat streaming API enables lightweight chat workflows with support for Claude, OpenAI, and Dify backends. Introduces chat service infrastructure including HTTP client management, session management, streaming utilities, endpoint handlers, and task validation. Registers the chat router and adds public Shell configuration for direct chat mode.

Changes

  • API Routing & Endpoints (backend/app/api/api.py, backend/app/api/endpoints/adapter/chat.py)
    Registers the new /chat router with the direct-chat tag; adds POST /stream and POST /cancel endpoints for chat operations with request validation, bot/shell configuration resolution, streaming responses, and asynchronous status updates.
  • Core Chat Services (backend/app/services/chat/__init__.py, base.py, chat_service.py, dify_service.py, session_manager.py)
    Introduces base chat service abstractions, an HTTP client manager, and three implementations: ChatService for Claude/OpenAI direct API calls with streaming, DifyService for Dify API integration with conversation tracking, and SessionManager for Redis-backed chat history and cancellation state management.
  • Application Configuration & Lifecycle (backend/app/core/config.py, backend/app/main.py, backend/app/services/adapters/task_kinds.py)
    Adds the MAX_CONCURRENT_CHATS setting, extends the shutdown flow to close the HTTP client manager, and introduces the is_direct_chat_team validation method for checking bot shell compatibility.
  • Data Initialization (backend/init_data/02-public-shells.yaml)
    Adds a new public Chat shell resource with supportModel entries for claude and openai.

Sequence Diagram

sequenceDiagram
    actor Client
    participant API as /chat/stream Endpoint
    participant DB as Database
    participant ChatSvc as Chat/Dify Service
    participant HTTPClient as HTTP Client Manager
    participant SessionMgr as Session Manager
    participant Backend as LLM Backend<br/>(Claude/OpenAI/Dify)

    Client->>API: POST /stream (task_id, subtask_id, prompt)
    API->>DB: Validate subtask ownership & status
    API->>DB: Resolve bot, shell config (with public shell fallback)
    API->>DB: Verify shell type supports direct-chat
    API->>DB: Build chat config (system prompt, model, API keys)
    API->>DB: Mark subtask RUNNING, update timestamps
    
    API->>ChatSvc: chat_stream(task_id, subtask_id, prompt, config)
    
    loop Streaming Response
        ChatSvc->>HTTPClient: get_client()
        ChatSvc->>Backend: Stream chat request (via Claude/OpenAI/Dify API)
        Backend-->>ChatSvc: SSE chunks (message events)
        ChatSvc->>SessionMgr: Append message to chat history
        ChatSvc-->>API: Yield SSE formatted event
        API-->>Client: SSE event (stream)
        
        Note over SessionMgr: Track cancellation signal
    end
    
    alt Completion
        API->>SessionMgr: Check accumulated response
        API->>DB: Update subtask COMPLETED with result
    else Error/Cancellation
        ChatSvc->>Backend: Cancel in-flight request
        API->>DB: Update subtask FAILED with error
    end
    
    API-->>Client: Close stream
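The streaming loop in the diagram (yield a chunk, check for cancellation, close the stream) can be sketched as an async generator. Event names and the cancellation hook are illustrative, not the PR's exact API:

```python
import asyncio
import json
from typing import AsyncGenerator, Callable, List


async def stream_with_cancellation(
    chunks: List[str],
    is_cancelled: Callable[[], bool],
) -> AsyncGenerator[str, None]:
    """Yield SSE events, stopping early if the cancellation flag is set."""
    for content in chunks:
        if is_cancelled():
            # Mirrors the Error/Cancellation branch in the diagram.
            yield 'data: {"event": "error", "error": "Request cancelled"}\n\n'
            return
        yield f'data: {json.dumps({"event": "message", "content": content})}\n\n'
    yield 'data: {"event": "done"}\n\n'


async def main() -> list:
    out = []
    async for event in stream_with_cancellation(["Hi", "!"], lambda: False):
        out.append(event)
    return out


events = asyncio.run(main())
```

In the real endpoint, `is_cancelled` would consult the Redis-backed session manager so a `/cancel` call from any worker stops the stream.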

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

  • Streaming implementation & error handling: Review async streaming patterns, SSE formatting, and error recovery paths across multiple service implementations.
  • Async/await & concurrency: Verify proper handling of cancellation signals, session state, and potential race conditions in session manager and HTTP client manager.
  • API integrations & security: Examine API key handling (including encrypted keys), Dify API communication, Claude/OpenAI payload construction, and config building logic.
  • Code duplication: The is_direct_chat_team method appears twice, identically, in task_kinds.py; consolidate the copies or confirm the duplication is intentional.
  • Database transaction consistency: Review subtask status updates and the separate threadpool-based async update routine for potential race conditions or orphaned updates.

Possibly related PRs

  • Weagent/Support dify #198: Implements Dify agent functionality and modifies chat/Dify service surface and API routing; shares similar architecture for Dify integration.

Suggested reviewers

  • qdaxb
  • feifei325

Poem

🐰 A swift rabbit hops through the chat streams,
With Claude and Dify fulfilling dreams,
Sessions stored safe in Redis's keep,
Streaming responses, no promise to reap,
Direct chats now swift—hippity-hop! ✨

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
  • Description Check (✅ Passed): Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check (✅ Passed): The title accurately and concisely describes the main change: adding direct chat support for two shell types (Chat and Dify), which is the primary feature across all modified and new files.
  • Docstring Coverage (✅ Passed): Docstring coverage is 90.24%, which is sufficient; the required threshold is 80.00%.

@coderabbitai bot left a comment

Actionable comments posted: 8

🧹 Nitpick comments (13)
backend/app/services/chat/__init__.py (1)

21-28: Consider sorting __all__ for consistency.

Per RUF022, the __all__ list is not sorted. While the current logical grouping is reasonable, sorting alphabetically is the idiomatic Python convention.

 __all__ = [
-    "ChatShellTypes",
-    "DirectChatService",
-    "http_client_manager",
     "ChatService",
+    "ChatShellTypes",
+    "DirectChatService",
     "DifyService",
+    "http_client_manager",
     "session_manager",
 ]
backend/app/services/chat/dify_service.py (4)

76-82: Silent failure on JSON parse error may hide configuration issues.

When bot_prompt JSON parsing fails, the error is silently ignored. This could hide configuration problems from users.

             try:
                 prompt_data = json.loads(bot_prompt)
                 if prompt_data.get("params"):
                     params.update(prompt_data.get("params", {}))
             except json.JSONDecodeError:
-                pass
+                logger.warning(f"Failed to parse bot_prompt as JSON for task {task_id}")

118-129: Use logging.exception for network errors to capture stack traces.

Per static analysis hints, logging.error should be logging.exception for errors that have exception context, as it automatically includes the traceback.

         except asyncio.CancelledError:
             logger.info(f"Dify stream cancelled for task {task_id}")
             yield self._format_error_event("Request cancelled")
         except httpx.TimeoutException:
-            logger.error(f"Dify API timeout for task {task_id}")
+            logger.exception(f"Dify API timeout for task {task_id}")
             yield self._format_error_event("API call timeout")
         except httpx.RequestError as e:
-            logger.error(f"Network error for task {task_id}: {e}")
+            logger.exception(f"Network error for task {task_id}")
             yield self._format_error_event(f"Network error: {str(e)}")
         except Exception as e:
-            logger.exception(f"Error in Dify stream for task {task_id}: {e}")
+            logger.exception(f"Error in Dify stream for task {task_id}")
             yield self._format_error_event(str(e))

203-206: Consider defining custom exception classes for API errors.

The static analysis flags raising generic Exception with long messages. For better error handling and testability, consider creating domain-specific exceptions.

For example, create in base.py or a dedicated exceptions module:

class DifyAPIError(Exception):
    """Raised when Dify API returns an error."""
    def __init__(self, status_code: int, message: str):
        self.status_code = status_code
        super().__init__(f"Dify API error {status_code}: {message}")

Also applies to: 285-288


151-159: Broad exception catch may mask unexpected errors.

Catching all Exception types when fetching app mode could hide serious issues. Consider catching more specific exceptions or at least logging the exception type.

         try:
             client = await http_client_manager.get_client()
             response = await client.get(url, headers=headers, timeout=10.0)
             response.raise_for_status()
             data = response.json()
             return data.get("mode", "chat")
-        except Exception as e:
+        except (httpx.HTTPError, json.JSONDecodeError, KeyError) as e:
             logger.warning(f"Failed to get Dify app mode, defaulting to 'chat': {e}")
             return "chat"
backend/app/services/chat/chat_service.py (3)

152-156: Hardcoded Anthropic API version may become outdated.

The anthropic-version header is hardcoded to 2023-06-01. Consider making this configurable or extracting to a constant for easier updates.

+    # Anthropic API version - update when new versions are released
+    ANTHROPIC_API_VERSION = "2023-06-01"
+
     async def _call_claude_api(
         ...
         headers = {
             "x-api-key": api_key,
-            "anthropic-version": "2023-06-01",
+            "anthropic-version": self.ANTHROPIC_API_VERSION,
             "content-type": "application/json",
         }

158-163: Extract max_tokens as a configurable parameter.

The max_tokens value of 8192 is hardcoded. Different models have different token limits, and users may want to configure this. Per coding guidelines, magic numbers should be extracted to named constants.

+    # Default max tokens for Claude API
+    DEFAULT_MAX_TOKENS = 8192
+
     async def _call_claude_api(
         ...
+        max_tokens = config.get("max_tokens", self.DEFAULT_MAX_TOKENS)
+
         payload = {
             "model": model_id,
-            "max_tokens": 8192,
+            "max_tokens": max_tokens,
             "stream": True,
             "messages": messages,
         }

Note: This requires passing config to _call_claude_api or extracting max_tokens in chat_stream.


113-124: Apply consistent logging improvements as in DifyService.

Same static analysis hints apply - use logging.exception for exception cases.

         except asyncio.CancelledError:
             logger.info(f"Chat stream cancelled for task {task_id}")
             yield self._format_error_event("Request cancelled")
         except httpx.TimeoutException:
-            logger.error(f"API call timeout for task {task_id}")
+            logger.exception(f"API call timeout for task {task_id}")
             yield self._format_error_event("API call timeout")
         except httpx.RequestError as e:
-            logger.error(f"Network error for task {task_id}: {e}")
+            logger.exception(f"Network error for task {task_id}")
             yield self._format_error_event(f"Network error: {str(e)}")
         except Exception as e:
-            logger.exception(f"Error in chat stream for task {task_id}: {e}")
+            logger.exception(f"Error in chat stream for task {task_id}")
             yield self._format_error_event(str(e))
backend/app/services/chat/base.py (2)

137-139: Move json import to top of file.

The json module is imported inside _format_sse_event. While this works, it's inconsistent with Python conventions and the coding guidelines. Imports should be at the module level.

 import asyncio
+import json
 import logging
 from abc import ABC, abstractmethod

And update the method:

     def _format_sse_event(
         self,
         event_type: str,
         data: Dict[str, Any]
     ) -> str:
-        import json
         data["event"] = event_type
         return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"

17-17: Remove unused import List.

The List type hint is imported but not used in this file.

-from typing import Any, AsyncGenerator, Dict, List, Optional
+from typing import Any, AsyncGenerator, Dict, Optional
backend/app/api/endpoints/adapter/chat.py (3)

392-394: ThreadPoolExecutor created per request is inefficient.

Creating a new ThreadPoolExecutor for each request adds overhead. Consider using a module-level executor or FastAPI's background tasks.

+# Module-level executor for async DB operations
+_db_executor = ThreadPoolExecutor(max_workers=4)
+

 async def _update_subtask_status_async(
     subtask_id: int,
     result: str,
     error: bool,
 ) -> None:
     ...
     loop = asyncio.get_event_loop()
-    executor = ThreadPoolExecutor(max_workers=1)
-    await loop.run_in_executor(executor, update_status)
+    await loop.run_in_executor(_db_executor, update_status)

Alternatively, use asyncio.to_thread() (Python 3.9+) which manages a thread pool internally:

await asyncio.to_thread(update_status)

386-388: Use logging.exception for better error tracing.

Per static analysis, use logging.exception to capture the full traceback.

         except Exception as e:
-            logger.error(f"Failed to update subtask status: {e}")
+            logger.exception("Failed to update subtask status")
             db.rollback()

158-199: Consider simplifying response content extraction.

The current approach parses SSE chunks using string matching and JSON parsing. Since you control the service layer, consider having the service yield both the raw SSE chunk and the content separately, or use a more robust parsing approach.

The current implementation works but is fragile if the SSE format changes. Consider:

# In the service layer, yield a tuple or dataclass:
# yield SSEChunk(raw=sse_string, content=content_text, event_type="message")

# Then in generate():
async for chunk in service.chat_stream(...):
    yield chunk.raw
    if chunk.event_type == "message":
        full_response += chunk.content
    elif chunk.event_type == "error":
        error_occurred = True
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1aca6e9 and 53ce627.

📒 Files selected for processing (11)
  • backend/app/api/api.py (2 hunks)
  • backend/app/api/endpoints/adapter/chat.py (1 hunks)
  • backend/app/core/config.py (1 hunks)
  • backend/app/main.py (1 hunks)
  • backend/app/services/adapters/task_kinds.py (1 hunks)
  • backend/app/services/chat/__init__.py (1 hunks)
  • backend/app/services/chat/base.py (1 hunks)
  • backend/app/services/chat/chat_service.py (1 hunks)
  • backend/app/services/chat/dify_service.py (1 hunks)
  • backend/app/services/chat/session_manager.py (1 hunks)
  • backend/init_data/02-public-shells.yaml (1 hunks)
🧰 Additional context used
📓 Path-based instructions (4)
**/*.{py,ts,tsx,js,jsx}

📄 CodeRabbit inference engine (AGENTS.md)

All code comments, inline comments, block comments, docstrings, TODO/FIXME annotations, and type hints descriptions MUST be written in English

Files:

  • backend/app/services/chat/session_manager.py
  • backend/app/main.py
  • backend/app/api/api.py
  • backend/app/services/chat/base.py
  • backend/app/services/chat/dify_service.py
  • backend/app/api/endpoints/adapter/chat.py
  • backend/app/services/adapters/task_kinds.py
  • backend/app/core/config.py
  • backend/app/services/chat/__init__.py
  • backend/app/services/chat/chat_service.py
**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

**/*.py: Python code MUST be PEP 8 compliant with Black formatter (line length: 88) and isort for import organization
Python code MUST include type hints for all functions and variables
Python functions SHOULD NOT exceed 50 lines (preferred maximum)
Python functions and classes MUST have descriptive names and public functions/classes MUST include docstrings
Python code MUST extract magic numbers to named constants

Files:

  • backend/app/services/chat/session_manager.py
  • backend/app/main.py
  • backend/app/api/api.py
  • backend/app/services/chat/base.py
  • backend/app/services/chat/dify_service.py
  • backend/app/api/endpoints/adapter/chat.py
  • backend/app/services/adapters/task_kinds.py
  • backend/app/core/config.py
  • backend/app/services/chat/__init__.py
  • backend/app/services/chat/chat_service.py
**/backend/app/services/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

Backend business logic MUST be placed in app/services/ directory

Files:

  • backend/app/services/chat/session_manager.py
  • backend/app/services/chat/base.py
  • backend/app/services/chat/dify_service.py
  • backend/app/services/adapters/task_kinds.py
  • backend/app/services/chat/__init__.py
  • backend/app/services/chat/chat_service.py
**/backend/app/api/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

Backend API route handlers MUST be created in app/api/ directory

Files:

  • backend/app/api/api.py
  • backend/app/api/endpoints/adapter/chat.py
🧬 Code graph analysis (6)
backend/app/main.py (1)
backend/app/services/chat/base.py (1)
  • close (68-73)
backend/app/services/chat/dify_service.py (2)
backend/app/services/chat/base.py (6)
  • chat_stream (88-107)
  • _format_error_event (149-151)
  • _format_message_event (141-143)
  • _format_done_event (145-147)
  • get_client (46-66)
  • cancel (110-120)
backend/app/services/chat/session_manager.py (5)
  • clear_cancelled (191-202)
  • is_cancelled (177-189)
  • get_dify_conversation_id (113-129)
  • save_dify_conversation_id (131-147)
  • set_cancelled (164-175)
backend/app/api/endpoints/adapter/chat.py (11)
backend/app/models/subtask.py (1)
  • Subtask (31-64)
backend/app/schemas/kind.py (1)
  • Ghost (47-54)
backend/app/services/chat/base.py (5)
  • ChatShellTypes (24-32)
  • is_direct_chat_shell (30-32)
  • chat_stream (88-107)
  • _format_error_event (149-151)
  • close (68-73)
backend/app/core/security.py (1)
  • get_current_user (28-50)
backend/app/models/kind.py (1)
  • Kind (25-42)
backend/app/models/public_shell.py (1)
  • PublicShell (19-38)
backend/app/services/chat/chat_service.py (1)
  • chat_stream (37-124)
backend/app/services/chat/dify_service.py (1)
  • chat_stream (41-132)
backend/app/models/public_model.py (1)
  • PublicModel (19-38)
shared/utils/crypto.py (2)
  • decrypt_sensitive_data (85-129)
  • is_data_encrypted (132-152)
backend/app/services/model_aggregation_service.py (1)
  • resolve_model (418-455)
backend/app/services/adapters/task_kinds.py (4)
backend/app/models/kind.py (1)
  • Kind (25-42)
backend/app/models/public_shell.py (1)
  • PublicShell (19-38)
backend/app/services/chat/base.py (2)
  • ChatShellTypes (24-32)
  • is_direct_chat_shell (30-32)
backend/app/schemas/kind.py (3)
  • Team (223-230)
  • Bot (176-183)
  • Shell (129-136)
backend/app/services/chat/__init__.py (3)
backend/app/services/chat/base.py (2)
  • ChatShellTypes (24-32)
  • DirectChatService (80-151)
backend/app/services/chat/chat_service.py (1)
  • ChatService (24-285)
backend/app/services/chat/dify_service.py (1)
  • DifyService (27-383)
backend/app/services/chat/chat_service.py (3)
backend/app/services/chat/base.py (6)
  • chat_stream (88-107)
  • _format_error_event (149-151)
  • _format_message_event (141-143)
  • _format_done_event (145-147)
  • get_client (46-66)
  • cancel (110-120)
backend/app/services/chat/dify_service.py (2)
  • chat_stream (41-132)
  • cancel (372-383)
backend/app/services/chat/session_manager.py (5)
  • clear_cancelled (191-202)
  • get_chat_history (40-56)
  • is_cancelled (177-189)
  • save_chat_history (80-96)
  • set_cancelled (164-175)
🪛 Ruff (0.14.7)
backend/app/main.py

212-212: Store a reference to the return value of loop.create_task

(RUF006)


215-215: Do not catch blind exception: Exception

(BLE001)

backend/app/services/chat/dify_service.py

39-39: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


44-44: Unused method argument: subtask_id

(ARG002)


122-122: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


125-125: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


126-126: Use explicit conversion flag

Replace with conversion flag

(RUF010)


128-128: Redundant exception object included in logging.exception call

(TRY401)


157-157: Do not catch blind exception: Exception

(BLE001)


206-206: Create your own exception

(TRY002)


206-206: Avoid specifying long messages outside the exception class

(TRY003)


235-235: Create your own exception

(TRY002)


235-235: Avoid specifying long messages outside the exception class

(TRY003)


288-288: Create your own exception

(TRY002)


288-288: Avoid specifying long messages outside the exception class

(TRY003)


311-311: Create your own exception

(TRY002)


311-311: Avoid specifying long messages outside the exception class

(TRY003)


368-368: Do not catch blind exception: Exception

(BLE001)

backend/app/api/endpoints/adapter/chat.py

53-53: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


54-54: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


97-97: Avoid equality comparisons to True; use Kind.is_active: for truth checks

Replace with Kind.is_active

(E712)


115-115: Avoid equality comparisons to True; use Kind.is_active: for truth checks

Replace with Kind.is_active

(E712)


181-181: Do not use bare except

(E722)


181-182: try-except-pass detected, consider logging the exception

(S110)


191-191: Redundant exception object included in logging.exception call

(TRY401)


215-215: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


216-216: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


236-236: Avoid equality comparisons to True; use Kind.is_active: for truth checks

Replace with Kind.is_active

(E712)


292-292: Avoid equality comparisons to True; use Kind.is_active: for truth checks

Replace with Kind.is_active

(E712)


321-321: Avoid equality comparisons to True; use Kind.is_active: for truth checks

Replace with Kind.is_active

(E712)


347-347: Do not use bare except

(E722)


347-348: try-except-pass detected, consider logging the exception

(S110)


386-386: Do not catch blind exception: Exception

(BLE001)


387-387: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

backend/app/services/adapters/task_kinds.py

1541-1541: Unused method argument: user_id

(ARG002)


1573-1573: Avoid equality comparisons to True; use Kind.is_active: for truth checks

Replace with Kind.is_active

(E712)


1591-1591: Avoid equality comparisons to True; use Kind.is_active: for truth checks

Replace with Kind.is_active

(E712)

backend/app/services/chat/__init__.py

21-28: __all__ is not sorted

Apply an isort-style sorting to __all__

(RUF022)

backend/app/services/chat/chat_service.py

40-40: Unused method argument: subtask_id

(ARG002)


117-117: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


120-120: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


121-121: Use explicit conversion flag

Replace with conversion flag

(RUF010)


123-123: Redundant exception object included in logging.exception call

(TRY401)


173-173: Create your own exception

(TRY002)


173-173: Avoid specifying long messages outside the exception class

(TRY003)


196-196: Create your own exception

(TRY002)


196-196: Avoid specifying long messages outside the exception class

(TRY003)


249-249: Create your own exception

(TRY002)


249-249: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: e2e-tests
🔇 Additional comments (21)
backend/app/core/config.py (1)

93-95: LGTM!

The new configuration follows existing patterns with proper type hints and a clear comment. The default of 50 concurrent sessions is reasonable for this feature.

backend/app/api/api.py (1)

9-9: LGTM!

The chat router import and registration follow the established patterns for adapter endpoints. The direct-chat tag clearly identifies the endpoint category.

Also applies to: 34-34

backend/app/services/chat/session_manager.py (2)

40-57: LGTM with minor simplification opportunity.

The chat history retrieval logic is correct. The double list check on lines 54-56 could be simplified since an empty list from cache would pass the isinstance check, but the current implementation is safe and explicit.


206-215: LGTM!

The cleanup method properly clears all session-related data. Good practice to have a single cleanup entry point.

backend/init_data/02-public-shells.yaml (1)

47-62: LGTM!

The Chat Shell definition follows the established pattern. The supportModel list correctly specifies claude and openai as supported backends, and the external_api label aligns with the Dify shell type.

Note: The displayName field in metadata is present here but not in other shells. Consider adding it to other shells for consistency in a follow-up, or removing it here if not required.

backend/app/services/adapters/task_kinds.py (1)

1564-1617: LGTM on the validation logic.

The method correctly validates all team members by:

  1. Finding each bot via the team's botRef
  2. Looking up the shell (user's shells first, then public shells)
  3. Checking shell type against ChatShellTypes.is_direct_chat_shell()
  4. Returning False if any member fails validation

The == True pattern on lines 1573 and 1591 is correct for SQLAlchemy filter conditions despite the static analysis hint (E712).

backend/app/services/chat/dify_service.py (4)

41-132: LGTM overall - well-structured streaming implementation.

The chat_stream method properly handles:

  • Configuration validation with early returns
  • Cancellation flag clearing at start
  • Mode detection for workflow vs chat APIs
  • Cancellation checks during streaming with proper cleanup
  • Comprehensive error handling with SSE error events
  • Resource cleanup in finally block

161-243: LGTM - Chat API implementation is solid.

The _call_chat_api method correctly:

  • Constructs the request with proper headers and payload
  • Handles streaming response iteration
  • Parses SSE data lines correctly
  • Tracks task_id and conversation_id for state management
  • Saves conversation_id for multi-turn support
  • Handles cancellation during iteration

244-318: LGTM - Workflow API implementation handles the different response format correctly.

The workflow API yields output only after workflow_finished event, which is the correct behavior for Dify workflows.


320-370: LGTM - Stop task implementation with proper endpoint selection.

The _stop_dify_task method correctly selects between chat-messages and workflows stop endpoints based on the is_workflow flag.

backend/app/services/chat/chat_service.py (2)

37-111: LGTM - Well-structured chat stream implementation.

The chat_stream method correctly:

  • Validates required configuration (api_key, model_id)
  • Clears previous cancellation flags
  • Loads and appends to message history
  • Routes to appropriate API handler based on model type
  • Checks cancellation during streaming
  • Saves updated history with assistant response on success
  • Has comprehensive error handling

170-199: LGTM - Claude and OpenAI streaming implementations are correct.

Both API implementations properly:

  • Handle streaming response parsing
  • Check for cancellation during iteration
  • Parse SSE data format correctly
  • Handle the [DONE] sentinel
  • Yield content chunks appropriately

Also applies to: 246-272

backend/app/services/chat/base.py (3)

42-66: LGTM - HttpClientManager uses proper double-checked locking.

The implementation correctly uses an async lock with double-checking to ensure thread-safe lazy initialization of the HTTP client. The timeout and connection pool settings are reasonable for a streaming chat service.
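The pattern being praised — check, acquire the lock, check again, then initialize — can be shown with a generic lazily-created resource (the PR applies it to a shared `httpx.AsyncClient`; a plain factory stands in here so the sketch stays self-contained):

```python
import asyncio

class LazyResource:
    """Async double-checked lazy initialization, as in HttpClientManager."""

    def __init__(self, factory):
        self._resource = None
        self._lock = asyncio.Lock()
        self._factory = factory
        self.init_count = 0

    async def get(self):
        if self._resource is None:           # fast path: no lock taken
            async with self._lock:
                if self._resource is None:   # re-check, now serialized
                    self._resource = self._factory()
                    self.init_count += 1
        return self._resource

async def _demo():
    r = LazyResource(lambda: object())
    a, b = await asyncio.gather(r.get(), r.get())
    # Both callers see the same instance, created exactly once.
    return a is b and r.init_count == 1

ok = asyncio.run(_demo())
```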


80-120: LGTM - DirectChatService ABC provides clean interface.

The abstract base class defines a clear contract for chat services with:

  • chat_stream for streaming chat execution
  • cancel for request cancellation
  • Utility methods for SSE formatting

This promotes consistency across ChatService and DifyService implementations.
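The contract described above might look like the following ABC. Method names mirror the review's description; the exact signatures and the `format_sse` helper are assumptions:

```python
import json
from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator, Dict

class DirectChatService(ABC):
    @abstractmethod
    def chat_stream(self, subtask_id: int, message: str,
                    config: Dict[str, Any]) -> AsyncGenerator[str, None]:
        """Yield SSE-formatted chunks for one chat turn."""

    @abstractmethod
    async def cancel(self, subtask_id: int) -> bool:
        """Request cancellation of an in-flight chat."""

    @staticmethod
    def format_sse(payload: Dict[str, Any]) -> str:
        """Shared utility: format a payload as an SSE data line."""
        return f"data: {json.dumps(payload)}\n\n"

try:
    DirectChatService()  # abstract methods prevent direct instantiation
    can_instantiate = True
except TypeError:
    can_instantiate = False
```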


24-32: LGTM - ChatShellTypes provides clear shell type checking.

Clean implementation of shell type constants with a class method for validation.
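The constants-plus-classmethod shape described here is roughly the following; the actual names and values in `base.py` may differ:

```python
class ChatShellTypes:
    """Shell types that support direct chat execution in the Backend."""
    CHAT = "Chat"
    DIFY = "Dify"
    ALL = (CHAT, DIFY)

    @classmethod
    def is_direct_chat_shell(cls, shell_type: str) -> bool:
        return shell_type in cls.ALL

supported = ChatShellTypes.is_direct_chat_shell("Chat")
```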

backend/app/api/endpoints/adapter/chat.py (6)

97-97: SQLAlchemy requires == True for boolean column filters.

The static analysis flags Kind.is_active == True as E712, but this is a false positive for SQLAlchemy. Using Kind.is_active alone in a filter would not produce the correct SQL. The current code is correct.

Also applies to: 115-115, 236-236, 292-292, 321-321


53-54: Depends() in argument defaults is the standard FastAPI pattern.

The static analysis flags B008 for Depends() calls in argument defaults, but this is the standard and recommended pattern for FastAPI dependency injection. No change needed.

Also applies to: 215-216


68-86: LGTM - Proper subtask ownership verification and state validation.

The endpoint correctly:

  • Verifies subtask belongs to the current user
  • Checks subtask status allows starting a new chat
  • Returns appropriate HTTP errors for invalid states

108-141: LGTM - Shell resolution with public shell fallback.

The code properly falls back to public shells when the user's shell is not found, enabling the new Chat shell type to work for all users.
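The fallback reduces to a two-step lookup. A schematic sketch only — the PR queries `Kind` rows via SQLAlchemy, not dicts:

```python
from typing import Dict, Optional

def resolve_shell(user_shells: Dict[str, dict],
                  public_shells: Dict[str, dict],
                  name: str) -> Optional[dict]:
    # Prefer the user's own shell; fall back to the public catalog so
    # built-in shell types (like the new Chat shell) work for everyone.
    shell = user_shells.get(name)
    if shell is None:
        shell = public_shells.get(name)
    return shell

shell = resolve_shell({}, {"Chat": {"type": "Chat"}}, "Chat")
```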


250-350: LGTM - Configuration building handles multiple model resolution strategies.

The _build_chat_config function properly:

  • Resolves Ghost for system prompt
  • Supports both bind_model and legacy modelRef
  • Handles encrypted API keys with proper decryption
  • Supports Dify-specific parameters
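The resolution strategies listed above can be sketched end to end. The key names (`bind_model`, `modelRef`, `DIFY_PARAMS`) come from the review itself; the `decrypt` callable and the nested field names are placeholder assumptions, not the PR's actual schema:

```python
import json
from typing import Any, Callable, Dict

def build_chat_config(bot: Dict[str, Any],
                      decrypt: Callable[[str], str]) -> Dict[str, Any]:
    config: Dict[str, Any] = {
        "system_prompt": bot.get("ghost", {}).get("prompt", ""),
    }
    # Prefer the new bind_model field, then fall back to the legacy modelRef.
    model = bot.get("bind_model") or bot.get("modelRef") or {}
    if model:
        config["model_id"] = model.get("id")
        encrypted = model.get("api_key_encrypted")
        if encrypted:
            config["api_key"] = decrypt(encrypted)
    # Dify-specific parameters may arrive as a JSON string in the env block.
    raw = bot.get("env", {}).get("DIFY_PARAMS")
    if raw:
        try:
            config["params"] = json.loads(raw) if isinstance(raw, str) else raw
        except (json.JSONDecodeError, TypeError):
            config["params"] = {}
    return config

cfg = build_chat_config(
    {"ghost": {"prompt": "You are helpful."},
     "modelRef": {"id": "claude-x", "api_key_encrypted": "enc"},
     "env": {"DIFY_PARAMS": '{"user": "abc"}'}},
    decrypt=lambda s: "sk-" + s,
)
```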

212-247: LGTM - Cancel endpoint with proper ownership verification.

The cancel endpoint correctly verifies task ownership before setting the cancellation flag.

Comment on lines +181 to +182
except:
pass

⚠️ Potential issue | 🟡 Minor

Avoid bare except - catch specific exception types.

Bare except clauses can mask unexpected errors. Catch the specific exception type.

                     try:
                         # Parse the SSE data line
                         if chunk.startswith("data: "):
                             data = json.loads(chunk[6:].strip())
                             if data.get("content"):
                                 full_response += data["content"]
-                    except:
-                        pass
+                    except (json.JSONDecodeError, KeyError):
+                        pass  # Non-message chunks or malformed data
🧰 Tools
🪛 Ruff (0.14.7)

181-181: Do not use bare except

(E722)


181-182: try-except-pass detected, consider logging the exception

(S110)


Comment on lines +347 to +348
except:
pass

⚠️ Potential issue | 🟡 Minor

Avoid bare except and log the failure.

Silent failures in config parsing can make debugging difficult.

         if env.get("DIFY_PARAMS"):
             try:
                 params_str = env.get("DIFY_PARAMS", "{}")
                 config["params"] = json.loads(params_str) if isinstance(params_str, str) else params_str
-            except:
-                pass
+            except (json.JSONDecodeError, TypeError) as e:
+                logger.warning(f"Failed to parse DIFY_PARAMS: {e}")
🧰 Tools
🪛 Ruff (0.14.7)

347-347: Do not use bare except

(E722)


347-348: try-except-pass detected, consider logging the exception

(S110)


Comment on lines +397 to +398
# Need json import for config parsing
import json

⚠️ Potential issue | 🔴 Critical

Move json import to top of file - it's used before this line.

The json module is imported at line 398 but used at lines 174, 178, and 346. This will cause a NameError at runtime.

 import asyncio
+import json
 import logging
 from datetime import datetime

Remove from bottom:

-# Need json import for config parsing
-import json

Comment on lines +206 to +217
# Close HTTP client for direct chat services
import asyncio
from app.services.chat import http_client_manager
try:
loop = asyncio.get_event_loop()
if loop.is_running():
loop.create_task(http_client_manager.close())
else:
loop.run_until_complete(http_client_manager.close())
except Exception as e:
logger.warning(f"Error closing HTTP client: {e}")
logger.info("Application shutdown completed")

⚠️ Potential issue | 🟠 Major

Async cleanup task may not complete before process exits.

When loop.is_running() is True, create_task() schedules the task but doesn't await completion. The process may exit before the HTTP client is properly closed. Additionally, the task reference should be stored per RUF006.

Consider using an async shutdown handler or ensuring the task completes:

     @app.on_event("shutdown")
-    def shutdown():
+    async def shutdown():
         logger.info("Shutting down application...")
         # Stop background jobs
         stop_background_jobs(app)
         # Close HTTP client for direct chat services
-        import asyncio
         from app.services.chat import http_client_manager
         try:
-            loop = asyncio.get_event_loop()
-            if loop.is_running():
-                loop.create_task(http_client_manager.close())
-            else:
-                loop.run_until_complete(http_client_manager.close())
+            await http_client_manager.close()
         except Exception as e:
             logger.warning(f"Error closing HTTP client: {e}")
         logger.info("Application shutdown completed")
🧰 Tools
🪛 Ruff (0.14.7)

212-212: Store a reference to the return value of loop.create_task

(RUF006)


215-215: Do not catch blind exception: Exception

(BLE001)


Comment on lines +1537 to +1555
def is_direct_chat_team(
self,
db: Session,
team: Kind,
user_id: int,
) -> bool:
"""
Check if a team supports direct chat mode.
A team supports direct chat if ALL its bots use Chat or Dify shell types.
Args:
db: Database session
team: Team Kind object
user_id: User ID (owner of team)
Returns:
bool: True if team supports direct chat mode
"""

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Search for calls to is_direct_chat_team method
rg "is_direct_chat_team" --type py -B2 -A2

Repository: wecode-ai/Wegent

Length of output: 354


🏁 Script executed:

# Get the full implementation of the method to see if user_id is used anywhere
cd backend/app/services/adapters && cat -n task_kinds.py | sed -n '1537,1620p'

Repository: wecode-ai/Wegent

Length of output: 3297


Remove the unused user_id parameter.

The user_id parameter is declared but never used in the method body. The code uses team.user_id instead (lines 1569, 1587), which is correct for checking the team owner's resources. Remove the parameter and its corresponding docstring entry since no call sites in the codebase pass this argument.

     def is_direct_chat_team(
         self,
         db: Session,
         team: Kind,
-        user_id: int,
     ) -> bool:
         """
         Check if a team supports direct chat mode.

         A team supports direct chat if ALL its bots use Chat or Dify shell types.

         Args:
             db: Database session
             team: Team Kind object
-            user_id: User ID (owner of team)

         Returns:
             bool: True if team supports direct chat mode
         """
🧰 Tools
🪛 Ruff (0.14.7)

1541-1541: Unused method argument: user_id

(ARG002)


Comment on lines +288 to +289
# Need to import asyncio for CancelledError
import asyncio

⚠️ Potential issue | 🔴 Critical

Move asyncio import to the top of the file.

The asyncio import is placed at the bottom of the file after being used at line 113. This will cause a NameError at runtime when asyncio.CancelledError is raised.

 import json
 import logging
 from typing import Any, AsyncGenerator, Dict, List, Optional
+import asyncio

 import httpx

And remove from the bottom:

-# Need to import asyncio for CancelledError
-import asyncio
-
 # Global service instance
 chat_service = ChatService()

Comment on lines +38 to +39
# Dify task IDs for cancellation (in-memory cache for current requests)
_dify_task_ids: Dict[int, str] = {}

⚠️ Potential issue | 🟠 Major

Class-level mutable dict is problematic in multi-worker deployments.

_dify_task_ids is a class-level mutable dictionary shared across all instances. In a multi-worker uvicorn deployment, each worker process has its own memory space, so this in-memory cache won't be shared across workers. A cancellation request hitting worker A won't stop a task running on worker B.

Consider using Redis (via session_manager) for storing active Dify task IDs to ensure consistency across workers, or document this as a known limitation.

-    # Dify task IDs for cancellation (in-memory cache for current requests)
-    _dify_task_ids: Dict[int, str] = {}
+    def __init__(self):
+        # Dify task IDs for cancellation (in-memory cache for current requests)
+        # Note: This is per-worker; cross-worker cancellation relies on session_manager
+        self._dify_task_ids: Dict[int, str] = {}

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.14.7)

39-39: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)
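The suggested fix — keep Dify task IDs in the shared Redis store rather than a per-process dict — can be sketched as follows. A dict-backed fake stands in for Redis so the example is self-contained; the `set_task_id`/`get_task_id` names are assumptions, not `session_manager`'s actual API:

```python
import asyncio
from typing import Dict, Optional

class FakeCache:
    """Stands in for the Redis-backed cache_manager; every worker sharing
    the real Redis instance would see the same keys."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}

    async def set(self, key: str, value: str) -> None:
        self._store[key] = value

    async def get(self, key: str) -> Optional[str]:
        return self._store.get(key)

    async def delete(self, key: str) -> None:
        self._store.pop(key, None)

cache = FakeCache()

async def set_task_id(subtask_id: int, task_id: str) -> None:
    await cache.set(f"dify:task:{subtask_id}", task_id)

async def get_task_id(subtask_id: int) -> Optional[str]:
    return await cache.get(f"dify:task:{subtask_id}")

async def _demo() -> Optional[str]:
    await set_task_id(42, "task-abc")      # worker A records the running task
    return await get_task_id(42)           # worker B can read it to cancel

result = asyncio.run(_demo())
```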


Comment on lines +20 to +21
# Session expiration time (2 hours in seconds)
SESSION_EXPIRE_TIME = 7200

🛠️ Refactor suggestion | 🟠 Major

Extract magic number to named constant.

The cancellation expiry of 300 seconds (5 minutes) on line 175 should be extracted to a named constant alongside SESSION_EXPIRE_TIME for consistency and maintainability. Per coding guidelines, magic numbers should be named constants.

 # Session expiration time (2 hours in seconds)
 SESSION_EXPIRE_TIME = 7200
+# Cancellation flag expiration time (5 minutes in seconds)
+CANCELLATION_EXPIRE_TIME = 300

Then update line 175:

-        return await cache_manager.set(key, True, expire=300)  # 5 minutes
+        return await cache_manager.set(key, True, expire=CANCELLATION_EXPIRE_TIME)

Also applies to: 174-175

