Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@

- "You miss 100 percent of the chances you don't take. — Wayne Gretzky" — Michael Scott

- **feat(nestjs): Instrument `@nestjs/bullmq` `@Processor` decorator**

Automatically capture exceptions and create transactions for BullMQ queue processors in NestJS applications.

When using the `@Processor` decorator from `@nestjs/bullmq`, the SDK now automatically wraps the `process()` method
to create `queue.process` transactions with proper isolation scopes, preventing breadcrumb and scope leakage between
jobs and HTTP requests. Errors thrown in processors are captured with the `auto.queue.nestjs.bullmq` mechanism type.

Requires `@nestjs/bullmq` v10.0.0 or later.

- **feat(nestjs): Instrument `@nestjs/schedule` decorators ([#19735](https://github.com/getsentry/sentry-javascript/pull/19735))**

Automatically capture exceptions thrown in `@Cron`, `@Interval`, and `@Timeout` decorated methods.
Expand Down
2 changes: 2 additions & 0 deletions dev-packages/e2e-tests/test-applications/nestjs-bullmq/.npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
@sentry:registry=http://127.0.0.1:4873
@sentry-internal:registry=http://127.0.0.1:4873
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
services:
redis:
image: redis:latest
restart: always
container_name: e2e-tests-nestjs-bullmq-redis
ports:
- '6379:6379'
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { execSync } from 'child_process';
import { dirname } from 'path';
import { fileURLToPath } from 'url';

const __dirname = dirname(fileURLToPath(import.meta.url));

export default async function globalSetup() {
// Start Redis via Docker Compose
execSync('docker compose up -d --wait', {
cwd: __dirname,
stdio: 'inherit',
});
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { execSync } from 'child_process';
import { dirname } from 'path';
import { fileURLToPath } from 'url';

const __dirname = dirname(fileURLToPath(import.meta.url));

export default async function globalTeardown() {
// Stop Redis and remove containers
execSync('docker compose down --volumes', {
cwd: __dirname,
stdio: 'inherit',
});
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"$schema": "https://json.schemastore.org/nest-cli",
"collection": "@nestjs/schematics",
"sourceRoot": "src",
"compilerOptions": {
"deleteOutDir": true
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"name": "nestjs-bullmq",
"version": "0.0.1",
"private": true,
"scripts": {
"build": "nest build",
"start": "nest start",
"start:dev": "nest start --watch",
"start:prod": "node dist/main",
"clean": "npx rimraf node_modules pnpm-lock.yaml",
"test": "playwright test",
"test:build": "pnpm install && pnpm build",
"test:assert": "pnpm test"
},
"dependencies": {
"@nestjs/common": "^11.0.0",
"@nestjs/core": "^11.0.0",
"@nestjs/platform-express": "^11.0.0",
"@nestjs/bullmq": "^11.0.0",
"bullmq": "^5.0.0",
"@sentry/nestjs": "latest || *",
"reflect-metadata": "^0.2.0",
"rxjs": "^7.8.1"
},
"devDependencies": {
"@playwright/test": "~1.56.0",
"@sentry-internal/test-utils": "link:../../../test-utils",
"@nestjs/cli": "^11.0.0",
"@nestjs/schematics": "^11.0.0",
"@types/node": "^18.19.1",
"typescript": "~5.0.0"
},
"volta": {
"extends": "../../package.json"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { getPlaywrightConfig } from '@sentry-internal/test-utils';

const config = getPlaywrightConfig({
startCommand: `pnpm start`,
});

export default {
...config,
globalSetup: './global-setup.mjs',
globalTeardown: './global-teardown.mjs',
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { Controller, Get, Param } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Controller()
export class AppController {
constructor(@InjectQueue('test-queue') private readonly queue: Queue) {}

@Get('enqueue/:name')
async enqueue(@Param('name') name: string) {
await this.queue.add(name, { timestamp: Date.now() });
return { queued: true };
}

@Get('check-isolation')
checkIsolation() {
// This endpoint is called after the processor adds a breadcrumb.
// The test verifies that breadcrumbs from the processor do NOT leak here.
return { message: 'ok' };
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Module } from '@nestjs/common';
import { APP_FILTER } from '@nestjs/core';
import { BullModule } from '@nestjs/bullmq';
import { SentryGlobalFilter, SentryModule } from '@sentry/nestjs/setup';
import { AppController } from './app.controller';
import { TestProcessor } from './jobs/test.processor';

@Module({
imports: [
SentryModule.forRoot(),
BullModule.forRoot({
connection: { host: 'localhost', port: 6379 },
}),
BullModule.registerQueue({ name: 'test-queue' }),
],
controllers: [AppController],
providers: [
TestProcessor,
{
provide: APP_FILTER,
useClass: SentryGlobalFilter,
},
],
})
export class AppModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import * as Sentry from '@sentry/nestjs';

Sentry.init({
environment: 'qa', // dynamic sampling bias to keep transactions
dsn: process.env.E2E_TEST_DSN,
tunnel: `http://localhost:3031/`, // proxy server
tracesSampleRate: 1,
transportOptions: {
// We expect the app to send a lot of events in a short time
bufferSize: 1000,
},
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import * as Sentry from '@sentry/nestjs';

@Processor('test-queue')
export class TestProcessor extends WorkerHost {
async process(job: Job): Promise<any> {
if (job.name === 'fail') {
throw new Error('Test error from BullMQ processor');
}

if (job.name === 'breadcrumb-test') {
Sentry.addBreadcrumb({
message: 'leaked-breadcrumb-from-bullmq-processor',
level: 'info',
});
return { processed: true };
}

return { processed: true };
}

@OnWorkerEvent('completed')
onCompleted(job: Job) {
if (job.name === 'lifecycle-breadcrumb-test') {
Sentry.addBreadcrumb({
message: 'leaked-breadcrumb-from-lifecycle-event',
level: 'info',
});
}
}
}
15 changes: 15 additions & 0 deletions dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Import this first
import './instrument';

// Import other modules
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';

const PORT = 3030;

async function bootstrap() {
const app = await NestFactory.create(AppModule);
await app.listen(PORT);
}

bootstrap();
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { startEventProxyServer } from '@sentry-internal/test-utils';

startEventProxyServer({
port: 3031,
proxyServerName: 'nestjs-bullmq',
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { expect, test } from '@playwright/test';
import { waitForError, waitForTransaction } from '@sentry-internal/test-utils';

test('Sends exception to Sentry on error in @Processor process method', async ({ baseURL }) => {
const errorEventPromise = waitForError('nestjs-bullmq', event => {
return (
!event.type &&
event.exception?.values?.[0]?.value === 'Test error from BullMQ processor' &&
event.exception?.values?.[0]?.mechanism?.type === 'auto.queue.nestjs.bullmq'
);
});

// Enqueue a job that will fail
await fetch(`${baseURL}/enqueue/fail`);

const errorEvent = await errorEventPromise;

expect(errorEvent.exception?.values).toHaveLength(1);
expect(errorEvent.exception?.values?.[0]?.mechanism).toEqual({
handled: false,
type: 'auto.queue.nestjs.bullmq',
});
});

test('Creates a transaction for successful job processing', async ({ baseURL }) => {
const transactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => {
return transactionEvent.contexts?.trace?.op === 'queue.process';
});

// Enqueue a job that will succeed
await fetch(`${baseURL}/enqueue/success`);

const transaction = await transactionPromise;

expect(transaction.transaction).toBe('test-queue process');
expect(transaction.contexts?.trace?.op).toBe('queue.process');
expect(transaction.contexts?.trace?.origin).toBe('auto.queue.nestjs.bullmq');
});

test('BullMQ processor breadcrumbs do not leak into subsequent HTTP requests', async ({ baseURL }) => {
const processTransactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => {
return transactionEvent.contexts?.trace?.op === 'queue.process';
});

// Enqueue a job that adds a breadcrumb during processing
await fetch(`${baseURL}/enqueue/breadcrumb-test`);

await processTransactionPromise;

const transactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => {
return transactionEvent.transaction === 'GET /check-isolation';
});

await fetch(`${baseURL}/check-isolation`);

const transaction = await transactionPromise;

const leakedBreadcrumb = (transaction.breadcrumbs || []).find(
(b: any) => b.message === 'leaked-breadcrumb-from-bullmq-processor',
);
expect(leakedBreadcrumb).toBeUndefined();
});

// TODO: @OnWorkerEvent handlers run outside the isolation scope created by process().
// They are registered via worker.on() (EventEmitter), so breadcrumbs/tags set there
// leak into the default isolation scope and appear on subsequent HTTP requests.
// This should be fixed in a follow-up by also wrapping lifecycle event handlers.
test('BullMQ @OnWorkerEvent lifecycle breadcrumbs currently leak into subsequent HTTP requests', async ({
baseURL,
}) => {
const processTransactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => {
return transactionEvent.contexts?.trace?.op === 'queue.process';
});

// Enqueue a job (the completed event fires right after the job is processed)
await fetch(`${baseURL}/enqueue/lifecycle-breadcrumb-test`);

await processTransactionPromise;

const transactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => {
return transactionEvent.transaction === 'GET /check-isolation';
});

await fetch(`${baseURL}/check-isolation`);

const transaction = await transactionPromise;

const leakedBreadcrumb = (transaction.breadcrumbs || []).find(
(b: any) => b.message === 'leaked-breadcrumb-from-lifecycle-event',
);
// This SHOULD be toBeUndefined() once lifecycle event isolation is implemented.
expect(leakedBreadcrumb).toBeDefined();
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"extends": "./tsconfig.json",
"exclude": ["node_modules", "test", "dist"]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"compilerOptions": {
"module": "commonjs",
"declaration": true,
"removeComments": true,
"emitDecoratorMetadata": true,
"experimentalDecorators": true,
"allowSyntheticDefaultImports": true,
"target": "ES2021",
"sourceMap": true,
"outDir": "./dist",
"baseUrl": "./",
"incremental": true,
"skipLibCheck": true,
"strictNullChecks": false,
"noImplicitAny": false,
"strictBindCallApply": false,
"forceConsistentCasingInFileNames": false,
"noFallthroughCasesInSwitch": false,
"moduleResolution": "Node16"
}
}
20 changes: 20 additions & 0 deletions packages/nestjs/src/integrations/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ export function getEventSpanOptions(event: string): {
};
}

/**
* Returns span options for nest bullmq process spans.
*/
export function getBullMQProcessSpanOptions(queueName: string): {
name: string;
attributes: Record<string, string>;
forceTransaction: boolean;
} {
return {
name: `${queueName} process`,
attributes: {
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: 'queue.process',
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.queue.nestjs.bullmq',
'messaging.system': 'bullmq',
'messaging.destination.name': queueName,
},
forceTransaction: true,
};
}

/**
* Adds instrumentation to a js observable and attaches the span to an active parent span.
*/
Expand Down
Loading
Loading