Workflow Engine — Multi-Step Orchestration for Bun
Orchestrate multi-step business processes with a fluent, chainable DSL. Saga compensation, step retry with exponential backoff, parallel execution, conditional branching, nested sub-workflows, human-in-the-loop signals with timeout, loop control flow (doUntil/doWhile), forEach iteration, map transforms, schema validation (Zod-compatible), per-execution subscribe, crash recovery, type-safe step chaining, typed observability events, and cleanup/archival — all built on top of bunqueue’s Queue and Worker. No new infrastructure, no external services, no YAML.
```
validate ──→ reserve stock ──→ charge payment ──→ send confirmation
                   ↑                  ↑
              compensate:        compensate:
              release stock      refund payment
```

bunqueue vs Competitors

| | bunqueue | Temporal | Inngest | Trigger.dev |
|---|---|---|---|---|
| Definition | TypeScript DSL | TypeScript + decorators | step.run() wrappers | TypeScript functions |
| Infrastructure | None (embedded SQLite) | PostgreSQL + 7 services | Cloud-only (no self-host) | Redis + PostgreSQL |
| Saga compensation | Built-in | Manual | Manual | Manual |
| Human-in-the-loop | .waitFor() + signal() | Signals API | step.waitForEvent() | Waitpoint tokens |
| Branching | .branch().path() | Code-level if/else | Code-level if/else | Code-level if/else |
| Parallel steps | .parallel() | Promise.all | step.run() in parallel | Manual |
| Step retry | Built-in (exponential backoff) | Built-in | Built-in | Built-in |
| Signal timeout | .waitFor(event, { timeout }) | Workflow.await | step.waitForEvent timeout | Manual |
| Nested workflows | .subWorkflow() | Child workflows | step.invoke() | Manual |
| Observability | Typed event emitter | Temporal UI | Inngest dashboard | Dashboard |
| Loops (doUntil/doWhile) | .doUntil() / .doWhile() | Code-level loops | Manual | Manual |
| forEach | .forEach() with indexed results | Code-level loops | Manual | Manual |
| Map transform | .map() | Code-level | Manual | Manual |
| Schema validation | Duck-typed .parse() (Zod, ArkType) | Manual | Built-in | Manual |
| Per-execution subscribe | engine.subscribe(id, cb) | Manual | Webhook | Manual |
| Crash recovery | engine.recover() | Built-in | Built-in | Built-in |
| Type-safe step chaining | Generic accumulator | Manual casting | Manual casting | Manual casting |
| Cleanup/archival | Built-in SQLite archive | Manual | Auto (cloud) | Manual |
| Self-hosted | Yes (zero-config) | Yes (complex) | No | Yes (complex) |
| Pricing | Free (MIT) | Free self-hosted / Cloud $$ | Free tier, then per-execution | Free tier, then $50/mo+ |
| Setup time | bun add bunqueue | Hours to days | Minutes (cloud) | 30min+ self-hosted |
Why bunqueue?
- Zero infrastructure. Temporal needs PostgreSQL + 7 services. Trigger.dev needs Redis + PostgreSQL. bunqueue needs nothing — SQLite is embedded.
- Saga pattern is first-class. Every competitor requires you to implement compensation manually. bunqueue runs compensate handlers in reverse order automatically.
- TypeScript-native DSL. No decorators (Temporal), no wrapper functions (Inngest). Just `.step().step().branch().step()`.
- Same process, same codebase. No separate worker infrastructure, no deployment pipeline for workflow definitions. It’s a library, not a platform.
When to use something else
- Multi-region HA with automatic failover — Use Temporal
- Serverless-first with zero ops — Use Inngest
- Already running Redis with BullMQ — Use BullMQ FlowProducer for simple parent-child chains
- Guaranteed exactly-once across restarts — bunqueue’s `engine.recover()` provides at-most-once crash recovery (re-enqueues orphaned executions, re-arms signal timeouts). For guaranteed exactly-once execution with distributed coordination, use Temporal
Quick Start
```sh
bun add bunqueue
```

```ts
import { Workflow, Engine } from 'bunqueue/workflow';

// Define a type-safe workflow — each step's return type is tracked automatically
const orderFlow = new Workflow<{ orderId: string; amount: number }>('order-pipeline')
  .step('validate', async (ctx) => {
    // ctx.input is typed as { orderId: string; amount: number }
    if (ctx.input.amount <= 0) throw new Error('Invalid amount');
    return { orderId: ctx.input.orderId, validated: true };
  })
  .step('charge', async (ctx) => {
    // ctx.steps.validate is typed as { orderId: string; validated: boolean }
    const txId = await payments.charge(ctx.steps.validate.orderId, ctx.input.amount);
    return { transactionId: txId };
  }, {
    compensate: async () => {
      // Runs automatically if a later step fails
      await payments.refund();
    },
  })
  .step('confirm', async (ctx) => {
    // ctx.steps.charge is typed as { transactionId: string }
    await mailer.send('order-confirm', { txId: ctx.steps.charge.transactionId });
    return { emailSent: true, transactionId: ctx.steps.charge.transactionId };
  });

// Create engine and run
const engine = new Engine({ embedded: true });
engine.register(orderFlow);

const run = await engine.start('order-pipeline', {
  orderId: 'ORD-1',
  amount: 99.99,
});

// Check status
const exec = engine.getExecution(run.id);
console.log(exec.state); // 'running' | 'completed' | 'failed' | 'waiting' | 'compensating'

// Recover orphaned executions after a crash/restart
const recovered = await engine.recover();
console.log(`Recovered ${recovered.total} executions`);
```

Core Concepts
Steps are the building blocks. Each step receives a context with the workflow input and all previous step results. When you provide a type parameter to `Workflow<TInput>`, all steps get full type inference — no casting needed:
```ts
const flow = new Workflow<{ source: string }>('data-pipeline')
  .step('extract', async (ctx) => {
    // ctx.input.source is typed as string
    const rawData = await fetchFromSource(ctx.input.source);
    return { records: rawData.length, data: rawData };
  })
  .step('transform', async (ctx) => {
    // ctx.steps.extract is typed as { records: number; data: RawRecord[] }
    const cleaned = ctx.steps.extract.data.filter(r => r.valid).map(normalize);
    return { cleaned, dropped: ctx.steps.extract.data.length - cleaned.length };
  })
  .step('load', async (ctx) => {
    // ctx.steps.transform is typed as { cleaned: CleanRecord[]; dropped: number }
    await db.insertBatch('analytics', ctx.steps.transform.cleaned);
    return { loaded: ctx.steps.transform.cleaned.length, source: ctx.input.source };
  });
```

StepContext shape:
| Property | Type | Description |
|---|---|---|
| `ctx.input` | `TInput` | The input passed to `engine.start()`. Typed when `Workflow<TInput>` is used. |
| `ctx.steps` | `TSteps` | Results from all completed steps (keyed by step name). Accumulates types automatically. |
| `ctx.signals` | `Record<string, unknown>` | Data from received signals (keyed by event name) |
| `ctx.executionId` | `string` | Unique execution ID |
Every step must return a value (or `undefined`). The return value becomes available to subsequent steps via `ctx.steps.stepName` (or `ctx.steps['step-name']` for hyphenated names).
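A quick illustration of the two access styles, using a plain object standing in for `ctx.steps` (the step names here are invented):

```typescript
// Dot access works for identifier-safe names; bracket access is needed
// when the step name contains a hyphen.
const steps = {
  validate: { ok: true },
  'send-email': { delivered: 1 },
};

const ok = steps.validate.ok;                    // true
const delivered = steps['send-email'].delivered; // 1
```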
Compensation (Saga Pattern)
When a step fails, compensation handlers run in reverse order for all previously completed steps. This implements the saga pattern — the industry-standard approach for distributed transactions without two-phase commit.
```ts
const flow = new Workflow('money-transfer')
  .step('debit-source', async (ctx) => {
    const { from, amount } = ctx.input as { from: string; to: string; amount: number };
    await accounts.debit(from, amount);
    return { debited: true, account: from, amount };
  }, {
    compensate: async (ctx) => {
      // Undo: credit back the source account
      const { from, amount } = ctx.input as { from: string; to: string; amount: number };
      await accounts.credit(from, amount);
      console.log('Rolled back: source account credited');
    },
  })
  .step('credit-target', async (ctx) => {
    const { to, amount } = ctx.input as { from: string; to: string; amount: number };
    await accounts.credit(to, amount);
    return { credited: true, account: to, amount };
  }, {
    compensate: async (ctx) => {
      // Undo: debit back the target account
      const { to, amount } = ctx.input as { from: string; to: string; amount: number };
      await accounts.debit(to, amount);
      console.log('Rolled back: target account debited');
    },
  })
  .step('send-receipt', async () => {
    throw new Error('Email service down');
    // → Compensation runs automatically in reverse:
    //   1. credit-target compensate (debit target)
    //   2. debit-source compensate (credit source)
  });
```

How it works:
- Steps A, B, C execute in order
- Step C throws an error
- Engine runs compensation for B, then A (reverse order)
- Execution state becomes `'failed'`
Compensation is best-effort — if a compensate handler itself throws, the error is logged but the remaining compensations still run.
Branching
Route execution to different paths based on runtime conditions:
```ts
const flow = new Workflow('support-ticket')
  .step('classify', async (ctx) => {
    const { message, plan } = ctx.input as { message: string; plan: string };
    const sentiment = await analyzeSentiment(message);
    const priority = plan === 'enterprise' ? 'high' : sentiment < 0 ? 'medium' : 'low';
    return { priority };
  })
  .branch((ctx) => (ctx.steps['classify'] as { priority: string }).priority)
  .path('high', (w) =>
    w.step('assign-senior', async (ctx) => {
      const agent = await roster.getAvailable('senior');
      await slack.notify(agent, 'Urgent ticket assigned');
      return { assignedTo: agent.name, sla: '1h' };
    })
  )
  .path('medium', (w) =>
    w.step('assign-regular', async (ctx) => {
      const agent = await roster.getAvailable('regular');
      return { assignedTo: agent.name, sla: '4h' };
    })
  )
  .path('low', (w) =>
    w.step('auto-reply', async (ctx) => {
      await mailer.sendTemplate('auto-reply', ctx.input);
      return { assignedTo: 'bot', sla: '24h' };
    })
  )
  .step('log-ticket', async (ctx) => {
    // This step always runs, regardless of which branch was taken
    await auditLog.write('ticket-created', { executionId: ctx.executionId });
    return { logged: true };
  });
```

Rules:
- The branch function returns a string that matches one of the `.path()` names
- Only the matching path executes; others are skipped entirely
- Steps after the branch block always run (convergence point)
- Each path can contain multiple steps, nested branches, or `waitFor` calls
WaitFor (Human-in-the-Loop)
Pause execution until an external signal arrives. This is how you implement approval gates, manual review steps, or any process that needs human input:
```ts
const flow = new Workflow('content-publishing')
  .step('draft', async (ctx) => {
    const { title, body } = ctx.input as { title: string; body: string };
    const draft = await cms.createDraft({ title, body });
    await slack.notify('#editorial', `New draft "${title}" ready for review`);
    return { draftId: draft.id, previewUrl: draft.previewUrl };
  })
  .waitFor('editorial-review')
  .step('publish-or-reject', async (ctx) => {
    const review = ctx.signals['editorial-review'] as {
      approved: boolean;
      editor: string;
      notes?: string;
    };

    const { draftId } = ctx.steps['draft'] as { draftId: string };

    if (!review.approved) {
      await cms.reject(draftId, review.notes);
      return { status: 'rejected', editor: review.editor };
    }

    const published = await cms.publish(draftId);
    return { status: 'published', url: published.url, editor: review.editor };
  });

// Start the workflow
const run = await engine.start('content-publishing', {
  title: 'Announcing Workflow Engine',
  body: '...',
});

// The execution pauses at 'editorial-review' with state: 'waiting'
// Your app can show a UI, send a Slack button, expose an API endpoint, etc.

// When the editor makes a decision (could be minutes, hours, or days later):
await engine.signal(run.id, 'editorial-review', {
  approved: true,
  editor: 'alice@company.com',
  notes: 'Great article, ship it!',
});
// → Execution resumes from 'publish-or-reject'
```

Key behaviors:
- `waitFor('event')` transitions the execution to `state: 'waiting'`
- The execution is persisted to SQLite — it survives process restarts. Call `engine.recover()` on startup to re-enqueue orphaned `running` executions and re-arm `waiting` timeouts
- `engine.signal(id, event, payload)` stores the payload and resumes execution
- The signal data is available in `ctx.signals['event-name']`
- You can have multiple `waitFor` calls in a single workflow (e.g., multi-stage approvals)
Step Timeout
Prevent steps from running indefinitely:
```ts
const flow = new Workflow('api-aggregation')
  .step('fetch-primary', async () => {
    const res = await fetch('https://api.primary.com/data');
    return await res.json();
  }, { timeout: 5000 }) // 5 second timeout
  .step('fetch-secondary', async () => {
    const res = await fetch('https://api.secondary.com/data');
    return await res.json();
  }, { timeout: 10000 }) // 10 second timeout
  .step('merge', async (ctx) => {
    const primary = ctx.steps['fetch-primary'];
    const secondary = ctx.steps['fetch-secondary'];
    return { ...primary, ...secondary };
  });
```

If a step exceeds its timeout, it fails with a "timed out" error. If the step has a compensate handler, compensation runs for all previously completed steps.
Step Retry with Backoff
Steps can retry automatically with exponential backoff and jitter:
```ts
const flow = new Workflow('resilient-pipeline')
  .step('call-api', async () => {
    const res = await fetch('https://api.external.com/data');
    if (!res.ok) throw new Error(`HTTP ${res.status}`);
    return await res.json();
  }, {
    retry: 5,       // Max 5 attempts (default: 3)
    timeout: 10000, // 10s per attempt
  })
  .step('process', async (ctx) => {
    const data = ctx.steps['call-api'] as ApiResponse;
    return { processed: true };
  });
```

Backoff formula: `min(500ms * 2^attempt + jitter, 30s)`. The first retry waits ~500ms, the second ~1s, the third ~2s, capping at 30 seconds. Jitter prevents thundering herds.
The execution tracks attempt count in exec.steps['step-name'].attempts. If all retries are exhausted, the step fails and compensation runs.
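The documented formula can be sketched as a standalone helper. The jitter range below (0 to 250ms) is an assumption for illustration; this doc does not specify bunqueue's internal value:

```typescript
// Sketch of the documented backoff curve: min(500ms * 2^attempt + jitter, 30s).
// attempt is 0-based here, so the first retry waits ~500ms.
function backoffDelay(attempt: number, baseMs = 500, capMs = 30_000): number {
  const jitter = Math.random() * 250; // assumed jitter range, not bunqueue's actual value
  return Math.min(baseMs * 2 ** attempt + jitter, capMs);
}
```

Plugging in attempts 0, 1, 2 reproduces the ~500ms, ~1s, ~2s sequence described above, with every delay capped at 30 seconds.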
Parallel Steps
Run multiple steps concurrently with `.parallel()`:
```ts
const flow = new Workflow('data-enrichment')
  .step('fetch-user', async (ctx) => {
    const { userId } = ctx.input as { userId: string };
    return await db.users.find(userId);
  })
  .parallel((w) => w
    .step('fetch-orders', async (ctx) => {
      const { userId } = ctx.input as { userId: string };
      return await db.orders.findByUser(userId);
    })
    .step('fetch-preferences', async (ctx) => {
      const { userId } = ctx.input as { userId: string };
      return await db.preferences.get(userId);
    })
    .step('fetch-activity', async (ctx) => {
      const { userId } = ctx.input as { userId: string };
      return await analytics.getRecent(userId);
    })
  )
  .step('merge', async (ctx) => {
    // All parallel step results are available
    const orders = ctx.steps['fetch-orders'];
    const prefs = ctx.steps['fetch-preferences'];
    const activity = ctx.steps['fetch-activity'];
    return { profile: { orders, prefs, activity } };
  });
```

How it works:
- All steps inside `.parallel()` run via `Promise.allSettled`
- Results from each parallel step are saved to `exec.steps` like normal steps
- If any parallel step fails, the entire parallel group fails and compensation runs. The error thrown is an `AggregateError` containing all individual failure reasons
- Steps after the parallel block wait for all parallel steps to finish
Signal Timeout
Add a timeout to `waitFor` so workflows don’t hang indefinitely:
```ts
const flow = new Workflow('time-limited-approval')
  .step('submit', async (ctx) => {
    const { amount } = ctx.input as { amount: number };
    await slack.notify('#approvals', `Expense $${amount} needs review`);
    return { submitted: true };
  })
  .waitFor('manager-approval', { timeout: 86400000 }) // 24 hours
  .step('process', async (ctx) => {
    const decision = ctx.signals['manager-approval'] as { approved: boolean };
    return { status: decision.approved ? 'paid' : 'rejected' };
  });
```

If the signal doesn’t arrive within the timeout:
- The execution state becomes `'failed'`
- The error is stored in `exec.steps['__waitFor:manager-approval'].error`
- Compensation runs for all previously completed steps
- A `signal:timeout` event is emitted
Nested Workflows (Sub-Workflows)
Compose workflows by calling child workflows from a parent:
```ts
const paymentFlow = new Workflow('payment')
  .step('validate-card', async (ctx) => {
    const { cardToken, amount } = ctx.input as { cardToken: string; amount: number };
    return { valid: true, amount };
  })
  .step('charge', async (ctx) => {
    return { txId: `tx_${Date.now()}` };
  }, {
    compensate: async () => {
      await payments.refund();
    },
  });

const orderFlow = new Workflow('order')
  .step('create-order', async (ctx) => {
    const { amount } = ctx.input as { amount: number; cardToken: string };
    return { orderId: `ORD-${Date.now()}`, total: amount };
  })
  .subWorkflow('payment', (ctx) => ({
    // Map parent context to child input
    cardToken: (ctx.input as { cardToken: string }).cardToken,
    amount: (ctx.steps['create-order'] as { total: number }).total,
  }))
  .step('confirm', async (ctx) => {
    // Child results available under 'sub:<workflow-name>'
    const paymentResult = ctx.steps['sub:payment'] as Record<string, unknown>;
    return { confirmed: true, payment: paymentResult };
  });

const engine = new Engine({ embedded: true });
engine.register(paymentFlow); // Register child first
engine.register(orderFlow);

await engine.start('order', { amount: 99, cardToken: 'tok_abc' });
```

Key behaviors:
- The parent workflow pauses while the child executes (polling every 100ms)
- Child workflow results are stored under `ctx.steps['sub:<child-name>']`
- If the child fails, the parent fails too (and parent compensation runs)
- The input mapper function receives the parent’s context, allowing you to pass any data from parent steps to the child
- Sub-workflows have a hardcoded 300-second (5-minute) timeout. If the child doesn’t complete within that window, the parent step fails
Observability (Events)
Subscribe to typed workflow events for monitoring, logging, and debugging:
```ts
const engine = new Engine({ embedded: true });

// Listen to specific event types
engine.on('workflow:started', (event) => {
  console.log(`Workflow ${event.workflowName} started: ${event.executionId}`);
});

engine.on('step:completed', (event) => {
  const { stepName } = event as StepEvent;
  console.log(`Step ${stepName} completed in ${event.executionId}`);
});

engine.on('workflow:failed', (event) => {
  alerting.send(`Workflow ${event.workflowName} failed: ${event.executionId}`);
});

// Listen to ALL events
engine.onAny((event) => {
  metrics.increment(`workflow.${event.type}`, {
    workflow: event.workflowName,
  });
});

// Or pass onEvent in constructor
const engine2 = new Engine({
  embedded: true,
  onEvent: (event) => logger.info(event),
});
```

Available event types:
| Event | When |
|---|---|
| `workflow:started` | `engine.start()` is called |
| `workflow:completed` | All steps finished successfully |
| `workflow:failed` | A step threw after retries exhausted |
| `workflow:waiting` | Execution paused at a `waitFor` |
| `workflow:compensating` | Compensation is running |
| `step:started` | A step begins executing |
| `step:completed` | A step finished successfully |
| `step:failed` | A step threw an error |
| `step:retry` | A step is about to retry after failure |
| `signal:received` | `engine.signal()` delivered a signal |
| `signal:timeout` | A `waitFor` timed out |
Use engine.off(type, listener) and engine.offAny(listener) to unsubscribe.
Cleanup & Archival
Manage execution history with built-in cleanup and archival:
```ts
const engine = new Engine({ embedded: true });

// Delete old completed/failed executions (older than 7 days)
const deleted = engine.cleanup(7 * 24 * 60 * 60 * 1000);
console.log(`Deleted ${deleted} executions`);

// Or selectively clean only completed executions
engine.cleanup(7 * 24 * 60 * 60 * 1000, ['completed']);

// Archive instead of delete (moves to archive table)
const archived = engine.archive(30 * 24 * 60 * 60 * 1000); // 30 days
console.log(`Archived ${archived} executions`);

// Check archive count
console.log(`Total archived: ${engine.getArchivedCount()}`);
```

Cleanup vs Archive:
- `cleanup(maxAgeMs, states?)` — Permanently deletes executions older than `maxAgeMs`
- `archive(maxAgeMs, states?)` — Moves executions to a separate `workflow_executions_archive` table (transactional, up to 1000 per call)
- Both accept an optional `states` filter: `['completed']`, `['failed']`, `['completed', 'failed']`, etc.
Type-Safe Steps
The Workflow DSL uses a generic accumulator pattern to track step return types at compile time. Each `.step()` call returns a narrower type, so subsequent steps see exactly what previous steps returned — no `as` casts needed.
```ts
// With type parameter: full type inference
const flow = new Workflow<{ userId: string; email: string }>('onboarding')
  .step('create', async (ctx) => {
    // ctx.input is { userId: string; email: string }
    return { accountId: `acc_${ctx.input.userId}` };
  })
  .step('configure', async (ctx) => {
    // ctx.steps.create is { accountId: string } — inferred automatically
    await setupDefaults(ctx.steps.create.accountId);
    return { configured: true };
  })
  .step('notify', async (ctx) => {
    // Both previous steps are available and typed
    const { accountId } = ctx.steps.create;     // string
    const { configured } = ctx.steps.configure; // boolean
    await mailer.send(ctx.input.email, { accountId });
    return { notified: true };
  });
```

How it works:
The `Workflow` class has two type parameters: `Workflow<TInput, TSteps>`. Each `.step()` call returns `Workflow<TInput, TSteps & Record<TName, Awaited<TResult>>>` — the step name and return type are added to `TSteps`. This means:

- `ctx.input` is typed as `TInput` (the type you pass to `Workflow<TInput>`)
- `ctx.steps` accumulates all completed step results by name
- TypeScript catches typos and type mismatches at compile time
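The mechanism can be reproduced in a few lines. This `MiniFlow` is an illustrative stand-in, not bunqueue's actual source; only the type-level accumulation is shown, with runtime bookkeeping omitted:

```typescript
type StepCtx<TInput, TSteps> = { input: TInput; steps: TSteps };

class MiniFlow<TInput, TSteps extends Record<string, unknown> = {}> {
  step<TName extends string, TResult>(
    name: TName,
    handler: (ctx: StepCtx<TInput, TSteps>) => Promise<TResult> | TResult,
  ): MiniFlow<TInput, TSteps & Record<TName, Awaited<TResult>>> {
    // Runtime bookkeeping omitted; the interesting part is the return type,
    // which intersects the new step's name and result into TSteps.
    return this as unknown as MiniFlow<TInput, TSteps & Record<TName, Awaited<TResult>>>;
  }
}

// Each .step() narrows the chain: the second handler sees ctx.steps.first
// typed as { n: number }, with no casts.
const demo = new MiniFlow<{ id: string }>()
  .step('first', async (ctx) => ({ n: ctx.input.id.length }))
  .step('second', async (ctx) => ({ double: ctx.steps.first.n * 2 }));
```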
Works with all node types:
```ts
const flow = new Workflow<{ items: string[] }>('typed-pipeline')
  .step('init', async (ctx) => ({ count: ctx.input.items.length }))
  .parallel<{ a: number; b: string }>((w) => w
    .step('a', async () => 42)
    .step('b', async () => 'hello')
  )
  // After parallel: ctx.steps has init + a + b
  .map('summary', (ctx) => ({
    total: ctx.steps.a + ctx.steps.init.count,
  }))
  // After map: ctx.steps has init + a + b + summary
  .step('final', async (ctx) => {
    return { result: ctx.steps.summary.total };
  });
```

Backward compatible: If you don’t pass a type parameter, `Workflow` defaults to `Workflow<unknown, {}>` and behaves exactly like before — you can still use `as` casts.
Crash Recovery
After a crash or restart, orphaned executions (stuck in `running`, `waiting`, or `compensating`) can be recovered with `engine.recover()`:
```ts
const engine = new Engine({ embedded: true, dataPath: './data/wf.db' });

// Register all workflows before recovering
engine.register(orderFlow);
engine.register(paymentFlow);

// Recover orphaned executions from the previous process
const result = await engine.recover();
console.log(`Recovered: ${result.running} running, ${result.waiting} waiting, ${result.compensating} compensating`);
console.log(`Total: ${result.total}`);
```

What `recover()` does for each state:
| State | Recovery action |
|---|---|
| `running` | Re-enqueues the step at `currentNodeIndex` so it resumes from where it left off |
| `waiting` | Re-arms the signal timeout timer. If the signal already arrived while the process was down, immediately resumes the execution |
| `compensating` | Re-runs compensation from the beginning (compensation handlers must be idempotent) |
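Because a recovered execution re-runs compensation from the start, each handler may fire more than once. A common idempotency pattern, sketched with an in-memory set standing in for a durable store (`refundLedger` and `compensateCharge` are hypothetical names):

```typescript
// A refund keyed by chargeId is safe to re-run: the second call is a no-op.
const refundLedger = new Set<string>();

async function compensateCharge(chargeId: string): Promise<void> {
  if (refundLedger.has(chargeId)) return; // already compensated, skip on re-run
  refundLedger.add(chargeId);
  // ...issue the actual refund against the payment provider here...
}
```

In production the marker would live in the same SQLite database (or the payment provider's idempotency keys), not process memory.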
Best practice: Call engine.recover() right after registering all workflows, before starting new executions:
```ts
const engine = new Engine({ embedded: true, dataPath: './data/wf.db' });
engine.register(orderFlow);
engine.register(paymentFlow);

// Always recover on startup
const recovered = await engine.recover();
if (recovered.total > 0) {
  console.log(`Recovered ${recovered.total} orphaned executions`);
}

// Now safe to start new workflows
const run = await engine.start('process-order', { orderId: 'ORD-1' });
```

RecoverResult shape:
```ts
interface RecoverResult {
  running: number;       // Running executions re-enqueued
  waiting: number;       // Waiting executions with re-armed timers
  compensating: number;  // Compensating executions re-run
  total: number;         // Total recovered
}
```

Loops (doUntil / doWhile)
Repeat a set of steps based on a condition. Two flavors:
- `doUntil(condition, builder, options?)` — Runs steps first, then checks condition. Repeats until condition returns `true` (do…until semantics).
- `doWhile(condition, builder, options?)` — Checks condition first, then runs steps. Repeats while condition returns `true` (while…do semantics).
```ts
// doUntil: retry sending until delivery confirmed
const flow = new Workflow('delivery')
  .doUntil(
    (ctx) => (ctx.steps['send'] as { delivered: boolean })?.delivered === true,
    (w) => w.step('send', async (ctx) => {
      const result = await deliveryService.attempt(ctx.input);
      return { delivered: result.success };
    }),
    { maxIterations: 10 } // safety limit (default: 100)
  );

// doWhile: process items while queue has items
const batchFlow = new Workflow('batch')
  .doWhile(
    (ctx) => {
      const remaining = (ctx.steps['process'] as { remaining: number })?.remaining ?? 10;
      return remaining > 0;
    },
    (w) => w.step('process', async (ctx) => {
      const batch = await fetchNextBatch();
      await processBatch(batch);
      return { remaining: await getQueueSize() };
    }),
  );
```

Key behaviors:
- `doWhile` can skip entirely if the condition is `false` on the first check
- `doUntil` always runs at least once
- `maxIterations` prevents infinite loops (default: 100)
- Loop step results are overwritten each iteration — only the last iteration’s result is available downstream
- Conditions can be async (return a `Promise<boolean>`)
forEach
Iterate over a dynamic list of items, executing a step for each:

```ts
const flow = new Workflow<{ userIds: string[] }>('notify-all')
  .forEach(
    (ctx) => (ctx.input as { userIds: string[] }).userIds, // items extractor
    'notify',                                              // step name
    async (ctx) => {
      const userId = ctx.steps.__item as string; // current item
      const index = ctx.steps.__index as number; // current index (0-based)
      await sendNotification(userId);
      return { notified: userId };
    },
    { retry: 3, timeout: 10_000 } // standard step options
  );
```

Key behaviors:
- Results are stored with indexed names: `notify:0`, `notify:1`, `notify:2`, etc.
- Each iteration receives `__item` (current item) and `__index` (current index) via `ctx.steps`
- Items are processed sequentially (not in parallel)
- `maxIterations` option limits array size (default: 1000)
- Standard step options (`retry`, `timeout`, `compensate`, `inputSchema`, `outputSchema`) apply to each iteration
Map

Transform step results into a new value without executing an async handler. A synchronous, pure data-transform node:
```ts
const flow = new Workflow('etl')
  .step('fetch', async () => {
    const records = await db.query('SELECT * FROM events');
    return { records };
  })
  .map('aggregate', (ctx) => {
    const { records } = ctx.steps['fetch'] as { records: Event[] };
    return {
      total: records.length,
      byType: Object.groupBy(records, r => r.type),
    };
  })
  .step('store', async (ctx) => {
    const agg = ctx.steps['aggregate'] as AggregatedData;
    await analytics.insert(agg);
    return { stored: true };
  });
```

Key behaviors:
- Runs synchronously (no retry, no timeout) — it’s a pure transform
- Result is stored under the map name (e.g., `ctx.steps['aggregate']`)
- The transform function receives the full `StepContext` (input, steps, signals)
Schema Validation
Validate step inputs and outputs with any schema library that has a `.parse()` method (Zod, ArkType, Valibot, etc.):
```ts
import { z } from 'zod';

const OrderInput = z.object({
  orderId: z.string(),
  amount: z.number().positive(),
});

const ChargeResult = z.object({
  transactionId: z.string(),
  charged: z.number(),
});

const flow = new Workflow('validated-order')
  .step('validate', async (ctx) => {
    const { orderId } = ctx.input as { orderId: string; amount: number };
    return { orderId, validated: true };
  }, {
    inputSchema: OrderInput, // validates ctx.input before handler runs
  })
  .step('charge', async (ctx) => {
    return { transactionId: 'tx_123', charged: 99.99 };
  }, {
    outputSchema: ChargeResult, // validates return value after handler runs
  });
```

Key behaviors:
- `inputSchema` validates `ctx.input` before the step handler executes
- `outputSchema` validates the handler’s return value after execution
- Uses duck typing: any object with a `.parse(data)` method works — no runtime dependency on Zod
- Validation failure throws an error (triggers retry or compensation like any other step failure)
- Works with Zod, ArkType, Valibot, or any custom schema object
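Because the check is duck-typed, a hand-rolled schema object with no library dependency works as well. `PositiveAmount` below is an illustrative example, not part of bunqueue:

```typescript
// Any object exposing .parse(data) qualifies as a schema: parse returns the
// narrowed value on success and throws on invalid input.
const PositiveAmount = {
  parse(data: unknown): { amount: number } {
    const d = data as { amount?: unknown };
    if (typeof d.amount !== 'number' || d.amount <= 0) {
      throw new Error('amount must be a positive number');
    }
    return { amount: d.amount };
  },
};

PositiveAmount.parse({ amount: 5 });     // returns { amount: 5 }
// PositiveAmount.parse({ amount: -1 }); // throws, triggering retry/compensation
```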
Subscribe
Monitor a specific execution’s events in real-time:
```ts
const run = await engine.start('order-pipeline', { orderId: 'ORD-1' });

// Subscribe to all events for this execution
const unsubscribe = engine.subscribe(run.id, (event) => {
  console.log(`[${event.type}]`, event);

  if (event.type === 'workflow:completed') {
    console.log('Order pipeline finished!');
  }
});

// Later: stop listening
unsubscribe();
```

Key behaviors:
- Returns an `unsubscribe` function — call it to stop receiving events
- Only receives events for the specified execution ID (filters automatically)
- Receives all event types: `step:started`, `step:completed`, `step:failed`, `step:retry`, `workflow:*`, `signal:*`
- Complements `engine.on()` / `engine.onAny()`, which are global (all executions)
Engine API
Constructor

```ts
// Embedded mode — everything in-process, no server needed
const engine = new Engine({ embedded: true });

// Embedded with SQLite persistence
const engine = new Engine({
  embedded: true,
  dataPath: './data/workflows.db',
});

// TCP mode — connects to a running bunqueue server
const engine = new Engine({
  connection: { host: 'localhost', port: 6789 },
});

// All options
const engine = new Engine({
  embedded: true,             // Use embedded mode (default: false)
  connection: { port: 6789 }, // TCP server connection (mutually exclusive with embedded)
  dataPath: './data/wf.db',   // SQLite persistence path
  concurrency: 10,            // Max parallel step executions (default: 5)
  queueName: '__wf:steps',    // Internal queue name (default: '__wf:steps')
  onEvent: (event) => {},     // Global event listener (optional)
});
```

Methods

| Method | Returns | Description |
|---|---|---|
| `engine.register(workflow)` | `this` | Register a workflow definition. Chainable. |
| `engine.start(name, input?)` | `Promise<RunHandle>` | Start a new execution. Returns `{ id, workflowName }`. |
| `engine.getExecution(id)` | `Execution \| null` | Get full execution state by ID. |
| `engine.listExecutions(name?, state?)` | `Execution[]` | List executions with optional filters. |
| `engine.signal(id, event, payload?)` | `Promise<void>` | Send a signal to resume a waiting execution. |
| `engine.on(type, listener)` | `this` | Subscribe to a specific event type. Chainable. |
| `engine.onAny(listener)` | `this` | Subscribe to all events. Chainable. |
| `engine.off(type, listener)` | `this` | Unsubscribe from a specific event type. Chainable. |
| `engine.offAny(listener)` | `this` | Unsubscribe from all events. Chainable. |
| `engine.subscribe(id, callback)` | `() => void` | Subscribe to events for a specific execution. Returns unsubscribe function. |
| `engine.recover()` | `Promise<RecoverResult>` | Re-enqueue orphaned executions after crash/restart. Returns counts by state. |
| `engine.cleanup(maxAgeMs, states?)` | `number` | Delete executions older than `maxAgeMs`. Returns count. |
| `engine.archive(maxAgeMs, states?)` | `number` | Move old executions to archive table. Returns count. |
| `engine.getArchivedCount()` | `number` | Count of archived executions. |
| `engine.close(force?)` | `Promise<void>` | Shut down engine, queue, and worker. |
Execution State
```ts
const exec = engine.getExecution(run.id);

exec.id;           // 'wf_abc123' — unique execution ID
exec.workflowName; // 'order-pipeline'
exec.state;        // 'running' | 'completed' | 'failed' | 'waiting' | 'compensating'
exec.input;        // { orderId: 'ORD-1', amount: 99.99 }
exec.steps;        // Step-by-step status and results:
// {
//   'validate': { status: 'completed', result: { orderId: 'ORD-1', validated: true } },
//   'charge':   { status: 'completed', result: { transactionId: 'tx_abc' } },
//   'confirm':  { status: 'running' }
// }
exec.signals;   // { 'manager-approval': { approved: true } }
exec.createdAt; // 1712700000000
exec.updatedAt; // 1712700005000
```

Execution states:
| State | Meaning |
|---|---|
| `running` | Steps are being executed |
| `completed` | All steps finished successfully |
| `failed` | A step threw an error (compensation has run) |
| `waiting` | Paused at a `waitFor`, waiting for a signal |
| `compensating` | Compensation handlers are running |
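The `waiting` state is the one you act on from the outside: list the parked executions and deliver a signal to each. A minimal sketch against the engine methods above — the `manager-approval` event name and approval payload are illustrative, and the engine is typed structurally:

```ts
// Structural slice of the engine API used here (assumed shape).
interface WaitingEngine {
  listExecutions(name?: string, state?: string): { id: string }[];
  signal(id: string, event: string, payload?: unknown): Promise<void>;
}

// Approve every execution of a workflow that is paused at a waitFor().
async function approveAllWaiting(engine: WaitingEngine, workflowName: string): Promise<number> {
  const waiting = engine.listExecutions(workflowName, 'waiting');
  for (const exec of waiting) {
    await engine.signal(exec.id, 'manager-approval', { approved: true });
  }
  return waiting.length; // number of executions resumed
}
```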
Real-World Examples

E-commerce Order Pipeline

A complete order flow with inventory reservation, payment processing, and automatic rollback on failure:

```ts
const orderFlow = new Workflow<{ orderId: string; items: Item[]; amount: number }>('process-order')
  .step('validate-order', async (ctx) => {
    const { orderId, items, amount } = ctx.input as OrderInput;

    if (items.length === 0) throw new Error('Empty cart');
    if (amount <= 0) throw new Error('Invalid amount');

    // Check all items are in catalog
    for (const item of items) {
      const exists = await catalog.exists(item.sku);
      if (!exists) throw new Error(`Unknown SKU: ${item.sku}`);
    }

    return { orderId, itemCount: items.length, amount };
  })
  .step('reserve-inventory', async (ctx) => {
    const { items } = ctx.input as OrderInput;
    const reservationId = await inventory.reserveBatch(items);
    return { reservationId };
  }, {
    retry: 3, // Retry on transient inventory service errors
    compensate: async (ctx) => {
      const { reservationId } = ctx.steps['reserve-inventory'] as { reservationId: string };
      await inventory.releaseBatch(reservationId);
    },
  })
  .step('process-payment', async (ctx) => {
    const { amount, orderId } = ctx.steps['validate-order'] as ValidatedOrder;
    const charge = await stripe.charges.create({
      amount: Math.round(amount * 100),
      currency: 'usd',
      metadata: { orderId },
    });
    return { chargeId: charge.id, receiptUrl: charge.receipt_url };
  }, {
    retry: 5,       // Payment gateway can be flaky
    timeout: 15000, // 15s timeout per attempt
    compensate: async (ctx) => {
      const { chargeId } = ctx.steps['process-payment'] as { chargeId: string };
      await stripe.refunds.create({ charge: chargeId });
    },
  })
  .step('create-shipment', async (ctx) => {
    const { orderId, items } = ctx.input as OrderInput;
    const { reservationId } = ctx.steps['reserve-inventory'] as { reservationId: string };
    const shipment = await shipping.create({ orderId, items, reservationId });
    return { trackingNumber: shipment.tracking, carrier: shipment.carrier };
  })
  .parallel((w) => w
    .step('send-confirmation', async (ctx) => {
      const payment = ctx.steps['process-payment'] as { chargeId: string; receiptUrl: string };
      const shipment = ctx.steps['create-shipment'] as { trackingNumber: string; carrier: string };
      const { email } = ctx.input as { email: string };
      await mailer.send('order-confirmation', {
        to: email,
        receiptUrl: payment.receiptUrl,
        tracking: shipment.trackingNumber,
      });
      return { emailSent: true };
    })
    .step('notify-warehouse', async (ctx) => {
      const { reservationId } = ctx.steps['reserve-inventory'] as { reservationId: string };
      await warehouse.notifyShipment(reservationId);
      return { warehouseNotified: true };
    })
    .step('update-analytics', async (ctx) => {
      const { amount, orderId } = ctx.steps['validate-order'] as ValidatedOrder;
      await analytics.trackPurchase({ orderId, amount });
      return { tracked: true };
    })
  );
```

What happens on failure:
- If `process-payment` fails after 5 retries → `reserve-inventory` compensation runs (items released)
- If `create-shipment` fails → `process-payment` compensation runs (refund), then `reserve-inventory` compensation runs (items released)
- If any parallel notification step fails → full rollback: refund payment, release inventory
- The `parallel()` block sends email, notifies warehouse, and tracks analytics concurrently — much faster than sequential
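The retry counts above combine with bunqueue's exponential backoff between attempts. The actual base delay and cap are internal to bunqueue and not specified here; purely as an illustration of the schedule's shape, assuming delay = base × 2^(attempt − 1) with a cap:

```ts
// Illustrative exponential-backoff schedule (NOT bunqueue's actual defaults):
// delay for attempt i (1-based) = baseMs * 2^(i - 1), capped at capMs.
function backoffDelays(retries: number, baseMs = 1_000, capMs = 30_000): number[] {
  return Array.from({ length: retries }, (_, i) => Math.min(baseMs * 2 ** i, capMs));
}

// e.g. the 5 retries configured on 'process-payment' above:
// backoffDelays(5) → [1000, 2000, 4000, 8000, 16000]
```

The takeaway: with 5 retries the last attempt fires tens of seconds after the first, so a flaky gateway gets generous recovery time before compensation kicks in.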
CI/CD Deployment Pipeline with Approval Gate

Build, test, deploy to staging, wait for manual approval, then deploy to production:

```ts
const deployFlow = new Workflow('deploy-pipeline')
  .step('build', async (ctx) => {
    const { repo, branch, commitSha } = ctx.input as DeployInput;
    const build = await ci.triggerBuild({ repo, branch, commitSha });
    await ci.waitForBuild(build.id); // Polls until complete
    return { buildId: build.id, artifact: build.artifactUrl, duration: build.durationMs };
  })
  .step('run-tests', async (ctx) => {
    const { buildId } = ctx.steps['build'] as { buildId: string };
    const results = await ci.runTestSuite(buildId, {
      suites: ['unit', 'integration', 'e2e'],
      parallel: true,
    });

    if (results.failed > 0) {
      throw new Error(`${results.failed}/${results.total} tests failed`);
    }

    return { passed: results.passed, coverage: results.coverage };
  })
  .step('deploy-staging', async (ctx) => {
    const { artifact } = ctx.steps['build'] as { artifact: string };
    await k8s.deploy('staging', artifact);
    const healthCheck = await k8s.waitForHealthy('staging', 60000);

    // Notify the team
    await slack.send('#deploys', {
      text: `Staging deploy ready for review`,
      url: `https://staging.example.com`,
    });

    return { env: 'staging', healthy: healthCheck.ok };
  }, {
    compensate: async () => {
      // Roll staging back to the previous version
      await k8s.rollback('staging');
    },
  })
  .waitFor('production-approval', { timeout: 48 * 60 * 60 * 1000 }) // 48h timeout
  .step('deploy-production', async (ctx) => {
    const approval = ctx.signals['production-approval'] as {
      approver: string;
      strategy: 'rolling' | 'blue-green' | 'canary';
    };

    const { artifact } = ctx.steps['build'] as { artifact: string };

    // Deploy with the approved strategy
    await k8s.deploy('production', artifact, { strategy: approval.strategy });
    await k8s.waitForHealthy('production', 120000);

    await slack.send('#deploys', {
      text: `Production deploy complete (${approval.strategy})`,
      approvedBy: approval.approver,
    });

    return {
      env: 'production',
      approvedBy: approval.approver,
      strategy: approval.strategy,
    };
  }, {
    compensate: async () => {
      await k8s.rollback('production');
      await slack.send('#deploys', { text: 'Production rolled back!' });
    },
  });
```
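If nobody approves within the 48-hour window, the `waitFor` times out. Assuming here that a timed-out wait surfaces through the engine's `workflow:failed` event (the failure event used elsewhere in this doc), an alert hook can be attached as below; `watchDeployFailures`, the event payload shape, and the notify callback are all illustrative:

```ts
// Assumed event payload shape; verify against your engine's typed events.
interface FailedEvent { executionId: string; error?: string }
interface EventEngine { on(type: string, listener: (e: unknown) => void): unknown }

// Page a channel whenever a deploy execution fails —
// including (under the assumption above) approval windows that lapse.
function watchDeployFailures(engine: EventEngine, notify: (msg: string) => void): void {
  engine.on('workflow:failed', (e) => {
    const { executionId, error } = e as FailedEvent;
    notify(`deploy-pipeline failed: ${executionId}${error ? ` (${error})` : ''}`);
  });
}
```

Because `deploy-staging` declared a compensate handler, a lapsed approval also rolls staging back automatically.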
```ts
// Usage
const run = await engine.start('deploy-pipeline', {
  repo: 'myapp',
  branch: 'release/v2.5',
  commitSha: 'abc123f',
});

// After QA on staging (hours/days later):
await engine.signal(run.id, 'production-approval', {
  approver: 'cto@company.com',
  strategy: 'canary',
});
```

KYC Onboarding with Risk-Based Branching
Different verification paths based on risk scoring — low-risk users get auto-approved, medium-risk users must upload documents, and high-risk users go to manual compliance review:

```ts
const kycFlow = new Workflow('kyc-onboarding')
  .step('create-account', async (ctx) => {
    const { email, name, country } = ctx.input as OnboardingInput;
    const user = await db.users.create({ email, name, country, status: 'pending' });
    return { userId: user.id };
  }, {
    compensate: async (ctx) => {
      // Delete the account if verification fails
      const { userId } = ctx.steps['create-account'] as { userId: string };
      await db.users.delete(userId);
    },
  })
  .step('risk-assessment', async (ctx) => {
    const { country, email, ip } = ctx.input as OnboardingInput;
    const score = await riskEngine.assess({ country, email, ip });

    return {
      score,
      riskLevel: score > 80 ? 'low' : score > 50 ? 'medium' : 'high',
    };
  })
  .branch((ctx) => (ctx.steps['risk-assessment'] as { riskLevel: string }).riskLevel)
  .path('low', (w) =>
    w.step('auto-approve', async () => {
      return { approved: true, method: 'automatic', verifiedAt: Date.now() };
    })
  )
  .path('medium', (w) =>
    w.step('request-documents', async (ctx) => {
      const { userId } = ctx.steps['create-account'] as { userId: string };
      await mailer.send('document-request', { userId });
      return { documentsRequested: true };
    })
    .waitFor('documents-uploaded')
    .step('verify-documents', async (ctx) => {
      const docs = ctx.signals['documents-uploaded'] as { files: string[] };
      const verification = await docVerification.check(docs.files);

      if (!verification.passed) {
        throw new Error(`Document verification failed: ${verification.reason}`);
      }

      return { approved: true, method: 'document-review', verifiedAt: Date.now() };
    })
  )
  .path('high', (w) =>
    w.step('flag-compliance', async (ctx) => {
      const { userId } = ctx.steps['create-account'] as { userId: string };
      const { score } = ctx.steps['risk-assessment'] as { score: number };
      await complianceQueue.assign({ userId, riskScore: score });
      return { flagged: true };
    })
    .waitFor('compliance-decision')
    .step('apply-compliance-decision', async (ctx) => {
      const decision = ctx.signals['compliance-decision'] as {
        approved: boolean;
        reviewer: string;
        notes: string;
      };

      if (!decision.approved) {
        throw new Error(`Rejected by compliance: ${decision.notes}`);
      }

      return { approved: true, method: 'compliance-review', reviewer: decision.reviewer };
    })
  )
  .step('activate-account', async (ctx) => {
    const { userId } = ctx.steps['create-account'] as { userId: string };
    await db.users.update(userId, { status: 'active', activatedAt: Date.now() });
    await mailer.send('welcome', { userId });
    return { activated: true };
  });
```

Data Pipeline (ETL)
Extract, transform, load with metrics aggregation at each stage:

```ts
const etlFlow = new Workflow('daily-etl')
  .step('extract', async (ctx) => {
    const { date, sources } = ctx.input as { date: string; sources: string[] };
    const records: Record[] = [];

    for (const source of sources) {
      const data = await dataLake.query(source, { date });
      records.push(...data);
    }

    return { totalRecords: records.length, sources: sources.length, data: records };
  })
  .step('transform', async (ctx) => {
    const { data } = ctx.steps['extract'] as { data: Record[] };

    const cleaned = data
      .filter(r => r.timestamp && r.value !== null)
      .map(r => ({ ...r, value: normalize(r.value), processedAt: Date.now() }));

    const dropped = data.length - cleaned.length;

    return { cleanedRecords: cleaned.length, droppedRecords: dropped, data: cleaned };
  })
  .step('load', async (ctx) => {
    const { data } = ctx.steps['transform'] as { data: CleanRecord[] };
    const extract = ctx.steps['extract'] as { totalRecords: number; sources: number };
    const transform = ctx.steps['transform'] as { cleanedRecords: number; droppedRecords: number };

    // Batch insert
    const batches = chunk(data, 1000);
    for (const batch of batches) {
      await warehouse.insertBatch('analytics_events', batch);
    }

    return {
      pipeline: 'daily-etl',
      date: (ctx.input as { date: string }).date,
      metrics: {
        sourcesProcessed: extract.sources,
        rawRecords: extract.totalRecords,
        cleanedRecords: transform.cleanedRecords,
        droppedRecords: transform.droppedRecords,
        loadedRecords: data.length,
      },
    };
  });
```

Putting It Together: ETL with Observability
Wire up the ETL pipeline with monitoring and cleanup:

```ts
const engine = new Engine({ embedded: true, dataPath: './data/etl.db' });

// Recover any orphaned executions from previous crashes
await engine.recover();

// Observability: track step durations and failures
engine.on('step:started', (e) => {
  const { stepName } = e as StepEvent;
  metrics.startTimer(`etl.${stepName}.duration`);
});
engine.on('step:completed', (e) => {
  const { stepName } = e as StepEvent;
  metrics.stopTimer(`etl.${stepName}.duration`);
  metrics.increment('etl.steps.completed');
});
engine.on('step:retry', (e) => {
  const { stepName, attempt, error } = e as StepEvent;
  logger.warn(`Retrying ${stepName}, attempt ${attempt}: ${error}`);
});
engine.on('workflow:failed', (e) => {
  alerting.pagerduty(`ETL pipeline failed: ${e.executionId}`);
});

engine.register(etlFlow);

// Run daily via cron
await engine.start('daily-etl', { date: '2026-04-10', sources: ['clickstream', 'transactions'] });

// Cleanup: archive completed runs older than 30 days, delete archived after 90 days
engine.archive(30 * 24 * 60 * 60 * 1000, ['completed']);
engine.cleanup(90 * 24 * 60 * 60 * 1000);
```

Batch Processing with forEach and Map
Process a list of invoices, aggregate results, and send a summary:

```ts
import { z } from 'zod';

const InvoiceInput = z.object({
  invoiceIds: z.array(z.string()),
  batchId: z.string(),
});

const invoiceFlow = new Workflow<{ invoiceIds: string[]; batchId: string }>('process-invoices')
  // Validate input with a Zod schema
  .step('init', async (ctx) => {
    return { count: (ctx.input as { invoiceIds: string[] }).invoiceIds.length };
  }, { inputSchema: InvoiceInput })

  // Process each invoice
  .forEach(
    (ctx) => (ctx.input as { invoiceIds: string[] }).invoiceIds,
    'process-invoice',
    async (ctx) => {
      const invoiceId = ctx.steps.__item as string;
      const result = await billingService.process(invoiceId);
      return { invoiceId, amount: result.amount, status: result.status };
    },
    { retry: 3, timeout: 15_000 }
  )

  // Aggregate all invoice results
  .map('summary', (ctx) => {
    const results: { amount: number; status: string }[] = [];
    let i = 0;
    while (ctx.steps[`process-invoice:${i}`]) {
      results.push(ctx.steps[`process-invoice:${i}`] as { amount: number; status: string });
      i++;
    }
    const total = results.reduce((sum, r) => sum + r.amount, 0);
    const failed = results.filter(r => r.status === 'failed').length;
    return { total, processed: results.length, failed };
  })

  // Send a summary report
  .step('report', async (ctx) => {
    const summary = ctx.steps['summary'] as { total: number; processed: number; failed: number };
    await notificationService.send({
      channel: '#billing',
      text: `Batch complete: ${summary.processed} invoices, $${summary.total} total, ${summary.failed} failures`,
    });
    return { reported: true };
  });
```

Retry Loop with doUntil
Poll an external API until a resource is ready:

```ts
const deployFlow = new Workflow<{ deployId: string }>('wait-deploy')
  .step('trigger', async (ctx) => {
    const id = (ctx.input as { deployId: string }).deployId;
    await cloudProvider.triggerDeploy(id);
    return { deployId: id };
  })
  .doUntil(
    (ctx) => (ctx.steps['check-status'] as { ready: boolean })?.ready === true,
    (w) =>
      w.step('check-status', async (ctx) => {
        const id = (ctx.steps['trigger'] as { deployId: string }).deployId;
        const status = await cloudProvider.getDeployStatus(id);
        // Wait between polls
        await new Promise((r) => setTimeout(r, 5000));
        return { ready: status === 'running', status };
      }, { retry: 1, timeout: 30_000 }),
    { maxIterations: 60 } // at most ~5 minutes of polling
  )
  .step('verify', async (ctx) => {
    const id = (ctx.steps['trigger'] as { deployId: string }).deployId;
    const health = await cloudProvider.healthCheck(id);
    return { healthy: health.ok };
  });
```

How It Works Internally
The workflow engine is a pure consumer layer built on top of bunqueue. Zero modifications to the core engine.

```
          Workflow DSL (.step / .branch / .waitFor)
                            │
                            ▼
┌──────────────────────────────────────────────────────────────────┐
│ Engine                                                           │
│                                                                  │
│  ┌─────────────────────────────────────────────────────────┐     │
│  │ Executor                                                │     │
│  │  • Resolves current node (step/branch/parallel/waitFor/ │     │
│  │    doUntil/doWhile/forEach/map)                         │     │
│  │  • Runs step handler with timeout + retry (backoff)     │     │
│  │  • Schema validation (inputSchema/outputSchema)         │     │
│  │  • Evaluates branch condition, picks path               │     │
│  │  • Runs parallel steps via Promise.allSettled           │     │
│  │  • Executes loops (doUntil/doWhile) with maxIterations  │     │
│  │  • forEach: iterates items with indexed step names      │     │
│  │  • map: synchronous data transforms                     │     │
│  │  • Checks signal availability + timeout for waitFor     │     │
│  │  • Dispatches sub-workflows, polls until complete       │     │
│  │  • Runs compensation in reverse on failure              │     │
│  └──────────────────┬──────────────────────────────────────┘     │
│                     │                                            │
│  ┌──────────────────┼──────────────────────────────────────┐     │
│  │                  │                                      │     │
│  │  ┌────────┐  ┌───▼────┐  ┌───────────────────────┐      │     │
│  │  │ Queue  │  │ Worker │  │ Store (SQLite)        │      │     │
│  │  │ __wf:  │  │ pulls  │  │ workflow_executions   │      │     │
│  │  │ steps  │──│ & runs │──│ table: id, state,     │      │     │
│  │  │        │  │ steps  │  │ input, steps, signals │      │     │
│  │  └────────┘  └────────┘  └───────────────────────┘      │     │
│  │      bunqueue internals (Queue + Worker + SQLite)       │     │
│  └─────────────────────────────────────────────────────────┘     │
└──────────────────────────────────────────────────────────────────┘
```

Execution flow:
- `engine.start()` creates an `Execution` record in SQLite and enqueues the first step as a job on the internal `__wf:steps` queue
- A Worker picks up the step job. The Executor loads the execution state and resolves the current node
- Step node: runs the handler with retry + timeout, saves the result, enqueues the next node
- Branch node: evaluates the condition function, runs the matching path's steps inline
- Parallel node: runs all steps via `Promise.allSettled`, saves all results, then advances
- WaitFor node: checks whether the signal exists. If not, sets state to `'waiting'` and schedules a timeout check if configured
- SubWorkflow node: starts a child execution, polls until it reaches a terminal state, saves child results under `sub:<name>`
- DoUntil/DoWhile node: runs the loop steps repeatedly, checking the condition before (doWhile) or after (doUntil) each iteration
- ForEach node: extracts the items array and runs the step once per item with indexed names (`step:0`, `step:1`, …)
- Map node: runs a synchronous transform, stores the result, then advances
- `engine.signal()` stores the signal payload and re-enqueues the current node
- On failure: the Executor walks completed steps in reverse, calling each `compensate` handler
Each workflow step is a regular bunqueue job. You get all of bunqueue’s features for free: SQLite persistence, concurrency control, and monitoring via the dashboard.
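Since each execution emits typed events, the per-execution `subscribe()` from the API table can also drive ad-hoc instrumentation without a dashboard. A sketch that collects per-step durations — the engine is typed structurally, and the event shape follows the `step:started` / `step:completed` events used in the observability example above:

```ts
// Assumed minimal event and engine shapes.
interface WfEvent { type: string; stepName?: string }
interface SubEngine { subscribe(id: string, cb: (e: WfEvent) => void): () => void }

// Collect per-step durations for one execution; call stop() to unsubscribe.
function collectStepDurations(engine: SubEngine, executionId: string) {
  const started = new Map<string, number>();
  const durations: Record<string, number> = {};
  const stop = engine.subscribe(executionId, (e) => {
    if (!e.stepName) return;
    if (e.type === 'step:started') started.set(e.stepName, Date.now());
    if (e.type === 'step:completed') {
      const t0 = started.get(e.stepName);
      if (t0 !== undefined) durations[e.stepName] = Date.now() - t0;
    }
  });
  return { durations, stop };
}
```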
Limitations & Caveats
Before using the workflow engine in production, be aware of these trade-offs:
| Limitation | Details |
|---|---|
| Single-instance only | The workflow engine runs in-process. There is no distributed coordination — you cannot run multiple engine instances on the same database. |
| At-least-once execution | engine.recover() re-enqueues orphaned executions, but steps are not idempotent by default. If a step partially commits (e.g., writes to an external API) and the process crashes before the result is saved, the step re-runs on recovery. Design external-facing steps to be idempotent. |
| Compensation must be idempotent | The engine doesn’t track which compensations have already run. If the process crashes mid-compensation, engine.recover() re-runs compensation from the beginning. Always design compensate handlers to be idempotent. |
| Recovery requires manual call | engine.recover() must be called explicitly on startup — there is no automatic crash detection. Call it after registering all workflows and before starting new executions. |
| Sub-workflow 300s timeout | Sub-workflows have a hardcoded 5-minute timeout (not configurable). Long-running child workflows will fail the parent. |
| listExecutions returns max 100 | engine.listExecutions() is capped at 100 results with no pagination support. For larger datasets, query the SQLite store directly. |
Next Steps
- Simple Mode — All-in-one Queue + Worker for simpler use cases
- Queue API — Low-level queue operations
- Flow Producer — Parent-child job dependencies (simpler than workflows)
- MCP Server — Let AI agents orchestrate workflows via natural language
- Examples — More code recipes