Integrating bunqueue with Hono and Elysia
bunqueue is designed for the Bun ecosystem, and Hono and Elysia are the most popular web frameworks for Bun. Here’s how to integrate them cleanly.
Hono Integration
Basic Setup
import { Hono } from 'hono';import { Queue, Worker } from 'bunqueue/client';
// Create queue and workerconst emailQueue = new Queue<{ to: string; subject: string }>('emails', { embedded: true,});
const worker = new Worker('emails', async (job) => { await sendEmail(job.data.to, job.data.subject); return { sent: true };}, { embedded: true, concurrency: 5 });
// Create Hono appconst app = new Hono();
app.post('/api/send-email', async (c) => { const { to, subject } = await c.req.json();
const job = await emailQueue.add('send', { to, subject });
return c.json({ jobId: job.id, status: 'queued' });});
app.get('/api/job/:id', async (c) => { const job = await emailQueue.getJob(c.req.param('id'));
if (!job) return c.json({ error: 'Not found' }, 404);
return c.json({ id: job.id, state: await job.getState(), progress: job.progress, data: job.data, });});
export default app;Middleware Pattern
Create a reusable middleware that injects the queue into the context:
import { createMiddleware } from 'hono/factory';import { Queue } from 'bunqueue/client';
// Queue factory middlewareexport function withQueue<T>(name: string) { const queue = new Queue<T>(name, { embedded: true });
return createMiddleware(async (c, next) => { c.set('queue', queue); await next(); });}
// Usageconst app = new Hono();
app.use('/api/orders/*', withQueue<OrderData>('orders'));
app.post('/api/orders', async (c) => { const queue = c.get('queue') as Queue<OrderData>; const order = await c.req.json();
const job = await queue.add('process', order, { priority: order.express ? 10 : 1, attempts: 3, });
return c.json({ orderId: job.id });});Elysia Integration
Basic Setup
import { Elysia } from 'elysia';import { Queue, Worker } from 'bunqueue/client';
const taskQueue = new Queue<{ type: string; payload: unknown }>('tasks', { embedded: true,});
const worker = new Worker('tasks', async (job) => { switch (job.data.type) { case 'resize-image': return await resizeImage(job.data.payload); case 'generate-pdf': return await generatePdf(job.data.payload); default: throw new Error(`Unknown task type: ${job.data.type}`); }}, { embedded: true, concurrency: 3 });
const app = new Elysia() .post('/tasks', async ({ body }) => { const job = await taskQueue.add(body.type, body); return { jobId: job.id, status: 'queued' }; }) .get('/tasks/:id', async ({ params }) => { const job = await taskQueue.getJob(params.id); if (!job) return { error: 'Not found' }; return { id: job.id, state: await job.getState(), progress: job.progress, }; }) .get('/tasks/counts', async () => { return await taskQueue.getJobCountsAsync(); }) .listen(3000);Plugin Pattern
Encapsulate the queue as an Elysia plugin:
import { Elysia } from 'elysia';import { Queue, Worker } from 'bunqueue/client';
export function queuePlugin<T>(name: string) { const queue = new Queue<T>(name, { embedded: true });
return new Elysia({ name: `queue-${name}` }) .decorate('queue', queue) .get(`/queues/${name}/counts`, async ({ queue }) => { return await queue.getJobCountsAsync(); }) .get(`/queues/${name}/health`, async ({ queue }) => { const counts = await queue.getJobCountsAsync(); return { name, ...counts, healthy: counts.active < 1000, }; });}
// Usageconst app = new Elysia() .use(queuePlugin<EmailData>('emails')) .use(queuePlugin<OrderData>('orders')) .listen(3000);Graceful Shutdown
Both frameworks need proper shutdown handling to ensure jobs are completed:
const queue = new Queue('tasks', { embedded: true });const worker = new Worker('tasks', processor, { embedded: true, concurrency: 5,});
// Handle shutdown signalsasync function gracefulShutdown() { console.log('Shutting down...');
// 1. Stop accepting new HTTP requests server.stop();
// 2. Close worker (finishes active jobs) await worker.close();
// 3. Disconnect queue (flushes buffers) await queue.disconnect();
process.exit(0);}
process.on('SIGTERM', gracefulShutdown);process.on('SIGINT', gracefulShutdown);TCP Mode for Microservices
If your web app and workers run as separate processes, use TCP mode:
// Web server (process 1)const queue = new Queue('tasks', { connection: { host: 'bunqueue-server', port: 6789 },});
// Worker (process 2)const worker = new Worker('tasks', processor, { connection: { host: 'bunqueue-server', port: 6789 }, concurrency: 10,});This separates concerns: the web server only adds jobs, and dedicated worker processes handle processing. bunqueue’s connection pooling and auto-batching make this efficient even under high load.
Queue Groups for Multi-Queue Apps
Organize related queues with QueueGroup:
import { QueueGroup } from 'bunqueue/client';
const billing = new QueueGroup('billing');
const invoices = billing.getQueue<InvoiceData>('invoices');const payments = billing.getQueue<PaymentData>('payments');const refunds = billing.getQueue<RefundData>('refunds');
// Queue names are prefixed: "billing:invoices", "billing:payments", etc.
// Pause all billing queues at once (e.g., during maintenance)billing.pauseAll();billing.resumeAll();