Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
d0a9391
feat: add @trigger.dev/ai package with TriggerChatTransport
cursoragent Feb 15, 2026
8061f0c
test: add comprehensive unit tests for TriggerChatTransport
cursoragent Feb 15, 2026
20f6913
refactor: polish TriggerChatTransport implementation
cursoragent Feb 15, 2026
ec409f6
test: add abort signal, multiple sessions, and body merging tests
cursoragent Feb 15, 2026
70a6240
chore: add changeset for @trigger.dev/ai package
cursoragent Feb 15, 2026
1a9486c
refactor: remove internal ChatSessionState from public exports
cursoragent Feb 15, 2026
27fc52d
feat: support dynamic accessToken function for token refresh
cursoragent Feb 15, 2026
5868bfb
refactor: avoid double-resolving accessToken in sendMessages
cursoragent Feb 15, 2026
66778e8
feat: add chat transport and AI chat helpers to @trigger.dev/sdk
cursoragent Feb 15, 2026
2c26460
test: move chat transport tests to @trigger.dev/sdk
cursoragent Feb 15, 2026
8d1b42f
refactor: delete packages/ai/ — moved to @trigger.dev/sdk subpaths
cursoragent Feb 15, 2026
2039ad6
chore: update changeset to target @trigger.dev/sdk
cursoragent Feb 15, 2026
f5f18f8
fix: address CodeRabbit review feedback
cursoragent Feb 15, 2026
2bc81ed
docs(ai): add AI Chat with useChat guide
cursoragent Feb 15, 2026
6f9d9bb
feat(reference): add ai-chat Next.js reference project
cursoragent Feb 15, 2026
79131a6
fix(reference): use compatible @ai-sdk v3 packages, await convertToMo…
cursoragent Feb 15, 2026
65a3198
Use a single run with iterative waitpoint token completions
ericallam Feb 21, 2026
e607925
Added tool example
ericallam Feb 21, 2026
e8b1eb9
expose a useTriggerChatTransport hook
ericallam Feb 21, 2026
83eb3fd
use input streams and rename chatTask and chatState to chat.task and …
ericallam Mar 3, 2026
cb4506e
add stopping support and fix issue with the OpenAI responses API and …
ericallam Mar 4, 2026
c64ab22
Add warmTimeoutInSeconds option
ericallam Mar 4, 2026
8a88056
Add clientData support
ericallam Mar 4, 2026
2e67e64
provide already converted UIMessages to the run function for better dx
ericallam Mar 4, 2026
6e31226
Added better telemetry support to view turns
ericallam Mar 4, 2026
83f0efa
Fix double looping when resuming from an input stream waitpoint
ericallam Mar 4, 2026
d4d52e1
Add some pending message support in the example
ericallam Mar 4, 2026
63b774b
Accumulate messages in the task, allowing us to only have to send use…
ericallam Mar 5, 2026
24cbb85
build full example with persisting messages, adding necessary hooks, …
ericallam Mar 5, 2026
92303e9
Add ai chat to the sidebar for now
ericallam Mar 5, 2026
0fa311d
remove postinstall hook
ericallam Mar 5, 2026
5732af3
feat: add onTurnStart hook, lastEventId support, and stream resume de…
ericallam Mar 5, 2026
018da8b
Minor fixes around reconnecting streams
ericallam Mar 6, 2026
890a8df
update pnpm link file
ericallam Mar 6, 2026
f61a949
fixed chat tests
ericallam Mar 6, 2026
b2cbd61
use locals for the chat pipe counter instead of a module global
ericallam Mar 6, 2026
fd15694
Add triggerOptions to the transport, auto-tag with the chat ID
ericallam Mar 6, 2026
3dffb4c
Make clientData typesafe and pass to all chat.task hooks
ericallam Mar 6, 2026
b0c57cc
feat: add chat.local for per-run typed data with Proxy access and dir…
ericallam Mar 6, 2026
a8e7432
feat(chat): add stop handling, abort cleanup, continuation support, a…
ericallam Mar 7, 2026
7f47c99
Some improvements to the example ai-chat
ericallam Mar 7, 2026
ac952aa
feat(chat): expose typed chat.stream, add deepResearch subtask exampl…
ericallam Mar 8, 2026
6429667
feat(ai): pass chat context and toolCallId to subtasks, add typed ai.…
ericallam Mar 8, 2026
1c33b6b
feat(chat): add preload support, dynamic tools, and preload-specific …
ericallam Mar 9, 2026
df17d14
docs: add mermaid architecture diagrams for ai-chat system
ericallam Mar 9, 2026
e815fe0
docs: add sequence diagrams to ai-chat guide
ericallam Mar 9, 2026
134aa3f
feat(chat): auto-hydrate chat.local values in ai.tool subtasks
ericallam Mar 9, 2026
469e157
feat(chat): add chat.defer(), preload toggle, TTFB measurement, and f…
ericallam Mar 9, 2026
aabf474
fix(reference): replace hand-rolled HTML stripping with turndown
ericallam Mar 9, 2026
d1b692a
feat(streams): add inputStream.waitWithWarmup(), warm timeout config …
ericallam Mar 9, 2026
29da943
feat(chat): add composable primitives, raw task example, and task mod…
ericallam Mar 10, 2026
10b571c
Introduce the chat session API and better docs organization
ericallam Mar 10, 2026
6878032
Add support for toUIMessageStream() options
ericallam Mar 10, 2026
53b9246
Add metadata to the streamText call
ericallam Mar 12, 2026
80dd700
feat(chat): add chat.prompt API with provider registry support
ericallam Mar 23, 2026
ea41eae
refactor: rename warmTimeout to idleTimeout across chat APIs
ericallam Mar 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions .changeset/ai-sdk-chat-transport.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
"@trigger.dev/sdk": minor
---

Add AI SDK chat transport integration via two new subpath exports:

**`@trigger.dev/sdk/chat`** (frontend, browser-safe):
- `TriggerChatTransport` — custom `ChatTransport` for the AI SDK's `useChat` hook that runs chat completions as durable Trigger.dev tasks
- `createChatTransport()` — factory function

```tsx
import { useChat } from "@ai-sdk/react";
import { TriggerChatTransport } from "@trigger.dev/sdk/chat";

const { messages, sendMessage } = useChat({
transport: new TriggerChatTransport({
task: "my-chat-task",
accessToken,
}),
});
```

**`@trigger.dev/sdk/ai`** (backend, extends existing `ai.tool`/`ai.currentToolOptions`):
- `chatTask()` — pre-typed task wrapper with auto-pipe support
- `pipeChat()` — pipe a `StreamTextResult` or stream to the frontend
- `CHAT_STREAM_KEY` — the default stream key constant
- `ChatTaskPayload` type

```ts
import { chatTask } from "@trigger.dev/sdk/ai";
import { streamText, convertToModelMessages } from "ai";

export const myChatTask = chatTask({
id: "my-chat-task",
run: async ({ messages }) => {
return streamText({
model: openai("gpt-4o"),
messages: convertToModelMessages(messages),
});
},
});
```
22 changes: 22 additions & 0 deletions .claude/rules/package-installation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
paths:
- "**/package.json"
---

# Installing Packages

When adding a new dependency to any package.json in the monorepo:

1. **Look up the latest version** on npm before adding:
```bash
pnpm view <package-name> version
```
If unsure which version to use (e.g. major version compatibility), confirm with the user.

2. **Edit the package.json directly** — do NOT use `pnpm add` as it can cause issues in the monorepo. Add the dependency with the correct version range (typically `^x.y.z`).

3. **Run `pnpm i` from the repo root** after editing to install and update the lockfile:
```bash
pnpm i
```
Always run from the repo root, not from the package directory.
2 changes: 2 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ This file provides guidance to Claude Code when working with this repository. Su

This is a pnpm 10.23.0 monorepo using Turborepo. Run commands from root with `pnpm run`.

**Adding dependencies:** Edit `package.json` directly instead of using `pnpm add`, then run `pnpm i` from the repo root. See `.claude/rules/package-installation.md` for the full process.

```bash
pnpm run docker # Start Docker services (PostgreSQL, Redis, Electric)
pnpm run db:migrate # Run database migrations
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export {
getSchemaParseFn,
type AnySchemaParseFn,
type SchemaParseFn,
type inferSchemaOut,
isSchemaZodEsque,
isSchemaValibotEsque,
isSchemaArkTypeEsque,
Expand Down
12 changes: 12 additions & 0 deletions packages/core/src/v3/inputStreams/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ export class InputStreamsAPI implements InputStreamManager {
return this.#getManager().lastSeqNum(streamId);
}

public setLastSeqNum(streamId: string, seqNum: number): void {
this.#getManager().setLastSeqNum(streamId, seqNum);
}

public shiftBuffer(streamId: string): boolean {
return this.#getManager().shiftBuffer(streamId);
}

public disconnectStream(streamId: string): void {
this.#getManager().disconnectStream(streamId);
}

public clearHandlers(): void {
this.#getManager().clearHandlers();
}
Expand Down
29 changes: 29 additions & 0 deletions packages/core/src/v3/inputStreams/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ export class StandardInputStreamManager implements InputStreamManager {
return this.seqNums.get(streamId);
}

setLastSeqNum(streamId: string, seqNum: number): void {
const current = this.seqNums.get(streamId);
// Only advance forward, never backward
if (current === undefined || seqNum > current) {
this.seqNums.set(streamId, seqNum);
}
}

shiftBuffer(streamId: string): boolean {
const buffered = this.buffer.get(streamId);
if (buffered && buffered.length > 0) {
buffered.shift();
if (buffered.length === 0) {
this.buffer.delete(streamId);
}
return true;
}
return false;
}

setRunId(runId: string, streamsVersion?: string): void {
this.currentRunId = runId;
this.streamsVersion = streamsVersion;
Expand Down Expand Up @@ -158,6 +178,15 @@ export class StandardInputStreamManager implements InputStreamManager {
}
}

disconnectStream(streamId: string): void {
const tail = this.tails.get(streamId);
if (tail) {
tail.abortController.abort();
this.tails.delete(streamId);
}
this.buffer.delete(streamId);
}

connectTail(runId: string, _fromSeq?: number): void {
// No-op: tails are now created per-stream lazily
}
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/v3/inputStreams/noopManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ export class NoopInputStreamManager implements InputStreamManager {
return undefined;
}

setLastSeqNum(_streamId: string, _seqNum: number): void {}

shiftBuffer(_streamId: string): boolean { return false; }

disconnectStream(_streamId: string): void {}

clearHandlers(): void {}
reset(): void {}
disconnect(): void {}
Expand Down
22 changes: 22 additions & 0 deletions packages/core/src/v3/inputStreams/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,28 @@ export interface InputStreamManager {
*/
lastSeqNum(streamId: string): number | undefined;

/**
* Advance the last-seen S2 sequence number for the given input stream.
* Used after `.wait()` resumes to prevent the SSE tail from replaying
* the record that was consumed via the waitpoint path.
*/
setLastSeqNum(streamId: string, seqNum: number): void;

/**
* Remove and discard the first buffered item for the given input stream.
* Used after `.wait()` resumes to remove the duplicate that the SSE tail
* buffered while the waitpoint was being completed via a separate path.
* Returns true if an item was removed, false if the buffer was empty.
*/
shiftBuffer(streamId: string): boolean;

/**
* Disconnect the SSE tail and clear the buffer for a specific input stream.
* Used before suspending via `.wait()` so the tail doesn't buffer duplicates
* of data that will be delivered through the waitpoint path.
*/
disconnectStream(streamId: string): void;

/**
* Clear all persistent `.on()` handlers and abort tails that have no remaining once waiters.
* Called automatically when a task run completes.
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/v3/realtimeStreams/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
RealtimeStreamInstance,
RealtimeStreamOperationOptions,
RealtimeStreamsManager,
StreamWriteResult,
} from "./types.js";

export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
Expand All @@ -16,7 +17,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
) {}
// Track active streams - using a Set allows multiple streams for the same key to coexist
private activeStreams = new Set<{
wait: () => Promise<void>;
wait: () => Promise<StreamWriteResult>;
abortController: AbortController;
}>();

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/v3/realtimeStreams/noopManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export class NoopRealtimeStreamsManager implements RealtimeStreamsManager {
options?: RealtimeStreamOperationOptions
): RealtimeStreamInstance<T> {
return {
wait: () => Promise.resolve(),
wait: () => Promise.resolve({}),
get stream(): AsyncIterableStream<T> {
return createAsyncIterableStreamFromAsyncIterable(source);
},
Expand Down
7 changes: 4 additions & 3 deletions packages/core/src/v3/realtimeStreams/streamInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
import { AnyZodFetchOptions } from "../zodfetch.js";
import { StreamsWriterV1 } from "./streamsWriterV1.js";
import { StreamsWriterV2 } from "./streamsWriterV2.js";
import { StreamsWriter } from "./types.js";
import { StreamsWriter, StreamWriteResult } from "./types.js";

export type StreamInstanceOptions<T> = {
apiClient: ApiClient;
Expand Down Expand Up @@ -63,8 +63,9 @@ export class StreamInstance<T> implements StreamsWriter {
return streamWriter;
}

public async wait(): Promise<void> {
return this.streamPromise.then((writer) => writer.wait());
public async wait(): Promise<StreamWriteResult> {
const writer = await this.streamPromise;
return writer.wait();
}

public get stream(): AsyncIterableStream<T> {
Expand Down
7 changes: 4 additions & 3 deletions packages/core/src/v3/realtimeStreams/streamsWriterV1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { request as httpsRequest } from "node:https";
import { request as httpRequest } from "node:http";
import { URL } from "node:url";
import { randomBytes } from "node:crypto";
import { StreamsWriter } from "./types.js";
import { StreamsWriter, StreamWriteResult } from "./types.js";

export type StreamsWriterV1Options<T> = {
baseUrl: string;
Expand Down Expand Up @@ -258,8 +258,9 @@ export class StreamsWriterV1<T> implements StreamsWriter {
await this.makeRequest(0);
}

public async wait(): Promise<void> {
return this.streamPromise;
public async wait(): Promise<StreamWriteResult> {
await this.streamPromise;
return {};
}

public [Symbol.asyncIterator]() {
Expand Down
10 changes: 6 additions & 4 deletions packages/core/src/v3/realtimeStreams/streamsWriterV2.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { S2, AppendRecord, BatchTransform } from "@s2-dev/streamstore";
import { StreamsWriter } from "./types.js";
import { StreamsWriter, StreamWriteResult } from "./types.js";
import { nanoid } from "nanoid";

export type StreamsWriterV2Options<T = any> = {
Expand Down Expand Up @@ -54,6 +54,7 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
private readonly maxInflightBytes: number;
private aborted = false;
private sessionWritable: WritableStream<any> | null = null;
private lastSeqNum: number | undefined;

constructor(private options: StreamsWriterV2Options<T>) {
this.debug = options.debug ?? false;
Expand Down Expand Up @@ -169,9 +170,9 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
const lastAcked = session.lastAckedPosition();

if (lastAcked?.end) {
const recordsWritten = lastAcked.end.seqNum;
this.lastSeqNum = lastAcked.end.seqNum;
this.log(
`[S2MetadataStream] Written ${recordsWritten} records, ending at seqNum=${lastAcked.end.seqNum}`
`[S2MetadataStream] Written ${this.lastSeqNum} records, ending at seqNum=${this.lastSeqNum}`
);
}
} catch (error) {
Expand All @@ -184,8 +185,9 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
}
}

public async wait(): Promise<void> {
public async wait(): Promise<StreamWriteResult> {
await this.streamPromise;
return { lastEventId: this.lastSeqNum?.toString() };
}

public [Symbol.asyncIterator]() {
Expand Down
36 changes: 33 additions & 3 deletions packages/core/src/v3/realtimeStreams/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ export interface RealtimeStreamsManager {
): Promise<void>;
}

export type StreamWriteResult = {
lastEventId?: string;
};

export interface RealtimeStreamInstance<T> {
wait(): Promise<void>;
wait(): Promise<StreamWriteResult>;
get stream(): AsyncIterableStream<T>;
}

export interface StreamsWriter {
wait(): Promise<void>;
wait(): Promise<StreamWriteResult>;
}

export type RealtimeDefinedStream<TPart> = {
Expand Down Expand Up @@ -71,6 +75,10 @@ export type PipeStreamOptions = {
* Additional request options for the API call.
*/
requestOptions?: ApiRequestOptions;
/** Override the default span name for this operation. */
spanName?: string;
/** When true, the span will be collapsed in the dashboard. */
collapsed?: boolean;
};

/**
Expand All @@ -89,7 +97,7 @@ export type PipeStreamResult<T> = {
* to the realtime stream. Use this to wait for the stream to complete before
* finishing your task.
*/
waitUntilComplete: () => Promise<void>;
waitUntilComplete: () => Promise<StreamWriteResult>;
};

/**
Expand Down Expand Up @@ -185,6 +193,14 @@ export type RealtimeDefinedInputStream<TData> = {
* Uses a waitpoint token internally. Can only be called inside a task.run().
*/
wait: (options?: InputStreamWaitOptions) => ManualWaitpointPromise<TData>;
/**
* Wait for data with an idle phase before suspending.
*
* Keeps the task active (using compute) for `idleTimeoutInSeconds`,
* then suspends via `.wait()` if no data arrives. If data arrives during
* the idle phase the task responds instantly without suspending.
*/
waitWithIdleTimeout: (options: InputStreamWaitWithIdleTimeoutOptions) => Promise<{ ok: true; output: TData } | { ok: false; error?: any }>;
/**
* Send data to this input stream on a specific run.
* This is used from outside the task (e.g., from your backend or another task).
Expand All @@ -199,6 +215,8 @@ export type InputStreamSubscription = {
export type InputStreamOnceOptions = {
signal?: AbortSignal;
timeoutMs?: number;
/** Override the default span name for this operation. */
spanName?: string;
};

export type SendInputStreamOptions = {
Expand Down Expand Up @@ -234,6 +252,18 @@ export type InputStreamWaitOptions = {
* and filtering waitpoints via `wait.listTokens()`.
*/
tags?: string[];

/** Override the default span name for this operation. */
spanName?: string;
};

export type InputStreamWaitWithIdleTimeoutOptions = {
/** Seconds to keep the task idle (active, using compute) before suspending. */
idleTimeoutInSeconds: number;
/** Maximum time to wait after suspending (duration string, e.g. "1h"). */
timeout?: string;
/** Override the default span name for the outer operation. */
spanName?: string;
};

export type InferInputStreamType<T> = T extends RealtimeDefinedInputStream<infer TData>
Expand Down
Loading
Loading