Queue API — Add Jobs, Priorities, Delays & Deduplication in bunqueue

The Queue class is used to add and manage jobs.

import { Queue } from 'bunqueue/client';

// Basic queue - embedded mode
const queue = new Queue('my-queue', { embedded: true });

// Typed queue
interface TaskData {
  userId: number;
  action: string;
}
const typedQueue = new Queue<TaskData>('tasks', { embedded: true });

// With default job options
const emailQueue = new Queue('emails', {
  embedded: true,
  defaultJobOptions: {
    attempts: 3,
    backoff: 1000,
    removeOnComplete: true,
  }
});

// Connect to bunqueue server (no embedded option)
const serverQueue = new Queue('tasks');

// With custom connection
const customQueue = new Queue('tasks', {
  connection: {
    host: '192.168.1.100',
    port: 6789,
    token: 'secret-token',
    poolSize: 4, // Connection pool size
  }
});

prefixKey lets multiple environments, tenants, or services share the same broker without their jobs, workers, cron schedulers, stats, pause state, DLQ, or rate limits overlapping. The prefix is prepended to the queue name on the server side; Queue.name keeps reporting the logical name.

// Same broker, totally isolated namespaces
const devQueue = new Queue('emails', { prefixKey: 'dev:' });
const prodQueue = new Queue('emails', { prefixKey: 'prod:' });
await devQueue.add('send', { to: 'tester@example.com' });
await prodQueue.add('send', { to: 'user@example.com' });
await devQueue.getJobCountsAsync(); // { waiting: 1, ... }
await prodQueue.getJobCountsAsync(); // { waiting: 1, ... } — never sees dev jobs

A Worker must use the same prefixKey to consume jobs from the prefixed queue:

const devWorker = new Worker('emails', processor, {
  prefixKey: 'dev:',
  connection: { port: 6789 },
});
// devWorker only processes jobs added via devQueue, never prodQueue.

What gets isolated:

  • Jobs (push / pull / ack / fail / DLQ)
  • Worker registration and lock-based ownership
  • Counts, stats, and isPaused
  • pause() / resume() / drain() / obliterate()
  • Rate limits and global concurrency
  • Cron schedulers — two prefixes can use the same schedulerId without colliding on the global cron PRIMARY KEY (this fixes #77)
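The scheduler isolation in the last bullet can be pictured with a tiny sketch. This is hypothetical internals, not bunqueue's actual schema; it only illustrates why the same schedulerId under two prefixes no longer collides on one key:

```typescript
// Hypothetical sketch: with prefixKey, the effective scheduler key
// includes the prefix, so the same schedulerId under two prefixes
// maps to two distinct rows instead of one shared PRIMARY KEY.
const cronKey = (prefixKey: string, queue: string, schedulerId: string): string =>
  `${prefixKey}${queue}:${schedulerId}`;

console.log(cronKey('dev:', 'emails', 'nightly'));  // "dev:emails:nightly"
console.log(cronKey('prod:', 'emails', 'nightly')); // "prod:emails:nightly"
```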

Use cases:

  • Multi-environment — dev: / staging: / prod: on a single broker, no port juggling
  • Multi-tenant SaaS — tenant-${tenantId}: per customer, queue names can be reused
  • Monorepo — each service prefixes its queues so queues from service-a and service-b don’t collide
  • Test isolation — test-${runId}: so obliterate() in one run can’t affect parallel test runs
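For the multi-tenant case, a small helper keeps prefixes consistent across every queue and worker a tenant touches. The helper name and the prefix convention are illustrative, not part of bunqueue's API:

```typescript
// Illustrative convention for per-tenant prefixes; only the
// `tenant-<id>:` format matters, the helper itself is hypothetical.
const tenantPrefix = (tenantId: string): string => `tenant-${tenantId}:`;

console.log(tenantPrefix('acme')); // "tenant-acme:"

// Usage sketch:
//   const q = new Queue('invoices', { prefixKey: tenantPrefix('acme') });
//   const w = new Worker('invoices', processor, { prefixKey: tenantPrefix('acme') });
```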

Notes:

  • Backward compatible — without prefixKey behavior is identical to previous releases.
  • Queue.name always returns the logical name. The server-side key (prefixKey + name) is internal.
  • Job.queueName returned to processors will reflect the prefixed key (e.g. dev:emails). This is the only user-visible side effect.
  • Works in both embedded and TCP modes.
const job = await queue.add('job-name', { key: 'value' });
// With options
const jobWithOpts = await queue.add('job-name', data, {
  priority: 10, // Higher = processed first
  delay: 5000, // Delay in ms before processing
  attempts: 5, // Max retry attempts (default: 3)
  backoff: 2000, // Backoff between retries (default: 1000ms, jitter applied)
  backoffConfig: { // Advanced backoff configuration
    type: 'exponential', // 'fixed' or 'exponential'
    delay: 2000, // Base delay in ms
  },
  timeout: 30000, // Job timeout in ms
  jobId: 'custom-id', // Custom job ID for deduplication (BullMQ-style)
  removeOnComplete: true, // Remove job data after completion
  removeOnFail: false, // Keep failed jobs
  stallTimeout: 60000, // Per-job stall timeout (overrides queue config)
});
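The backoff numbers above imply a growth schedule worth making concrete. A common convention for exponential backoff is base * 2^(attempt - 1); bunqueue also applies jitter on top (per the comment above), which this sketch omits, so treat it as an approximation rather than the exact formula:

```typescript
// Approximate retry delays for backoffConfig { type: 'exponential', delay: 2000 }.
// Assumes the common base * 2^(attempt - 1) convention; jitter omitted.
function approxBackoffMs(baseMs: number, attempt: number): number {
  return baseMs * 2 ** (attempt - 1);
}

const schedule = [1, 2, 3, 4].map((n) => approxBackoffMs(2000, n));
console.log(schedule); // [2000, 4000, 8000, 16000]
```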
// Batch optimized - single lock, batch INSERT
const jobs = await queue.addBulk([
  { name: 'task-1', data: { id: 1 } },
  { name: 'task-2', data: { id: 2 }, opts: { priority: 10 } },
  { name: 'task-3', data: { id: 3 }, opts: { delay: 5000 } },
]);
// Repeat every 5 seconds
await queue.add('heartbeat', {}, {
  repeat: {
    every: 5000,
  }
});

// Repeat with limit
await queue.add('daily-report', {}, {
  repeat: {
    every: 86400000, // 24 hours
    limit: 30, // Max 30 repetitions
  }
});

// Cron pattern (server mode)
await queue.add('weekly', {}, {
  repeat: {
    pattern: '0 9 * * MON', // Every Monday at 9am
  }
});

You can update the data for the next repeat execution using updateData(). This works even after the current execution completes — the update propagates to the successor job automatically.

const job = await queue.add('sync', { endpoint: '/api/v1' }, {
  repeat: { every: 60000 },
});
// Update data for the next execution
await job.updateData({ endpoint: '/api/v2' });
// Next repeat will use { endpoint: '/api/v2' }

By default, bunqueue uses a write buffer for high throughput: jobs are batched in memory and flushed to SQLite every 10ms. This achieves ~100k jobs/sec but means jobs could be lost if the process crashes before the buffer is flushed.

For critical jobs where data loss is unacceptable, use the durable option:

// Critical job: immediate disk write, guaranteed persistence
await queue.add('process-payment', { orderId: '123', amount: 99.99 }, {
  durable: true,
});

// Batch of critical jobs
await queue.addBulk([
  { name: 'payment-1', data: { orderId: '1' }, opts: { durable: true } },
  { name: 'payment-2', data: { orderId: '2' }, opts: { durable: true } },
]);

Use jobId to prevent duplicate jobs. When a job with the same jobId already exists, the existing job is returned instead of creating a duplicate. This works in both embedded and TCP modes (including auto-batched operations). This is BullMQ-compatible idempotent behavior.

// Basic deduplication with jobId (BullMQ-style idempotency)
// If job with same jobId exists, returns existing job instead of creating duplicate
const job = await queue.add('send-email', { to: 'user@test.com' }, {
  jobId: 'email-user-123'
});

// First call: creates the job
const job1 = await queue.add('process', { orderId: 123 }, {
  jobId: 'order-123'
});
// Second call with same jobId: returns existing job (no duplicate)
const job2 = await queue.add('process', { orderId: 123 }, {
  jobId: 'order-123'
});
console.log(job1.id === job2.id); // true - same job returned

// Example: restore jobs on service startup
async function restoreJobs(jobsToRestore: SavedJob[]) {
  for (const saved of jobsToRestore) {
    // Safe: existing jobs are returned, not duplicated
    await queue.add('process', saved.data, {
      jobId: saved.id
    });
  }
}

For more control over deduplication behavior, use the deduplication option with TTL-based unique keys and strategies.

While jobId provides permanent idempotency (via customId), the deduplication option uses a separate uniqueKey mechanism with TTL-based expiry. The id field is required:

// TTL-based deduplication - unique key expires after 1 hour
await queue.add('notification', { userId: '123' }, {
  deduplication: {
    id: 'notify-123', // Required: unique deduplication key
    ttl: 3600000 // 1 hour in ms
  }
});
// After TTL expires, the same id can create a new job
// This is useful for rate-limiting or time-windowed deduplication

The extend strategy resets the TTL of an existing job when a duplicate is detected. The existing job is returned (not replaced), but its deduplication window is extended:

// Extend strategy - reset TTL if duplicate, return existing job
await queue.add('rate-limited-task', { action: 'sync' }, {
  deduplication: {
    id: 'sync-task', // Required: unique deduplication key
    ttl: 60000,
    extend: true // Extend TTL on duplicate
  }
});

The replace strategy removes the existing job and inserts a new one with the updated data. This is useful when you always want the latest data to be processed:

// Replace strategy - remove old job, insert new one
await queue.add('latest-data', { data: newData }, {
  deduplication: {
    id: 'data-job', // Required: unique deduplication key
    ttl: 300000,
    replace: true // Replace existing job with new data
  }
});
| Option  | Type    | Default | Description                                   |
| ------- | ------- | ------- | --------------------------------------------- |
| ttl     | number  | -       | Time in ms before unique key expires          |
| extend  | boolean | false   | Reset TTL on duplicate (returns existing job) |
| replace | boolean | false   | Remove old job and create new one             |
// Get job by ID
const job = await queue.getJob('job-id');
// Get job state
const state = await queue.getJobState('job-id');
// 'waiting' | 'active' | 'completed' | 'failed' | 'delayed'

// Get job counts (sync - embedded mode only)
const counts = queue.getJobCounts();
// { waiting: 10, active: 2, completed: 100, failed: 3 }
// Get job counts (async - works with TCP)
const countsAsync = await queue.getJobCountsAsync();

// Get jobs with filtering (sync - embedded mode only)
const jobs = queue.getJobs({ state: 'waiting', start: 0, end: 10 });
// Get jobs with filtering (async - works with TCP)
const failedJobs = await queue.getJobsAsync({ state: 'failed', start: 0, end: 50 });

// Get counts grouped by priority
const byPriority = queue.getCountsPerPriority();
// { 0: 50, 10: 20, 100: 5 }
// Async version
const byPriorityAsync = await queue.getCountsPerPriorityAsync();

// Sync (embedded mode only)
const waiting = queue.getWaiting(0, 10);
const active = queue.getActive(0, 10);
const completed = queue.getCompleted(0, 10);
const failed = queue.getFailed(0, 10);
const delayed = queue.getDelayed(0, 10);
// Async (works with TCP)
const waitingAsync = await queue.getWaitingAsync(0, 10);
const activeAsync = await queue.getActiveAsync(0, 10);
const completedAsync = await queue.getCompletedAsync(0, 10);
const failedAsync = await queue.getFailedAsync(0, 10);
const delayedAsync = await queue.getDelayedAsync(0, 10);

// Sync (embedded mode only)
const waitingCount = queue.getWaitingCount();
const activeCount = queue.getActiveCount();
const completedCount = queue.getCompletedCount();
const failedCount = queue.getFailedCount();
const delayedCount = queue.getDelayedCount();
const total = queue.count();
// Async (works with TCP)
const totalAsync = await queue.countAsync();

// Check if paused
const paused = queue.isPaused(); // sync
const pausedAsync = await queue.isPausedAsync(); // async

// Get prioritized jobs (priority > 0, a separate state from 'waiting')
const prioritized = await queue.getPrioritized(0, 10);
const prioritizedCount = await queue.getPrioritizedCount();

// Get jobs waiting for children to complete (flow dependencies)
const waitingChildren = await queue.getWaitingChildren(0, 10);
const waitingChildrenCount = await queue.getWaitingChildrenCount();
// Pause processing (workers stop pulling)
queue.pause();
// Resume processing
queue.resume();
// Remove all waiting jobs
queue.drain();
// Remove all queue data
queue.obliterate();
// Remove a specific job
queue.remove('job-id');
// Wait until queue/server is ready
await queue.waitUntilReady();
// Close TCP connection (when done)
queue.close();
// Async disconnect (compatibility)
await queue.disconnect();
// Remove completed jobs older than 1 hour (sync)
queue.clean(3600000, 100, 'completed');
// Async version (works with TCP)
const removed = await queue.cleanAsync(3600000, 100, 'completed');
// Promote delayed jobs to waiting
queue.promoteJobs({ count: 50 });
// Bulk retry failed or completed jobs
const retried = await queue.retryJobs({ state: 'failed', count: 100 });
// Update job progress
await queue.updateJobProgress('job-id', 75);
// Get job logs
const logs = queue.getJobLogs('job-id', 0, 100);
// Add log entry to a job
await queue.addJobLog('job-id', 'Processing step 3 completed');
// Get child job results
const childValues = await queue.getChildrenValues('parent-job-id');
// Get job dependencies info
const deps = await queue.getJobDependencies('job-id');
const depCounts = await queue.getJobDependenciesCount('job-id');
// Get child jobs with filter
const processed = await queue.getDependencies('parent-id', 'processed', 0, 10);
const unprocessed = await queue.getDependencies('parent-id', 'unprocessed', 0, 10);
// Wait for a job to finish
const result = await queue.waitJobUntilFinished('job-id', queueEvents, 30000);
// Move job to completed with return value
await queue.moveJobToCompleted('job-id', { success: true }, token);
// Move job to failed with error
await queue.moveJobToFailed('job-id', new Error('reason'), token);
// Move job back to waiting
await queue.moveJobToWait('job-id', token);
// Move job to delayed with specific timestamp
await queue.moveJobToDelayed('job-id', Date.now() + 60000, token);
// Move job to waiting-for-children state
await queue.moveJobToWaitingChildren('job-id', token);
// Set global concurrency limit (max parallel jobs across all workers)
queue.setGlobalConcurrency(10);
const concurrency = await queue.getGlobalConcurrency();
queue.removeGlobalConcurrency();
// Set global rate limit (max jobs per time window)
queue.setGlobalRateLimit(100, 1000); // 100 jobs per second
const rateLimit = await queue.getGlobalRateLimit();
queue.removeGlobalRateLimit();
// Throttle queue for specified duration
await queue.rateLimit(5000); // pause for 5 seconds
// Check remaining throttle time
const ttl = await queue.getRateLimitTtl();
// Check if queue hit rate/concurrency limit
const maxed = await queue.isMaxed();
// Create or update a job scheduler
await queue.upsertJobScheduler('daily-report', {
  pattern: '0 9 * * *', // cron pattern
  // or: every: 3600000, // interval in ms
}, {
  name: 'generate-report',
  data: { type: 'daily' },
});
// Get a scheduler
const scheduler = await queue.getJobScheduler('daily-report');
// List all schedulers
const schedulers = await queue.getJobSchedulers(0, 100);
const schedulersCount = await queue.getJobSchedulersCount();
// Remove a scheduler
await queue.removeJobScheduler('daily-report');

// Look up job ID by deduplication key
const jobId = await queue.getDeduplicationJobId('my-unique-key');
// Remove deduplication key (allows re-adding same jobId)
await queue.removeDeduplicationKey('my-unique-key');

// List active workers
const workers = await queue.getWorkers();
const workersCount = await queue.getWorkersCount();
// Get historical metrics
const completedMetrics = await queue.getMetrics('completed', 0, 100);
const failedMetrics = await queue.getMetrics('failed', 0, 100);
// Trim event log
await queue.trimEvents(1000);

Configure stall detection to recover unresponsive jobs.

queue.setStallConfig({
  enabled: true,
  stallInterval: 30000, // 30 seconds without heartbeat = stalled
  maxStalls: 3, // Move to DLQ after 3 stalls
  gracePeriod: 5000, // 5 second grace period after job starts
});
// Get current config
const config = queue.getStallConfig();

See Stall Detection for more details.

// Configure DLQ
// Configure DLQ
queue.setDlqConfig({
  autoRetry: true,
  autoRetryInterval: 3600000, // 1 hour
  maxAutoRetries: 3,
  maxAge: 604800000, // 7 days
  maxEntries: 10000,
});
// Get current DLQ config
const dlqConfig = queue.getDlqConfig();
// Get DLQ entries
const entries = queue.getDlq();
// Filter entries
const stalledJobs = queue.getDlq({ reason: 'stalled' });
const recentFails = queue.getDlq({ newerThan: Date.now() - 3600000 });
// Get stats
const stats = queue.getDlqStats();
// { total, byReason, pendingRetry, expired, oldestEntry, newestEntry }
// Retry from DLQ
queue.retryDlq(); // Retry all
queue.retryDlq('job-123'); // Retry specific
// Retry by filter
queue.retryDlqByFilter({ reason: 'timeout', limit: 10 });
// Purge DLQ
queue.purgeDlq();

See Dead Letter Queue for more details.

The retryCompleted() method allows re-queuing completed jobs for reprocessing. This is useful when you need to re-run a job that completed successfully, for example when business logic changes or you need to regenerate outputs.

// Retry a specific completed job
const success = queue.retryCompleted('job-id-123');
if (success) {
  console.log('Job re-queued for processing');
}

// Retry all completed jobs (use with caution!)
const count = queue.retryCompleted();
console.log(`Re-queued ${count} completed jobs`);

// Async version for TCP mode
const countAsync = await queue.retryCompletedAsync();

In TCP mode, queue.add() calls are automatically batched into PUSHB (bulk push) commands for higher throughput. This is enabled by default and requires no code changes.

How it works: If no flush is in-flight, the add is sent immediately (zero overhead for sequential await). If a flush is already in-flight, subsequent adds are buffered and sent together when the current flush completes or after maxDelayMs, whichever comes first.

// Auto-batching is enabled by default in TCP mode
const queue = new Queue('tasks');

// Sequential: no penalty, each add() sends immediately
for (const item of items) {
  await queue.add('task', item);
}

// Concurrent: adds batch into a single PUSHB round-trip
await Promise.all([
  queue.add('a', { x: 1 }),
  queue.add('b', { x: 2 }),
  queue.add('c', { x: 3 }),
]);
// Tune the batching thresholds
const tunedQueue = new Queue('tasks', {
  autoBatch: {
    maxSize: 100, // Flush when buffer reaches this size (default: 50)
    maxDelayMs: 10, // Max ms to wait before flushing (default: 5)
  },
});

// Disable auto-batching
const unbatchedQueue = new Queue('tasks', {
  autoBatch: { enabled: false },
});
| Option     | Type    | Default | Description                     |
| ---------- | ------- | ------- | ------------------------------- |
| enabled    | boolean | true    | Enable or disable auto-batching |
| maxSize    | number  | 50      | Max items before auto-flush     |
| maxDelayMs | number  | 5       | Max delay in ms before auto-flush |
| Option           | Type    | Default | Description                                          |
| ---------------- | ------- | ------- | ---------------------------------------------------- |
| priority         | number  | 0       | Higher = processed first                             |
| delay            | number  | 0       | Delay in ms before processing                        |
| attempts         | number  | 3       | Max retry attempts                                   |
| backoff          | number  | 1000    | Backoff base in ms (exponential, jitter applied)     |
| backoffConfig    | object  | -       | Advanced backoff: { type, delay }                    |
| timeout          | number  | -       | Processing timeout in ms                             |
| jobId            | string  | -       | Custom ID for deduplication (BullMQ-style idempotent) |
| deduplication    | object  | -       | Advanced deduplication config (ttl, extend, replace) |
| removeOnComplete | boolean | false   | Auto-delete after completion                         |
| removeOnFail     | boolean | false   | Auto-delete after failure                            |
| stallTimeout     | number  | -       | Per-job stall timeout override                       |
| repeat           | object  | -       | Repeating job config                                 |
| durable          | boolean | false   | Immediate disk write (bypass buffer)                 |
// Close TCP connection (no-op in embedded mode)
queue.close();