Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/tame-oranges-change.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@trigger.dev/redis-worker": patch
"@trigger.dev/sdk": patch
"trigger.dev": patch
"@trigger.dev/core": patch
---

Adapted the CLI API client to propagate the trigger source via http headers.
6 changes: 5 additions & 1 deletion apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { prisma } from "~/db.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { ReplayTaskRunService } from "~/v3/services/replayTaskRun.server";
import { sanitizeTriggerSource } from "~/utils/triggerSource";

const ParamsSchema = z.object({
/* This is the run friendly ID */
Expand Down Expand Up @@ -41,8 +42,11 @@ export async function action({ request, params }: ActionFunctionArgs) {
return json({ error: "Run not found" }, { status: 404 });
}

const triggerSource =
sanitizeTriggerSource(request.headers.get("x-trigger-source")) ?? "api";

const service = new ReplayTaskRunService();
const newRun = await service.call(taskRun);
const newRun = await service.call(taskRun, { triggerSource });

if (!newRun) {
return json({ error: "Failed to create new run" }, { status: 400 });
Expand Down
5 changes: 5 additions & 0 deletions apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
handleRequestIdempotency,
saveRequestIdempotency,
} from "~/utils/requestIdempotency.server";
import { sanitizeTriggerSource } from "~/utils/triggerSource";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { OutOfEntitlementError, TriggerTaskService } from "~/v3/services/triggerTask.server";

Expand All @@ -36,6 +37,7 @@ export const HeadersSchema = z.object({
"x-trigger-engine-version": RunEngineVersionSchema.nullish(),
"x-trigger-request-idempotency-key": z.string().nullish(),
"x-trigger-realtime-streams-version": z.string().nullish(),
"x-trigger-source": z.string().nullish(),
traceparent: z.string().optional(),
tracestate: z.string().optional(),
});
Expand Down Expand Up @@ -67,6 +69,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-engine-version": engineVersion,
"x-trigger-request-idempotency-key": requestIdempotencyKey,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
"x-trigger-source": triggerSourceHeader,
} = headers;

const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, {
Expand Down Expand Up @@ -119,6 +122,8 @@ const { action, loader } = createActionApiRoute(
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader) ?? "api",
triggerAction: "trigger",
},
engineVersion ?? undefined
);
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/routes/api.v1.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
BatchTriggerV3Service,
} from "~/v3/services/batchTriggerV3.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { sanitizeTriggerSource } from "~/utils/triggerSource";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
import { extractJwtSigningSecretKey } from "~/services/realtime/jwtAuth.server";
Expand Down Expand Up @@ -72,6 +73,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-engine-version": engineVersion,
"batch-processing-strategy": batchProcessingStrategy,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
"x-trigger-source": triggerSourceHeader,
traceparent,
tracestate,
} = headers;
Expand Down Expand Up @@ -113,6 +115,8 @@ const { action, loader } = createActionApiRoute(
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader) ?? "api",
triggerAction: "trigger",
});

const $responseHeaders = await responseHeaders(
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/routes/api.v2.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { sanitizeTriggerSource } from "~/utils/triggerSource";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
import { extractJwtSigningSecretKey } from "~/services/realtime/jwtAuth.server";
Expand Down Expand Up @@ -62,6 +63,7 @@ const { action, loader } = createActionApiRoute(
"batch-processing-strategy": batchProcessingStrategy,
"x-trigger-request-idempotency-key": requestIdempotencyKey,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
"x-trigger-source": triggerSourceHeader,
traceparent,
tracestate,
} = headers;
Expand Down Expand Up @@ -127,6 +129,8 @@ const { action, loader } = createActionApiRoute(
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader) ?? "api",
triggerAction: "trigger",
});

const $responseHeaders = await responseHeaders(
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/routes/api.v3.batches.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
} from "~/utils/requestIdempotency.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { sanitizeTriggerSource } from "~/utils/triggerSource";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
import { extractJwtSigningSecretKey } from "~/services/realtime/jwtAuth.server";
Expand Down Expand Up @@ -65,6 +66,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-worker": isFromWorker,
"x-trigger-client": triggerClient,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
"x-trigger-source": triggerSourceHeader,
traceparent,
tracestate,
} = headers;
Expand Down Expand Up @@ -132,6 +134,7 @@ const { action, loader } = createActionApiRoute(
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader) ?? "api",
});

const $responseHeaders = await responseHeaders(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ export const action: ActionFunction = async ({ request, params }) => {
ttlSeconds: submission.value.ttlSeconds,
version: submission.value.version,
prioritySeconds: submission.value.prioritySeconds,
triggerSource: "dashboard",
});

if (!newRun) {
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/runEngine/services/batchTrigger.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export type BatchTriggerTaskServiceOptions = {
spanParentAsLink?: boolean;
oneTimeUseToken?: string;
realtimeStreamsVersion?: "v1" | "v2";
triggerSource?: string;
triggerAction?: string;
};

/**
Expand Down Expand Up @@ -678,6 +680,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
batchId: batch.id,
batchIndex: currentIndex,
realtimeStreamsVersion: options?.realtimeStreamsVersion,
triggerSource: options?.triggerSource ?? "api",
triggerAction: options?.triggerAction ?? "trigger",
},
"V2"
);
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/runEngine/services/createBatch.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export type CreateBatchServiceOptions = {
spanParentAsLink?: boolean;
oneTimeUseToken?: string;
realtimeStreamsVersion?: "v1" | "v2";
triggerSource?: string;
};

/**
Expand Down Expand Up @@ -143,6 +144,7 @@ export class CreateBatchService extends WithRunEngine {
idempotencyKey: body.idempotencyKey,
processingConcurrency: config.processingConcurrency,
planType,
triggerSource: options.triggerSource,
};

await this._engine.initializeBatch(initOptions);
Expand Down
13 changes: 13 additions & 0 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
import { Tracer } from "@opentelemetry/api";
import { tryCatch } from "@trigger.dev/core/utils";
import {
RunAnnotations,
TaskRunError,
taskRunErrorEnhancer,
taskRunErrorToString,
Expand Down Expand Up @@ -289,6 +290,17 @@ export class RunEngineTriggerTaskService {

const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);

// Build annotations for this run
const triggerSource = options.triggerSource ?? "api";
const triggerAction = options.triggerAction ?? "trigger";
const parentAnnotations = RunAnnotations.safeParse(parentRun?.annotations).data;
const annotations = {
triggerSource,
triggerAction,
rootTriggerSource: parentAnnotations?.rootTriggerSource ?? triggerSource,
rootScheduleId: parentAnnotations?.rootScheduleId || options.scheduleId || undefined,
};

try {
return await this.traceEventConcern.traceRun(
triggerRequest,
Expand Down Expand Up @@ -369,6 +381,7 @@ export class RunEngineTriggerTaskService {
planType,
realtimeStreamsVersion: options.realtimeStreamsVersion,
debounce: body.options?.debounce,
annotations,
// When debouncing with triggerAndWait, create a span for the debounced trigger
onDebounced:
body.options?.debounce && body.options?.resumeParentOnCompletion
Expand Down
9 changes: 9 additions & 0 deletions apps/webapp/app/utils/triggerSource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
const ALLOWED_TRIGGER_SOURCES = new Set(["sdk", "cli", "mcp"]);

/** Validates a client-provided trigger source header against the allowlist. */
export function sanitizeTriggerSource(value: string | null | undefined): string | undefined {
if (value && ALLOWED_TRIGGER_SOURCES.has(value)) {
return value;
}
return undefined;
}
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/runEngineHandlers.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,8 @@ export function setupBatchQueueCallbacks() {
batchIndex: itemIndex,
realtimeStreamsVersion: meta.realtimeStreamsVersion,
planType: meta.planType,
triggerSource: meta.parentRunId ? "sdk" : meta.triggerSource ?? "api",
triggerAction: "trigger",
},
"V2"
);
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/scheduleEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ function createScheduleEngine() {
scheduleInstanceId,
queueTimestamp: exactScheduleTime,
overrideCreatedAt: exactScheduleTime,
triggerSource: "schedule",
triggerAction: "trigger",
}
);

Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/services/batchTriggerV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ export type BatchTriggerTaskServiceOptions = {
spanParentAsLink?: boolean;
oneTimeUseToken?: string;
realtimeStreamsVersion?: "v1" | "v2";
triggerSource?: string;
triggerAction?: string;
};

type RunItemData = {
Expand Down Expand Up @@ -853,6 +855,8 @@ export class BatchTriggerV3Service extends BaseService {
skipChecks: true,
runFriendlyId: task.runId,
realtimeStreamsVersion: options?.realtimeStreamsVersion,
triggerSource: options?.triggerSource ?? "api",
triggerAction: options?.triggerAction ?? "trigger",
}
);

Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ export class BulkActionService extends BaseService {
const [error, result] = await tryCatch(
replayService.call(run, {
bulkActionId: bulkActionId,
triggerSource: "dashboard",
})
);
if (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class PerformBulkActionService extends BaseService {
switch (item.group.type) {
case "REPLAY": {
const service = new ReplayTaskRunService(this._prisma);
const result = await service.call(item.sourceRun);
const result = await service.call(item.sourceRun, { triggerSource: "dashboard" });

await this._prisma.bulkActionItem.update({
where: { id: item.id },
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/v3/services/replayTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type OverrideOptions = {
payload?: string;
metadata?: unknown;
bulkActionId?: string;
triggerSource?: string;
} & RunOptionsData;

export class ReplayTaskRunService extends BaseService {
Expand Down Expand Up @@ -123,6 +124,8 @@ export class ReplayTaskRunService extends BaseService {
realtimeStreamsVersion: determineRealtimeStreamsVersion(
existingTaskRun.realtimeStreamsVersion
),
triggerSource: overrideOptions.triggerSource ?? "api",
triggerAction: "replay",
}
);

Expand Down
51 changes: 29 additions & 22 deletions apps/webapp/app/v3/services/testTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,35 @@ export class TestTaskService extends BaseService {

switch (triggerSource) {
case "STANDARD": {
const result = await triggerTaskService.call(data.taskIdentifier, environment, {
payload: data.payload,
options: {
test: true,
metadata: data.metadata,
delay: data.delaySeconds ? new Date(Date.now() + data.delaySeconds * 1000) : undefined,
ttl: data.ttlSeconds,
idempotencyKey: data.idempotencyKey,
idempotencyKeyTTL: data.idempotencyKeyTTLSeconds
? `${data.idempotencyKeyTTLSeconds}s`
: undefined,
queue: data.queue ? { name: data.queue } : undefined,
concurrencyKey: data.concurrencyKey,
maxAttempts: data.maxAttempts,
maxDuration: data.maxDurationSeconds,
tags: data.tags,
machine: data.machine,
region: data.region,
lockToVersion: data.version === "latest" ? undefined : data.version,
priority: data.prioritySeconds,
const result = await triggerTaskService.call(
data.taskIdentifier,
environment,
{
payload: data.payload,
options: {
test: true,
metadata: data.metadata,
delay: data.delaySeconds
? new Date(Date.now() + data.delaySeconds * 1000)
: undefined,
ttl: data.ttlSeconds,
idempotencyKey: data.idempotencyKey,
idempotencyKeyTTL: data.idempotencyKeyTTLSeconds
? `${data.idempotencyKeyTTLSeconds}s`
: undefined,
queue: data.queue ? { name: data.queue } : undefined,
concurrencyKey: data.concurrencyKey,
maxAttempts: data.maxAttempts,
maxDuration: data.maxDurationSeconds,
tags: data.tags,
machine: data.machine,
region: data.region,
lockToVersion: data.version === "latest" ? undefined : data.version,
priority: data.prioritySeconds,
},
},
});
{ triggerSource: "dashboard", triggerAction: "test" }
);

return result?.run;
}
Expand Down Expand Up @@ -72,7 +79,7 @@ export class TestTaskService extends BaseService {
priority: data.prioritySeconds,
},
},
{ customIcon: "scheduled" }
{ customIcon: "scheduled", triggerSource: "dashboard", triggerAction: "test" }
);

return result?.run;
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ export type TriggerTaskServiceOptions = {
replayedFromTaskRunFriendlyId?: string;
planType?: string;
realtimeStreamsVersion?: "v1" | "v2";
triggerSource?: string;
triggerAction?: string;
};

export class OutOfEntitlementError extends Error {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "public"."TaskRun" ADD COLUMN "annotations" JSONB;
3 changes: 3 additions & 0 deletions internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,9 @@ model TaskRun {
metadataType String @default("application/json")
metadataVersion Int @default(1)

/// Structured annotations: triggerSource, triggerAction, rootTriggerSource, rootScheduleId
annotations Json?

/// Run output
output String?
outputType String @default("application/json")
Expand Down
1 change: 1 addition & 0 deletions internal-packages/run-engine/src/batch-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ export class BatchQueue {
realtimeStreamsVersion: options.realtimeStreamsVersion,
idempotencyKey: options.idempotencyKey,
processingConcurrency: options.processingConcurrency,
triggerSource: options.triggerSource,
};

// Store metadata in completion tracker
Expand Down
Loading
Loading