diff --git a/src/App/src/components/common/PlanTimeoutDialog.tsx b/src/App/src/components/common/PlanTimeoutDialog.tsx new file mode 100644 index 000000000..e6e5ba5e9 --- /dev/null +++ b/src/App/src/components/common/PlanTimeoutDialog.tsx @@ -0,0 +1,59 @@ +import React from 'react'; +import { + Dialog, + DialogSurface, + DialogTitle, + DialogContent, + DialogBody, + DialogActions, + Button, +} from '@fluentui/react-components'; +import { Warning20Regular } from '@fluentui/react-icons'; +import "../../styles/Panel.css"; + +interface PlanTimeoutDialogProps { + isOpen: boolean; + onGoHome: () => void; + onCancel: () => void; +} + +const PlanTimeoutDialog: React.FC = ({ + isOpen, + onGoHome, + onCancel, +}) => { + return ( + + + + +
+ + Session Timed Out +
+
+ + The plan approval request has timed out because no action was taken. + Please go to the Home page and create a new task. + + + + + +
+
+
+ ); +}; + +export default PlanTimeoutDialog; diff --git a/src/App/src/hooks/usePlanWebSocket.tsx b/src/App/src/hooks/usePlanWebSocket.tsx index baabe0a0f..af78ee36d 100644 --- a/src/App/src/hooks/usePlanWebSocket.tsx +++ b/src/App/src/hooks/usePlanWebSocket.tsx @@ -18,6 +18,7 @@ import { approvalRequestReceived, planCompletedFinal, planFailedFinal, + setShowTimeoutDialog, } from '@/store/slices/planSlice'; import { setSubmittingChatDisableInput, @@ -242,6 +243,17 @@ export function usePlanWebSocket({ const c = errorMessage.trim(); if (c.length > 0) errorContent = c; } + + // Detect timeout-specific error → show popup dialog instead of inline error + const isTimeout = errorContent.toLowerCase().includes('timed out'); + if (isTimeout) { + dispatch(planFailedFinal()); + dispatch(setShowBufferingText(false)); + dispatch(setShowTimeoutDialog(true)); + webSocketService.disconnect(); + return; + } + const errorAgent: AgentMessageData = { agent: 'system', agent_type: AgentMessageType.SYSTEM_AGENT, diff --git a/src/App/src/pages/PlanPage.tsx b/src/App/src/pages/PlanPage.tsx index 981323e9a..10eacf193 100644 --- a/src/App/src/pages/PlanPage.tsx +++ b/src/App/src/pages/PlanPage.tsx @@ -28,11 +28,13 @@ import { selectLoadingMessage, selectReloadLeftList, selectWaitingForPlan, + selectShowTimeoutDialog, setReloadLeftList, setProcessingApproval, setShowProcessingPlanSpinner, setShowCancellationDialog, setCancellingPlan, + setShowTimeoutDialog, setLoadingMessage, setErrorLoading, planApprovalAccepted, @@ -73,6 +75,7 @@ import { useInlineToaster } from '../components/toast/InlineToaster'; import Octo from '../commonComponents/imports/Octopus.png'; import LoadingMessage, { loadingMessages } from '../commonComponents/components/LoadingMessage'; import PlanCancellationDialog from '../components/common/PlanCancellationDialog'; +import PlanTimeoutDialog from '../components/common/PlanTimeoutDialog'; import '../styles/PlanPage.css'; // Singleton API service @@ -99,6 +102,7 @@ const PlanPage: React.FC = () => { const showProcessingPlanSpinner = useAppSelector(selectShowProcessingPlanSpinner); const showCancellationDialog = useAppSelector(selectShowCancellationDialog); const cancellingPlan = useAppSelector(selectCancellingPlan); + const showTimeoutDialog = useAppSelector(selectShowTimeoutDialog); const loadingMessage = useAppSelector(selectLoadingMessage); const reloadLeftList = useAppSelector(selectReloadLeftList); const waitingForPlan = useAppSelector(selectWaitingForPlan); @@ -388,6 +392,15 @@ const PlanPage: React.FC = () => { onCancel={handleCancelDialog} loading={cancellingPlan} /> + + { + dispatch(setShowTimeoutDialog(false)); + navigate('/'); + }} + onCancel={() => dispatch(setShowTimeoutDialog(false))} + /> ); }; diff --git a/src/App/src/store/slices/planSlice.ts b/src/App/src/store/slices/planSlice.ts index 1214d94a1..321d367b4 100644 --- a/src/App/src/store/slices/planSlice.ts +++ b/src/App/src/store/slices/planSlice.ts @@ -56,6 +56,8 @@ export interface PlanState { showCancellationDialog: boolean; /** Is a cancellation API call in progress? */ cancellingPlan: boolean; + /** Show timeout popup when plan approval times out */ + showTimeoutDialog: boolean; /** Loading message for spinners */ loadingMessage: string; } @@ -74,6 +76,7 @@ const initialState: PlanState = { reloadLeftList: true, showCancellationDialog: false, cancellingPlan: false, + showTimeoutDialog: false, loadingMessage: '', }; @@ -120,6 +123,9 @@ const planSlice = createSlice({ setCancellingPlan(state, action: PayloadAction) { state.cancellingPlan = action.payload; }, + setShowTimeoutDialog(state, action: PayloadAction) { + state.showTimeoutDialog = action.payload; + }, setLoadingMessage(state, action: PayloadAction) { state.loadingMessage = action.payload; }, @@ -231,6 +237,7 @@ export const { setReloadLeftList, setShowCancellationDialog, setCancellingPlan, + setShowTimeoutDialog, setLoadingMessage, markPlanCompleted, planApprovalAccepted, @@ -254,6 +261,7 @@ export const selectContinueWithWebsocketFlow = (s: RootState) => s.plan.continue export const selectReloadLeftList = (s: RootState) => s.plan.reloadLeftList; export const selectShowCancellationDialog = (s: RootState) => s.plan.showCancellationDialog; export const selectCancellingPlan = (s: RootState) => s.plan.cancellingPlan; +export const selectShowTimeoutDialog = (s: RootState) => s.plan.showTimeoutDialog; export const selectLoadingMessage = (s: RootState) => s.plan.loadingMessage; export const selectPlanStatus = (s: RootState) => s.plan.planData?.plan?.overall_status ?? null; export const selectPlanApproved = (s: RootState) => s.plan.planApproved; diff --git a/src/backend/v4/api/router.py b/src/backend/v4/api/router.py index d14602452..77f2021a0 100644 --- a/src/backend/v4/api/router.py +++ b/src/backend/v4/api/router.py @@ -369,6 +369,10 @@ async def process_request( raise HTTPException(status_code=500, detail="Failed to create plan") from e try: + # Cancel any stale pending approvals from previous plans for this user. + # This ensures old background tasks (still waiting for approval) terminate + # silently instead of sending timeout errors to the user's current WebSocket. + orchestration_config.cancel_pending_approvals_for_user(user_id) async def run_orchestration_task(): try: diff --git a/src/backend/v4/config/settings.py b/src/backend/v4/config/settings.py index fa112fcd9..e324dce10 100644 --- a/src/backend/v4/config/settings.py +++ b/src/backend/v4/config/settings.py @@ -97,6 +97,10 @@ def __init__(self): self._approval_events: Dict[str, asyncio.Event] = {} self._clarification_events: Dict[str, asyncio.Event] = {} + # Track which user each pending plan belongs to, and which plans are superseded + self._plan_to_user: Dict[str, str] = {} # plan_id -> user_id + self._superseded_plans: set = set() # plan IDs cancelled by a new task + # Default timeout (seconds) for waiting operations self.default_timeout: float = 300.0 @@ -104,13 +108,15 @@ def get_current_orchestration(self, user_id: str) -> Any: """Get existing orchestration workflow instance for user_id.""" return self.orchestrations.get(user_id, None) - def set_approval_pending(self, plan_id: str) -> None: + def set_approval_pending(self, plan_id: str, user_id: str = None) -> None: """Mark approval pending and create/reset its event.""" self.approvals[plan_id] = None if plan_id not in self._approval_events: self._approval_events[plan_id] = asyncio.Event() else: self._approval_events[plan_id].clear() + if user_id: + self._plan_to_user[plan_id] = user_id def set_approval_result(self, plan_id: str, approved: bool) -> None: """Set approval decision and trigger its event.""" @@ -214,6 +220,30 @@ def cleanup_approval(self, plan_id: str) -> None: """Remove approval tracking data and event.""" self.approvals.pop(plan_id, None) self._approval_events.pop(plan_id, None) + self._plan_to_user.pop(plan_id, None) + self._superseded_plans.discard(plan_id) + + def cancel_pending_approvals_for_user(self, user_id: str) -> None: + """Cancel all pending approvals for a user (called when a new task starts). + + Wakes up any blocking wait_for_approval calls so they return immediately. + The plan is marked as superseded so the orchestration can terminate silently + without sending error messages to the user's current WebSocket. + """ + plans_to_cancel = [ + pid for pid, uid in self._plan_to_user.items() + if uid == user_id and pid in self.approvals and self.approvals[pid] is None + ] + for plan_id in plans_to_cancel: + logger.info("Superseding stale pending approval: %s (user: %s)", plan_id, user_id) + self._superseded_plans.add(plan_id) + self.approvals[plan_id] = False + if plan_id in self._approval_events: + self._approval_events[plan_id].set() # wake up the blocked wait + + def is_plan_superseded(self, plan_id: str) -> bool: + """Check if a plan was superseded by a newer task from the same user.""" + return plan_id in self._superseded_plans def cleanup_clarification(self, request_id: str) -> None: """Remove clarification tracking data and event.""" diff --git a/src/backend/v4/orchestration/exceptions.py b/src/backend/v4/orchestration/exceptions.py new file mode 100644 index 000000000..6bf6bd79d --- /dev/null +++ b/src/backend/v4/orchestration/exceptions.py @@ -0,0 +1,20 @@ +"""Custom exceptions for orchestration module.""" + + +class PlanSupersededError(Exception): + """Raised when a plan's approval wait is cancelled because the user started a new task.""" + + def __init__(self, plan_id: str): + self.plan_id = plan_id + super().__init__(f"Plan {plan_id} was superseded by a new task") + + +class PlanTimeoutError(Exception): + """Raised when user does not approve/reject the plan within the timeout window.""" + + def __init__(self, plan_id: str, timeout_seconds: float = 0): + self.plan_id = plan_id + self.timeout_seconds = timeout_seconds + super().__init__( + f"Plan {plan_id} approval timed out after {timeout_seconds}s" + ) diff --git a/src/backend/v4/orchestration/human_approval_manager.py b/src/backend/v4/orchestration/human_approval_manager.py index d6c5e1ed8..543cbae1b 100644 --- a/src/backend/v4/orchestration/human_approval_manager.py +++ b/src/backend/v4/orchestration/human_approval_manager.py @@ -19,6 +19,7 @@ from v4.config.settings import connection_config, orchestration_config from v4.models.models import MPlan +from .exceptions import PlanSupersededError, PlanTimeoutError from v4.orchestration.helper.plan_to_mplan_converter import PlanToMPlanConverter logger = logging.getLogger(__name__) @@ -330,43 +331,31 @@ async def _wait_for_user_approval( logger.error("No plan ID provided for approval") return messages.PlanApprovalResponse(approved=False, m_plan_id=m_plan_id) - orchestration_config.set_approval_pending(m_plan_id) + orchestration_config.set_approval_pending(m_plan_id, user_id=self.current_user_id) try: approved = await orchestration_config.wait_for_approval(m_plan_id) + + # Check if this plan was superseded by a new task from the same user + if orchestration_config.is_plan_superseded(m_plan_id): + logger.info( + "Plan %s was superseded by a new task - terminating silently", + m_plan_id, + ) + orchestration_config.cleanup_approval(m_plan_id) + raise PlanSupersededError(m_plan_id) + logger.info("Approval received for plan %s: %s", m_plan_id, approved) return messages.PlanApprovalResponse(approved=approved, m_plan_id=m_plan_id) except asyncio.TimeoutError: - logger.debug( - "Approval timeout for plan %s - notifying user and terminating process", + logger.info( + "Approval timeout for plan %s after %ss", m_plan_id, + orchestration_config.default_timeout, ) - - timeout_message = messages.TimeoutNotification( - timeout_type="approval", - request_id=m_plan_id, - message=f"Plan approval request timed out after {orchestration_config.default_timeout} seconds. Please try again.", - timestamp=asyncio.get_event_loop().time(), - timeout_duration=orchestration_config.default_timeout, - ) - - try: - await connection_config.send_status_update_async( - message=timeout_message, - user_id=self.current_user_id, - message_type=messages.WebsocketMessageType.TIMEOUT_NOTIFICATION, - ) - logger.info( - "Timeout notification sent to user %s for plan %s", - self.current_user_id, - m_plan_id, - ) - except Exception as e: - logger.error("Failed to send timeout notification: %s", e) - orchestration_config.cleanup_approval(m_plan_id) - return None + raise PlanTimeoutError(m_plan_id, orchestration_config.default_timeout) except KeyError as e: logger.debug("Plan ID not found: %s - terminating process silently", e) @@ -377,6 +366,10 @@ async def _wait_for_user_approval( orchestration_config.cleanup_approval(m_plan_id) return None + except (PlanSupersededError, PlanTimeoutError): + # Let these propagate to orchestration_manager for proper handling + raise + except Exception as e: logger.debug( "Unexpected error waiting for approval: %s - terminating process silently", diff --git a/src/backend/v4/orchestration/orchestration_manager.py b/src/backend/v4/orchestration/orchestration_manager.py index 27f394645..bfdb0b80a 100644 --- a/src/backend/v4/orchestration/orchestration_manager.py +++ b/src/backend/v4/orchestration/orchestration_manager.py @@ -37,6 +37,7 @@ from v4.config.settings import connection_config, orchestration_config from v4.models.messages import WebsocketMessageType from v4.orchestration.human_approval_manager import HumanApprovalMagenticManager +from .exceptions import PlanSupersededError, PlanTimeoutError from v4.magentic_agents.magentic_agent_factory import MagenticAgentFactory from common.database.database_factory import DatabaseFactory from v4.models.models import PlanStatus @@ -541,6 +542,67 @@ async def run_orchestration(self, user_id: str, input_task, plan_id: str = None) ) self.logger.info("Final result sent via WebSocket to user '%s'", user_id) + except PlanSupersededError: + # Plan was superseded by a new task from the same user. + # Terminate silently — do NOT send error messages to the WebSocket + # because the user has already moved on to a new plan. + self.logger.info( + "Plan '%s' was superseded by a new task — terminating silently", + self._plan_id, + ) + # Update plan status to failed in the database (housekeeping) + try: + if self._plan_id: + memory_store = await DatabaseFactory.get_database(user_id=user_id) + plan = await memory_store.get_plan_by_plan_id(plan_id=self._plan_id) + if plan: + plan.overall_status = PlanStatus.FAILED + await memory_store.update_plan(plan) + self.logger.info("Superseded plan '%s' status updated to FAILED", self._plan_id) + except Exception as db_error: + self.logger.error("Failed to update superseded plan status: %s", db_error) + return # Exit silently without sending error to user + + except PlanTimeoutError as e: + # Plan approval timed out on the current active session. + # Send a timeout-specific user-friendly error via ERROR_MESSAGE. + self.logger.info( + "Plan '%s' approval timed out after %ss", + self._plan_id, + e.timeout_seconds, + ) + # Update plan status to failed + try: + if self._plan_id: + memory_store = await DatabaseFactory.get_database(user_id=user_id) + plan = await memory_store.get_plan_by_plan_id(plan_id=self._plan_id) + if plan: + plan.overall_status = PlanStatus.FAILED + await memory_store.update_plan(plan) + self.logger.info("Timed-out plan '%s' status updated to FAILED", self._plan_id) + except Exception as db_error: + self.logger.error("Failed to update timed-out plan status: %s", db_error) + # Send timeout error to user + try: + await connection_config.send_status_update_async( + { + "type": WebsocketMessageType.ERROR_MESSAGE, + "data": { + "content": ( + "The plan approval request timed out because no action was taken. " + "Please start a new task and try again." + ), + "status": "error", + "timestamp": asyncio.get_event_loop().time(), + }, + }, + user_id, + message_type=WebsocketMessageType.ERROR_MESSAGE, + ) + except Exception as send_error: + self.logger.error("Failed to send timeout error: %s", send_error) + return # Don't re-raise; this is a handled terminal state + except Exception as e: # Error handling self.logger.error("Unexpected orchestration error: %s", e, exc_info=True) diff --git a/src/tests/backend/v4/orchestration/test_human_approval_manager.py b/src/tests/backend/v4/orchestration/test_human_approval_manager.py index 454cf98d2..0ed98e1e1 100644 --- a/src/tests/backend/v4/orchestration/test_human_approval_manager.py +++ b/src/tests/backend/v4/orchestration/test_human_approval_manager.py @@ -227,6 +227,7 @@ def convert(plan_text, facts, team, task): # Now import the module under test from backend.v4.orchestration.human_approval_manager import HumanApprovalMagenticManager +from backend.v4.orchestration.exceptions import PlanSupersededError, PlanTimeoutError # Get mocked references for tests connection_config = sys.modules['v4.config.settings'].connection_config @@ -248,6 +249,7 @@ def setUp(self): orchestration_config.wait_for_approval.reset_mock() orchestration_config.wait_for_approval.return_value = True # Default return value orchestration_config.cleanup_approval.reset_mock() + orchestration_config.is_plan_superseded = Mock(return_value=False) # Create mock agent for new API self.mock_agent = Mock() @@ -448,7 +450,7 @@ async def test_wait_for_user_approval_success(self): self.assertTrue(result.approved) self.assertEqual(result.m_plan_id, plan_id) - orchestration_config.set_approval_pending.assert_called_with(plan_id) + orchestration_config.set_approval_pending.assert_called_with(plan_id, user_id=self.user_id) orchestration_config.wait_for_approval.assert_called_with(plan_id) async def test_wait_for_user_approval_rejection(self): @@ -485,32 +487,23 @@ async def test_wait_for_user_approval_timeout(self): plan_id = "test-plan-123" orchestration_config.wait_for_approval.side_effect = asyncio.TimeoutError() - # Execute - result = await self.manager._wait_for_user_approval(plan_id) + # Execute & Verify - should raise PlanTimeoutError + with self.assertRaises(PlanTimeoutError): + await self.manager._wait_for_user_approval(plan_id) - # Verify - self.assertIsNone(result) - - # Verify timeout notification was sent - connection_config.send_status_update_async.assert_called() orchestration_config.cleanup_approval.assert_called_with(plan_id) async def test_wait_for_user_approval_timeout_websocket_error(self): - """Test _wait_for_user_approval with timeout and WebSocket error.""" + """Test _wait_for_user_approval with timeout raises PlanTimeoutError.""" # Setup plan_id = "test-plan-123" orchestration_config.wait_for_approval.side_effect = asyncio.TimeoutError() - connection_config.send_status_update_async.side_effect = Exception("WebSocket error") - # Execute - result = await self.manager._wait_for_user_approval(plan_id) + # Execute & Verify - should raise PlanTimeoutError + with self.assertRaises(PlanTimeoutError): + await self.manager._wait_for_user_approval(plan_id) - # Verify - self.assertIsNone(result) orchestration_config.cleanup_approval.assert_called_with(plan_id) - - # Reset side effect - connection_config.send_status_update_async.side_effect = None async def test_wait_for_user_approval_key_error(self): """Test _wait_for_user_approval with KeyError."""