Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 70 additions & 36 deletions apps/sim/app/workspace/providers/socket-provider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import {
useState,
} from 'react'
import { createLogger } from '@sim/logger'
import { getErrorMessage } from '@sim/utils/errors'
import { generateId } from '@sim/utils/id'
import { backoffWithJitter } from '@sim/utils/retry'
import { useParams } from 'next/navigation'
import type { Socket } from 'socket.io-client'
import { getSocketUrl } from '@/lib/core/utils/urls'
Expand All @@ -28,6 +30,9 @@ import { useWorkflowRegistry as useWorkflowRegistryStore } from '@/stores/workfl

const logger = createLogger('SocketContext')

/** Cap on auto-retries after an auth failure before latching for a manual reload, so a genuine logout stops re-minting tokens. */
const MAX_AUTH_RETRY_ATTEMPTS = 10

const TAB_SESSION_ID_KEY = 'sim_tab_session_id'

function getTabSessionId(): string {
Expand Down Expand Up @@ -162,6 +167,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const explicitWorkflowIdRef = useRef<string | null>(explicitWorkflowId)
const joinControllerRef = useRef(new SocketJoinController())
const joinRetryTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null)
const authRetryTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null)
const authRetryAttemptRef = useRef(0)

const params = useParams()
const urlWorkflowId = params?.workflowId as string | undefined
Expand Down Expand Up @@ -213,6 +220,48 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}
}, [])

const clearAuthRetryTimeout = useCallback(() => {
if (authRetryTimeoutRef.current !== null) {
clearTimeout(authRetryTimeoutRef.current)
authRetryTimeoutRef.current = null
}
}, [])

/**
* Recover from a server-denied handshake (token/auth rejection). Socket.IO does
* not auto-reconnect after a namespace middleware rejection — the socket is
* destroyed and `socket.active` is `false` — so we re-run `connect()` with
* backoff, which re-invokes the auth callback to mint a fresh token. This covers
* both a transient mint failure (recovers on the next attempt) and a real 401;
* after {@link MAX_AUTH_RETRY_ATTEMPTS} we latch `authFailed` for a manual reload
* instead of re-minting forever.
*/
const scheduleAuthRetry = useCallback(
(socketInstance: Socket) => {
clearAuthRetryTimeout()
const attempt = authRetryAttemptRef.current

if (attempt >= MAX_AUTH_RETRY_ATTEMPTS) {
setIsReconnecting(false)
setAuthFailed(true)
logger.warn('Socket auth retries exhausted; latching until manual reload', {
attempts: attempt,
})
return
}

setIsReconnecting(true)
const delay = backoffWithJitter(attempt + 1, null, { baseMs: 1000, maxMs: 30000 })
authRetryTimeoutRef.current = setTimeout(() => {
authRetryTimeoutRef.current = null
authRetryAttemptRef.current = attempt + 1
logger.info('Retrying socket connection after denied handshake', { attempt })
socketInstance.connect()
}, delay)
},
[clearAuthRetryTimeout]
)

const resetVisibleWorkflowState = useCallback((workflowId?: string | null) => {
if (workflowId) {
useOperationQueueStore.getState().cancelOperationsForWorkflow(workflowId)
Expand Down Expand Up @@ -326,11 +375,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
useEffect(() => {
if (!user?.id) return

if (authFailed) {
logger.info('Socket initialization skipped - auth failed, waiting for retry')
return
}

if (initializedRef.current || socket || isConnecting) {
logger.info('Socket already exists or is connecting, skipping initialization')
return
Expand Down Expand Up @@ -360,22 +404,23 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
timeout: 10000,
auth: async (cb) => {
try {
const freshToken = await generateSocketToken()
cb({ token: freshToken })
cb({ token: await generateSocketToken() })
} catch (error) {
logger.error('Failed to generate fresh token for connection:', error)
if (error instanceof Error && error.message === 'Authentication required') {
// True auth failure - pass null token, server will reject with "Authentication required"
cb({ token: null })
}
// For server errors, don't call cb - connection will timeout and Socket.IO will retry
logger.warn('Failed to mint socket token; handshake will be denied and retried', {
error: getErrorMessage(error),
})
cb({ token: null })
}
},
})

socketInstance.on('connect', () => {
setIsConnected(true)
setIsConnecting(false)
setIsReconnecting(false)
setAuthFailed(false)
authRetryAttemptRef.current = 0
clearAuthRetryTimeout()
setCurrentSocketId(socketInstance.id ?? null)
logger.info('Socket connected successfully', {
socketId: socketInstance.id,
Expand Down Expand Up @@ -406,24 +451,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
setIsConnecting(false)
logger.error('Socket connection error:', { message: error.message })

// Check if this is an authentication failure
const isAuthError =
error.message?.includes('Token validation failed') ||
error.message?.includes('Authentication failed') ||
error.message?.includes('Authentication required')

if (isAuthError) {
logger.warn(
'Authentication failed - stopping reconnection attempts. User may need to refresh/re-login.'
)
socketInstance.disconnect()
setSocket(null)
setAuthFailed(true)
setIsReconnecting(false)
initializedRef.current = false
} else if (socketInstance.active) {
// Temporary failure, will auto-reconnect
if (socketInstance.active) {
setIsReconnecting(true)
} else {
scheduleAuthRetry(socketInstance)
}
})

Expand Down Expand Up @@ -722,6 +753,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {

return () => {
clearJoinRetryTimeout()
clearAuthRetryTimeout()
positionUpdateTimeouts.current.forEach((timeoutId) => {
clearTimeout(timeoutId)
})
Expand All @@ -735,7 +767,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socketRef.current = null
}
}
}, [user?.id, authFailed])
}, [user?.id])

const hydrationPhase = useWorkflowRegistryStore((s) => s.hydration.phase)

Expand Down Expand Up @@ -770,19 +802,21 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}, [])

/**
* Retry socket connection after auth failure.
* Call this when user has re-authenticated (e.g., after login redirect).
* Manually retry after auth retries were exhausted and `authFailed` latched.
* Resets the backoff counter and reconnects the existing socket, which re-runs
* the auth callback to mint a fresh token (e.g. after re-authenticating).
*/
const retryConnection = useCallback(() => {
if (!authFailed) {
logger.info('retryConnection called but no auth failure - ignoring')
return
}
logger.info('Retrying socket connection after auth failure')
clearAuthRetryTimeout()
authRetryAttemptRef.current = 0
setAuthFailed(false)
// initializedRef.current was already reset in connect_error handler
// Effect will re-run and attempt connection
}, [authFailed])
socketRef.current?.connect()
}, [authFailed, clearAuthRetryTimeout])

const emitWorkflowOperation = useCallback(
(workflowId: string, operation: string, target: string, payload: any, operationId?: string) => {
Expand Down
Loading