Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion apps/sim/app/api/workspaces/[id]/pptx/preview/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
return NextResponse.json({ error: 'Insufficient permissions' }, { status: 403 })
}

const body = await req.json()
let body: unknown
try {
body = await req.json()
} catch {
return NextResponse.json({ error: 'Invalid or missing JSON body' }, { status: 400 })
}
const { code } = body as { code?: string }

if (typeof code !== 'string' || code.trim().length === 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,50 +546,80 @@ function PptxPreview({
const [rendering, setRendering] = useState(false)
const [renderError, setRenderError] = useState<string | null>(null)

// Streaming preview: only re-triggers when the streaming source code or
// workspace changes. Isolated from fileData/dataUpdatedAt so that file-list
// refreshes don't abort the in-flight compilation request.
useEffect(() => {
if (streamingContent === undefined) return

let cancelled = false
const controller = new AbortController()
let debounceTimer: ReturnType<typeof setTimeout> | null = null

async function render() {
const debounceTimer = setTimeout(async () => {
if (cancelled) return
try {
setRendering(true)
setRenderError(null)

if (streamingContent !== undefined) {
const response = await fetch(`/api/workspaces/${workspaceId}/pptx/preview`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ code: streamingContent }),
signal: controller.signal,
})
if (!response.ok) {
const err = await response.json().catch(() => ({ error: 'Preview failed' }))
throw new Error(err.error || 'Preview failed')
}
if (cancelled) return
const arrayBuffer = await response.arrayBuffer()
if (cancelled) return
const data = new Uint8Array(arrayBuffer)
const images: string[] = []
await renderPptxSlides(
data,
(src) => {
images.push(src)
if (!cancelled) setSlides([...images])
},
() => cancelled
)
return
const response = await fetch(`/api/workspaces/${workspaceId}/pptx/preview`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ code: streamingContent }),
signal: controller.signal,
})
if (!response.ok) {
const err = await response.json().catch(() => ({ error: 'Preview failed' }))
throw new Error(err.error || 'Preview failed')
}
if (cancelled) return
const arrayBuffer = await response.arrayBuffer()
if (cancelled) return
const data = new Uint8Array(arrayBuffer)
const images: string[] = []
await renderPptxSlides(
data,
(src) => {
images.push(src)
if (!cancelled) setSlides([...images])
},
() => cancelled
)
} catch (err) {
if (!cancelled && !(err instanceof DOMException && err.name === 'AbortError')) {
const msg = err instanceof Error ? err.message : 'Failed to render presentation'
logger.error('PPTX render failed', { error: msg })
setRenderError(msg)
}
} finally {
if (!cancelled) setRendering(false)
}
}, 500)

return () => {
cancelled = true
clearTimeout(debounceTimer)
controller.abort()
}
}, [streamingContent, workspaceId])

// Non-streaming render: uses the fetched binary directly on the client.
// Skipped while streaming is active so it doesn't interfere.
useEffect(() => {
if (streamingContent !== undefined) return

let cancelled = false

async function render() {
if (cancelled) return
try {
if (cached) {
setSlides(cached)
return
}

if (!fileData) return
setRendering(true)
setRenderError(null)
setSlides([])
const data = new Uint8Array(fileData)
const images: string[] = []
Expand All @@ -605,7 +635,7 @@ function PptxPreview({
pptxCacheSet(cacheKey, images)
}
} catch (err) {
if (!cancelled && !(err instanceof DOMException && err.name === 'AbortError')) {
if (!cancelled) {
const msg = err instanceof Error ? err.message : 'Failed to render presentation'
logger.error('PPTX render failed', { error: msg })
setRenderError(msg)
Expand All @@ -615,18 +645,10 @@ function PptxPreview({
}
}

// Debounce streaming renders so rapid SSE updates don't spawn a subprocess
// per event. Non-streaming renders (file load / cache) run immediately.
if (streamingContent !== undefined) {
debounceTimer = setTimeout(render, 500)
} else {
render()
}
render()

return () => {
cancelled = true
if (debounceTimer) clearTimeout(debounceTimer)
controller.abort()
}
}, [fileData, dataUpdatedAt, streamingContent, cacheKey, workspaceId])

Expand Down
34 changes: 29 additions & 5 deletions apps/sim/lib/copilot/orchestrator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,16 @@ function didAsyncToolSucceed(input: {
durableError?: string | null
completion?: { status: string } | undefined
toolStateSuccess?: boolean | undefined
toolStateStatus?: string | undefined
}) {
const { durableStatus, durableResult, durableError, completion, toolStateSuccess } = input
const {
durableStatus,
durableResult,
durableError,
completion,
toolStateSuccess,
toolStateStatus,
} = input

if (durableStatus === ASYNC_TOOL_STATUS.completed) {
return true
Expand All @@ -50,6 +58,9 @@ function didAsyncToolSucceed(input: {
})
}

if (toolStateStatus === 'success') return true
if (toolStateStatus === 'error' || toolStateStatus === 'cancelled') return false

return completion?.status === 'success' || toolStateSuccess === true
}

Expand Down Expand Up @@ -212,17 +223,29 @@ export async function orchestrateCopilotStream(
})
continue
}
const toolState = context.toolCalls.get(toolCallId)
if (!durableRow && !localPendingPromise && toolState) {
logger.info('Including Go-handled tool in resume payload (no Sim-side row)', {
toolCallId,
toolName: toolState.name,
status: toolState.status,
runId: continuation.runId,
})
claimableToolCallIds.push(toolCallId)
continue
}
logger.warn('Skipping already-claimed or missing async tool resume', {
toolCallId,
runId: continuation.runId,
})
}

if (localPendingPromises.length > 0) {
await Promise.allSettled(localPendingPromises)
continue
}

if (claimableToolCallIds.length === 0) {
if (localPendingPromises.length > 0) {
await Promise.allSettled(localPendingPromises)
continue
}
logger.warn('Skipping async resume because no tool calls were claimable', {
checkpointId: continuation.checkpointId,
runId: continuation.runId,
Expand Down Expand Up @@ -257,6 +280,7 @@ export async function orchestrateCopilotStream(
durableError: durable?.error,
completion,
toolStateSuccess: toolState?.result?.success,
toolStateStatus: toolState?.status,
})
const data =
durableResult ||
Expand Down
33 changes: 33 additions & 0 deletions apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,39 @@ describe('sse-handlers tool lifecycle', () => {
expect(updated?.status).toBe('cancelled')
})

it('does not replace an in-flight pending promise on duplicate tool_call', async () => {
let resolveTool: ((value: { success: boolean; output: { ok: boolean } }) => void) | undefined
executeToolServerSide.mockImplementationOnce(
() =>
new Promise((resolve) => {
resolveTool = resolve
})
)
markToolComplete.mockResolvedValueOnce(true)

const event = {
type: 'tool_call',
data: { id: 'tool-inflight', name: 'read', arguments: { workflowId: 'workflow-1' } },
}

await sseHandlers.tool_call(event as any, context, execContext, { interactive: false })
await new Promise((resolve) => setTimeout(resolve, 0))

const firstPromise = context.pendingToolPromises.get('tool-inflight')
expect(firstPromise).toBeDefined()

await sseHandlers.tool_call(event as any, context, execContext, { interactive: false })

expect(executeToolServerSide).toHaveBeenCalledTimes(1)
expect(context.pendingToolPromises.get('tool-inflight')).toBe(firstPromise)

resolveTool?.({ success: true, output: { ok: true } })
await new Promise((resolve) => setTimeout(resolve, 0))

expect(context.pendingToolPromises.has('tool-inflight')).toBe(false)
expect(markToolComplete).toHaveBeenCalledTimes(1)
})

it('still executes the tool when async row upsert fails', async () => {
upsertAsyncToolCall.mockRejectedValueOnce(new Error('db down'))
executeToolServerSide.mockResolvedValueOnce({ success: true, output: { ok: true } })
Expand Down
29 changes: 21 additions & 8 deletions apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@ import { executeToolAndReport, waitForToolCompletion } from './tool-execution'

const logger = createLogger('CopilotSseHandlers')

function registerPendingToolPromise(
context: StreamingContext,
toolCallId: string,
pendingPromise: Promise<{ status: string; message?: string; data?: Record<string, unknown> }>
) {
context.pendingToolPromises.set(toolCallId, pendingPromise)
pendingPromise.finally(() => {
if (context.pendingToolPromises.get(toolCallId) === pendingPromise) {
context.pendingToolPromises.delete(toolCallId)
}
})
}

/**
* When the Sim→Go stream is aborted, avoid starting server-side tool work and
* unblock the Go async waiter with a terminal 499 completion.
Expand Down Expand Up @@ -327,6 +340,9 @@ export const sseHandlers: Record<string, SSEHandler> = {

if (isPartial) return
if (wasToolResultSeen(toolCallId)) return
if (context.pendingToolPromises.has(toolCallId) || existing?.status === 'executing') {
return
}

const toolCall = context.toolCalls.get(toolCallId)
if (!toolCall) return
Expand Down Expand Up @@ -375,10 +391,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
data: { error: err instanceof Error ? err.message : String(err) },
}
})
context.pendingToolPromises.set(toolCallId, pendingPromise)
pendingPromise.finally(() => {
context.pendingToolPromises.delete(toolCallId)
})
registerPendingToolPromise(context, toolCallId, pendingPromise)
}

if (options.interactive === false) {
Expand Down Expand Up @@ -574,6 +587,9 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
}

if (isPartial) return
if (context.pendingToolPromises.has(toolCallId) || existing?.status === 'executing') {
return
}

const { clientExecutable, internal } = getEventUI(event)

Expand Down Expand Up @@ -614,10 +630,7 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
data: { error: err instanceof Error ? err.message : String(err) },
}
})
context.pendingToolPromises.set(toolCallId, pendingPromise)
pendingPromise.finally(() => {
context.pendingToolPromises.delete(toolCallId)
})
registerPendingToolPromise(context, toolCallId, pendingPromise)
}

if (options.interactive === false) {
Expand Down
Loading
Loading