Skip to content

fix(mothership): abort streamlining#3734

Merged
Sg312 merged 9 commits intostagingfrom
fix/mothership-abort
Mar 24, 2026
Merged

fix(mothership): abort streamlining#3734
Sg312 merged 9 commits intostagingfrom
fix/mothership-abort

Conversation

@Sg312
Copy link
Collaborator

@Sg312 Sg312 commented Mar 24, 2026

Summary

Streamline aborts.

Type of Change

  • Bug fix

Testing

Tested manually

Checklist

  • Code follows project style guidelines
  • Self-reviewed my changes
  • Tests added/updated and passing
  • No new warnings introduced
  • I confirm that I have read and agree to the terms outlined in the Contributor License Agreement (CLA)

@vercel
Copy link

vercel bot commented Mar 24, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project Deployment Actions Updated (UTC)
docs Skipped Skipped Mar 24, 2026 9:14am

Request Review

@greptile-apps
Copy link
Contributor

greptile-apps bot commented Mar 24, 2026

Greptile Summary

This PR hardens the copilot chat infrastructure with a cluster of concurrency, resilience, and state-management fixes. The main changes are: (1) replacing passive waitForPendingChatStream with an acquirePendingChatStream that returns HTTP 409 when a chat already has an in-flight stream, (2) inverting event ordering in createSSEStream to persist-before-deliver, (3) protecting upsertAsyncToolCall against status downgrades, (4) adding exponential-backoff retries for async resume claims in the orchestrator, and (5) propagating the Go-backend abort notification from the abort endpoint.

  • Concurrent request guard (acquirePendingChatStream): new acquire-or-reject pattern correctly avoids double-registration via pendingChatStreamAlreadyRegistered, and both route handlers skip saving the assistant message when orchestration fails (if (!result.success) return).
  • Persist-before-deliver (chat-streaming.ts): events are now written to the replay buffer before being sent to the client. Any persistence failure now throws and terminates the live stream — a resilience trade-off that may surface as user-visible errors under transient storage failures.
  • Inline error tag (use-chat.ts): the UNEXPECTED_PROVIDER_ERROR_TAG is injected into the message stream on every error SSE event, but its hardcoded content says "An unexpected provider error occurred" regardless of what the actual error is (rate limits, auth failures, etc.), producing misleading inline text.
  • Stream cleanup (use-chat.ts): streamReaderRef.current?.cancel() was removed from the unmount cleanup, leaving background fetch connections open until the server closes them.
  • Terminal-state guard (repository.ts): clean fix prevents upsertAsyncToolCall from downgrading a completed/failed/delivered row back to pending.
  • Orchestrator retry (orchestrator/index.ts): up to 3 retries with 250 ms × attempt backoff when async tool calls cannot be claimed, addressing race windows between tool completion and DB propagation.

Confidence Score: 3/5

  • Mostly sound fixes, but two logic issues need attention before merge: a misleading hardcoded error tag injected for all error types, and a resilience regression where any transient persistence failure now kills a live stream.
  • The concurrent-request guard, terminal-state protection, orchestrator retry logic, and Go-backend abort notification are all well-implemented. However, the persist-before-deliver throw on failure is a meaningful resilience change that isn't clearly intentional, and the UNEXPECTED_PROVIDER_ERROR_TAG always emitting a static misleading message for every error type is a clear logic bug that will produce wrong UI copy for users.
  • apps/sim/lib/copilot/chat-streaming.ts (persist-first error handling) and apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts (inline error tag logic and unmount cleanup)

Important Files Changed

Filename Overview
apps/sim/lib/copilot/chat-streaming.ts Adds acquirePendingChatStream (non-force-abort acquire with 409 on timeout), and inverts event ordering to persist-before-deliver. The persist-first approach throws on any write failure, which is a resilience regression for live streams under transient storage errors.
apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts Multiple stream-handling fixes: resets streamingFile on chat clear, changes stale-check from break/cancel to continue, injects inline error tags on errors. The inline error tag is always the static "unexpected provider error" string regardless of actual error type, and the stream reader is no longer cancelled on component unmount.
apps/sim/app/api/copilot/chat/route.ts Replaces passive wait with acquirePendingChatStream returning 409 on contention; skips saving assistant message on unsuccessful runs; correctly threads pendingChatStreamAlreadyRegistered to avoid double-registration.
apps/sim/app/api/mothership/chat/route.ts Same concurrent-request guard as copilot chat route; replaces waitForPendingChatStream with acquirePendingChatStream and adds if (!result.success) return guard.
apps/sim/lib/copilot/async-runs/repository.ts Adds terminal-state protection to upsertAsyncToolCall: prevents downgrading a completed/failed/delivered status back to pending. Clean defensive fix with informative logging.
apps/sim/lib/copilot/orchestrator/index.ts Adds up to 3 exponential backoff retries (250ms × attempt) when no async tool calls are claimable but pending IDs exist. Handles race windows where tool completions haven't yet propagated to the claim table.
apps/sim/app/api/copilot/chat/abort/route.ts Adds best-effort Go-backend abort notification before local stream abort. Correctly wrapped in try/catch so failures don't block local abort.

Sequence Diagram

sequenceDiagram
    participant Client
    participant RouteHandler as Route Handler
    participant PendingMap as pendingChatStreams (in-memory)
    participant createSSEStream
    participant Buffer as Event Buffer (Redis)
    participant GoBackend as Go Backend

    Client->>RouteHandler: POST /api/copilot/chat
    RouteHandler->>PendingMap: acquirePendingChatStream(chatId, streamId)
    alt stream already in-flight
        PendingMap-->>RouteHandler: false (timeout)
        RouteHandler-->>Client: 409 A response is already in progress
    else no conflict
        PendingMap-->>RouteHandler: true (registered)
        RouteHandler->>createSSEStream: pendingChatStreamAlreadyRegistered=true
        createSSEStream->>GoBackend: orchestrateCopilotStream (SSE)
        loop each SSE event
            GoBackend-->>createSSEStream: event
            createSSEStream->>Buffer: eventWriter.write(event) [persist FIRST]
            Buffer-->>createSSEStream: ok / throws
            createSSEStream-->>Client: SSE data enqueue
        end
        createSSEStream->>PendingMap: resolvePendingChatStream(chatId, streamId)
        createSSEStream-->>Client: stream done
    end

    Client->>RouteHandler: POST /api/copilot/chat/abort
    RouteHandler->>GoBackend: POST /api/streams/explicit-abort (best-effort)
    RouteHandler->>createSSEStream: abortActiveStream(streamId)
Loading

Comments Outside Diff (1)

  1. apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts, line 1509-1516 (link)

    P2 Stream reader not cancelled on unmount — potential resource leak

    streamReaderRef.current?.cancel() was removed from the cleanup useEffect. When the component unmounts while a stream is active, the underlying HTTP body reader will keep pulling data from the network until the server closes the connection, because no AbortController.abort() or reader.cancel() is called.

    The abortControllerRef.current = null assignment only clears the ref; it does not abort the controller. Consider calling .abort() before nulling the ref:

    return () => {
      streamReaderRef.current = null
      abortControllerRef.current?.abort()
      abortControllerRef.current = null
      streamGenRef.current++
      sendingRef.current = false
    }

Reviews (1): Last reviewed commit: "Fixes" | Re-trigger Greptile

@icecrasher321
Copy link
Collaborator

bugbot run

@cursor
Copy link

cursor bot commented Mar 24, 2026

PR Summary

Medium Risk
Changes core chat streaming concurrency and persistence behavior (409 on concurrent sends, new pending-stream acquisition) and adds cross-service abort signaling, which could affect user-facing chat reliability if edge cases/timeouts are mishandled.

Overview
Prevents multiple simultaneous responses per chat by introducing acquirePendingChatStream gating in both Copilot and Mothership chat POST routes, returning 409 when a prior stream is still active instead of waiting/force-aborting.

Improves streaming robustness: createSSEStream now persists SSE events before enqueuing to the client (and fails loudly on persistence errors), avoids double-registering pending streams, and only persists assistant messages on onComplete when the orchestrator result is successful.

Updates stop/reconnect flows: abort now also best-effort notifies the Go service via /api/streams/explicit-abort (optionally with x-api-key), the UI reconnect path surfaces a clearer tail-reconnect error and finalizes with an error state, and async tool upserts/resume logic gains protections against terminal-state downgrades plus limited retries when no tool calls are claimable.

Written by Cursor Bugbot for commit e17576c. Configure here.

@icecrasher321 icecrasher321 changed the title Fixes fix(mothership): abort streamlining Mar 24, 2026
Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Fix All in Cursor

Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

@Sg312 Sg312 merged commit 092525e into staging Mar 24, 2026
11 checks passed
@waleedlatif1 waleedlatif1 deleted the fix/mothership-abort branch March 24, 2026 17:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants