Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions apps/sim/app/api/copilot/chat/abort/route.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { NextResponse } from 'next/server'
import { abortActiveStream } from '@/lib/copilot/chat-streaming'
import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository'
import { abortActiveStream, waitForPendingChatStream } from '@/lib/copilot/chat-streaming'
import { SIM_AGENT_API_URL } from '@/lib/copilot/constants'
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers'
import { env } from '@/lib/core/config/env'

const GO_EXPLICIT_ABORT_TIMEOUT_MS = 3000

export async function POST(request: Request) {
const { userId: authenticatedUserId, isAuthenticated } =
Expand All @@ -12,11 +17,48 @@ export async function POST(request: Request) {

const body = await request.json().catch(() => ({}))
const streamId = typeof body.streamId === 'string' ? body.streamId : ''
let chatId = typeof body.chatId === 'string' ? body.chatId : ''

if (!streamId) {
return NextResponse.json({ error: 'streamId is required' }, { status: 400 })
}

const aborted = abortActiveStream(streamId)
if (!chatId) {
const run = await getLatestRunForStream(streamId, authenticatedUserId).catch(() => null)
if (run?.chatId) {
chatId = run.chatId
}
}

try {
const headers: Record<string, string> = { 'Content-Type': 'application/json' }
if (env.COPILOT_API_KEY) {
headers['x-api-key'] = env.COPILOT_API_KEY
}
const controller = new AbortController()
const timeout = setTimeout(() => controller.abort(), GO_EXPLICIT_ABORT_TIMEOUT_MS)
const response = await fetch(`${SIM_AGENT_API_URL}/api/streams/explicit-abort`, {
method: 'POST',
headers,
signal: controller.signal,
body: JSON.stringify({
messageId: streamId,
userId: authenticatedUserId,
...(chatId ? { chatId } : {}),
}),
}).finally(() => clearTimeout(timeout))
if (!response.ok) {
throw new Error(`Explicit abort marker request failed: ${response.status}`)
}
} catch {
// best effort: local abort should still proceed even if Go marker fails
}

const aborted = await abortActiveStream(streamId)
if (chatId) {
await waitForPendingChatStream(chatId, GO_EXPLICIT_ABORT_TIMEOUT_MS + 1000, streamId).catch(
() => false
)
}
return NextResponse.json({ aborted })
}
77 changes: 76 additions & 1 deletion apps/sim/app/api/copilot/chat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ import { getSession } from '@/lib/auth'
import { getAccessibleCopilotChat, resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
import {
acquirePendingChatStream,
createSSEStream,
releasePendingChatStream,
requestChatTitle,
SSE_RESPONSE_HEADERS,
} from '@/lib/copilot/chat-streaming'
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer'
import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types'
import { resolveActiveResourceContext } from '@/lib/copilot/process-contents'
import {
authenticateCopilotRequestSessionOnly,
createBadRequestResponse,
Expand Down Expand Up @@ -44,6 +47,13 @@ const FileAttachmentSchema = z.object({
size: z.number(),
})

const ResourceAttachmentSchema = z.object({
type: z.enum(['workflow', 'table', 'file', 'knowledgebase']),
id: z.string().min(1),
title: z.string().optional(),
active: z.boolean().optional(),
})

const ChatMessageSchema = z.object({
message: z.string().min(1, 'Message is required'),
userMessageId: z.string().optional(),
Expand All @@ -58,6 +68,7 @@ const ChatMessageSchema = z.object({
stream: z.boolean().optional().default(true),
implicitFeedback: z.string().optional(),
fileAttachments: z.array(FileAttachmentSchema).optional(),
resourceAttachments: z.array(ResourceAttachmentSchema).optional(),
provider: z.string().optional(),
contexts: z
.array(
Expand Down Expand Up @@ -98,6 +109,10 @@ const ChatMessageSchema = z.object({
*/
export async function POST(req: NextRequest) {
const tracker = createRequestTracker()
let actualChatId: string | undefined
let pendingChatStreamAcquired = false
let pendingChatStreamHandedOff = false
let pendingChatStreamID: string | undefined

try {
// Get session to access user information including name
Expand All @@ -124,6 +139,7 @@ export async function POST(req: NextRequest) {
stream,
implicitFeedback,
fileAttachments,
resourceAttachments,
provider,
contexts,
commands,
Expand Down Expand Up @@ -189,7 +205,7 @@ export async function POST(req: NextRequest) {

let currentChat: any = null
let conversationHistory: any[] = []
let actualChatId = chatId
actualChatId = chatId
const selectedModel = model || 'claude-opus-4-6'

if (chatId || createNewChat) {
Expand Down Expand Up @@ -241,6 +257,39 @@ export async function POST(req: NextRequest) {
}
}

if (
Array.isArray(resourceAttachments) &&
resourceAttachments.length > 0 &&
resolvedWorkspaceId
) {
const results = await Promise.allSettled(
resourceAttachments.map(async (r) => {
const ctx = await resolveActiveResourceContext(
r.type,
r.id,
resolvedWorkspaceId!,
authenticatedUserId,
actualChatId
)
if (!ctx) return null
return {
...ctx,
tag: r.active ? '@active_tab' : '@open_tab',
}
})
)
for (const result of results) {
if (result.status === 'fulfilled' && result.value) {
agentContexts.push(result.value)
} else if (result.status === 'rejected') {
logger.error(
`[${tracker.requestId}] Failed to resolve resource attachment`,
result.reason
)
}
}
}

const effectiveMode = mode === 'agent' ? 'build' : mode

const userPermission = resolvedWorkspaceId
Expand Down Expand Up @@ -291,6 +340,21 @@ export async function POST(req: NextRequest) {
})
} catch {}

if (stream && actualChatId) {
const acquired = await acquirePendingChatStream(actualChatId, userMessageIdToUse)
if (!acquired) {
return NextResponse.json(
{
error:
'A response is already in progress for this chat. Wait for it to finish or use Stop.',
},
{ status: 409 }
)
}
pendingChatStreamAcquired = true
pendingChatStreamID = userMessageIdToUse
}

if (actualChatId) {
const userMsg = {
id: userMessageIdToUse,
Expand Down Expand Up @@ -337,6 +401,7 @@ export async function POST(req: NextRequest) {
titleProvider: provider,
requestId: tracker.requestId,
workspaceId: resolvedWorkspaceId,
pendingChatStreamAlreadyRegistered: Boolean(actualChatId && stream),
orchestrateOptions: {
userId: authenticatedUserId,
workflowId,
Expand All @@ -348,6 +413,7 @@ export async function POST(req: NextRequest) {
interactive: true,
onComplete: async (result: OrchestratorResult) => {
if (!actualChatId) return
if (!result.success) return

const assistantMessage: Record<string, unknown> = {
id: crypto.randomUUID(),
Expand Down Expand Up @@ -423,6 +489,7 @@ export async function POST(req: NextRequest) {
},
},
})
pendingChatStreamHandedOff = true

return new Response(sseStream, { headers: SSE_RESPONSE_HEADERS })
}
Expand Down Expand Up @@ -528,6 +595,14 @@ export async function POST(req: NextRequest) {
},
})
} catch (error) {
if (
actualChatId &&
pendingChatStreamAcquired &&
!pendingChatStreamHandedOff &&
pendingChatStreamID
) {
await releasePendingChatStream(actualChatId, pendingChatStreamID).catch(() => {})
}
const duration = tracker.getDuration()

if (error instanceof z.ZodError) {
Expand Down
15 changes: 13 additions & 2 deletions apps/sim/app/api/mothership/chat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import { getSession } from '@/lib/auth'
import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
import {
acquirePendingChatStream,
createSSEStream,
SSE_RESPONSE_HEADERS,
waitForPendingChatStream,
} from '@/lib/copilot/chat-streaming'
import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types'
import { processContextsServer, resolveActiveResourceContext } from '@/lib/copilot/process-contents'
Expand Down Expand Up @@ -253,7 +253,16 @@ export async function POST(req: NextRequest) {
)

if (actualChatId) {
await waitForPendingChatStream(actualChatId)
const acquired = await acquirePendingChatStream(actualChatId, userMessageId)
if (!acquired) {
return NextResponse.json(
{
error:
'A response is already in progress for this chat. Wait for it to finish or use Stop.',
},
{ status: 409 }
)
}
}

const executionId = crypto.randomUUID()
Expand All @@ -271,6 +280,7 @@ export async function POST(req: NextRequest) {
titleModel: 'claude-opus-4-6',
requestId: tracker.requestId,
workspaceId,
pendingChatStreamAlreadyRegistered: Boolean(actualChatId),
orchestrateOptions: {
userId: authenticatedUserId,
workspaceId,
Expand All @@ -282,6 +292,7 @@ export async function POST(req: NextRequest) {
interactive: true,
onComplete: async (result: OrchestratorResult) => {
if (!actualChatId) return
if (!result.success) return

const assistantMessage: Record<string, unknown> = {
id: crypto.randomUUID(),
Expand Down
Loading
Loading