diff --git a/CHANGELOG.md b/CHANGELOG.md index cf916274650e..4b5452514ffa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,16 @@ - "You miss 100 percent of the chances you don't take. — Wayne Gretzky" — Michael Scott +- **feat(nestjs): Instrument `@nestjs/bullmq` `@Processor` decorator** + + Automatically capture exceptions and create transactions for BullMQ queue processors in NestJS applications. + + When using the `@Processor` decorator from `@nestjs/bullmq`, the SDK now automatically wraps the `process()` method + to create `queue.process` transactions with proper isolation scopes, preventing breadcrumb and scope leakage between + jobs and HTTP requests. Errors thrown in processors are captured with the `auto.queue.nestjs.bullmq` mechanism type. + + Requires `@nestjs/bullmq` v10.0.0 or later. + - **feat(node): Expose `headersToSpanAttributes` option on `nativeNodeFetchIntegration()` ([#19770](https://github.com/getsentry/sentry-javascript/pull/19770))** Response headers like `http.response.header.content-length` were previously captured automatically on outgoing diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/.npmrc b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/.npmrc new file mode 100644 index 000000000000..070f80f05092 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/.npmrc @@ -0,0 +1,2 @@ +@sentry:registry=http://127.0.0.1:4873 +@sentry-internal:registry=http://127.0.0.1:4873 diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/docker-compose.yml b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/docker-compose.yml new file mode 100644 index 000000000000..53518dbe5195 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/docker-compose.yml @@ -0,0 +1,7 @@ +services: + redis: + image: redis:8 + restart: always + container_name: e2e-tests-nestjs-bullmq-redis + ports: + - '6379:6379' diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/global-setup.mjs b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/global-setup.mjs new file mode 100644 index 000000000000..438b88b61794 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/global-setup.mjs @@ -0,0 +1,13 @@ +import { execSync } from 'child_process'; +import { dirname } from 'path'; +import { fileURLToPath } from 'url'; + +const __dirname = dirname(fileURLToPath(import.meta.url)); + +export default async function globalSetup() { + // Start Redis via Docker Compose + execSync('docker compose up -d --wait', { + cwd: __dirname, + stdio: 'inherit', + }); +} diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/global-teardown.mjs b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/global-teardown.mjs new file mode 100644 index 000000000000..35ce41179193 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/global-teardown.mjs @@ -0,0 +1,13 @@ +import { execSync } from 'child_process'; +import { dirname } from 'path'; +import { fileURLToPath } from 'url'; + +const __dirname = dirname(fileURLToPath(import.meta.url)); + +export default async function globalTeardown() { + // Stop Redis and remove containers + execSync('docker compose down --volumes', { + cwd: __dirname, + stdio: 'inherit', + }); +} diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/nest-cli.json b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/nest-cli.json new file mode 100644 index 000000000000..f9aa683b1ad5 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/nest-cli.json @@ -0,0 +1,8 @@ +{ + "$schema": "https://json.schemastore.org/nest-cli", + "collection": "@nestjs/schematics", + "sourceRoot": "src", + "compilerOptions": { + "deleteOutDir": true + } +} diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/package.json b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/package.json new file mode 100644 index 000000000000..77d8c024e021 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/package.json @@ -0,0 +1,36 @@ +{ + "name": "nestjs-bullmq", + "version": "0.0.1", + "private": true, + "scripts": { + "build": "nest build", + "start": "nest start", + "start:dev": "nest start --watch", + "start:prod": "node dist/main", + "clean": "npx rimraf node_modules pnpm-lock.yaml", + "test": "playwright test", + "test:build": "pnpm install && pnpm build", + "test:assert": "pnpm test" + }, + "dependencies": { + "@nestjs/common": "^11.0.0", + "@nestjs/core": "^11.0.0", + "@nestjs/platform-express": "^11.0.0", + "@nestjs/bullmq": "^11.0.0", + "bullmq": "^5.0.0", + "@sentry/nestjs": "latest || *", + "reflect-metadata": "^0.2.0", + "rxjs": "^7.8.1" + }, + "devDependencies": { + "@playwright/test": "~1.56.0", + "@sentry-internal/test-utils": "link:../../../test-utils", + "@nestjs/cli": "^11.0.0", + "@nestjs/schematics": "^11.0.0", + "@types/node": "^18.19.1", + "typescript": "~5.0.0" + }, + "volta": { + "extends": "../../package.json" + } +} diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/playwright.config.mjs b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/playwright.config.mjs new file mode 100644 index 000000000000..d5fd0b394f15 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/playwright.config.mjs @@ -0,0 +1,11 @@ +import { getPlaywrightConfig } from '@sentry-internal/test-utils'; + +const config = getPlaywrightConfig({ + startCommand: `pnpm start`, +}); + +export default { + ...config, + globalSetup: './global-setup.mjs', + globalTeardown: './global-teardown.mjs', +}; diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/app.controller.ts b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/app.controller.ts new file mode 100644 index 000000000000..e8c865e17bcc --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/app.controller.ts @@ -0,0 +1,21 @@ +import { Controller, Get, Param } from '@nestjs/common'; +import { InjectQueue } from '@nestjs/bullmq'; +import { Queue } from 'bullmq'; + +@Controller() +export class AppController { + constructor(@InjectQueue('test-queue') private readonly queue: Queue) {} + + @Get('enqueue/:name') + async enqueue(@Param('name') name: string) { + await this.queue.add(name, { timestamp: Date.now() }); + return { queued: true }; + } + + @Get('check-isolation') + checkIsolation() { + // This endpoint is called after the processor adds a breadcrumb. + // The test verifies that breadcrumbs from the processor do NOT leak here. + return { message: 'ok' }; + } +} diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/app.module.ts b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/app.module.ts new file mode 100644 index 000000000000..be5fd107e4cb --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/app.module.ts @@ -0,0 +1,25 @@ +import { Module } from '@nestjs/common'; +import { APP_FILTER } from '@nestjs/core'; +import { BullModule } from '@nestjs/bullmq'; +import { SentryGlobalFilter, SentryModule } from '@sentry/nestjs/setup'; +import { AppController } from './app.controller'; +import { TestProcessor } from './jobs/test.processor'; + +@Module({ + imports: [ + SentryModule.forRoot(), + BullModule.forRoot({ + connection: { host: 'localhost', port: 6379 }, + }), + BullModule.registerQueue({ name: 'test-queue' }), + ], + controllers: [AppController], + providers: [ + TestProcessor, + { + provide: APP_FILTER, + useClass: SentryGlobalFilter, + }, + ], +}) +export class AppModule {} diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/instrument.ts b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/instrument.ts new file mode 100644 index 000000000000..4f16ebb36d11 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/instrument.ts @@ -0,0 +1,12 @@ +import * as Sentry from '@sentry/nestjs'; + +Sentry.init({ + environment: 'qa', // dynamic sampling bias to keep transactions + dsn: process.env.E2E_TEST_DSN, + tunnel: `http://localhost:3031/`, // proxy server + tracesSampleRate: 1, + transportOptions: { + // We expect the app to send a lot of events in a short time + bufferSize: 1000, + }, +}); diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/jobs/test.processor.ts b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/jobs/test.processor.ts new file mode 100644 index 000000000000..1c6cf8ef3052 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/jobs/test.processor.ts @@ -0,0 +1,71 @@ +import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'; +import { Job } from 'bullmq'; +import * as Sentry from '@sentry/nestjs'; + +@Processor('test-queue') +export class TestProcessor extends WorkerHost { + async process(job: Job): Promise { + if (job.name === 'fail') { + throw new Error('Test error from BullMQ processor'); + } + + if (job.name === 'breadcrumb-test') { + Sentry.addBreadcrumb({ + message: 'leaked-breadcrumb-from-bullmq-processor', + level: 'info', + }); + return { processed: true }; + } + + if (job.name === 'lifecycle-failed-breadcrumb-test') { + throw new Error('Intentional error to trigger failed event'); + } + + if (job.name === 'lifecycle-progress-breadcrumb-test') { + await job.updateProgress(50); + return { processed: true }; + } + + return { processed: true }; + } + + @OnWorkerEvent('completed') + onCompleted(job: Job) { + if (job.name === 'lifecycle-breadcrumb-test') { + Sentry.addBreadcrumb({ + message: 'leaked-breadcrumb-from-lifecycle-event', + level: 'info', + }); + } + } + + @OnWorkerEvent('active') + onActive(job: Job) { + if (job.name === 'lifecycle-active-breadcrumb-test') { + Sentry.addBreadcrumb({ + message: 'leaked-breadcrumb-from-active-event', + level: 'info', + }); + } + } + + @OnWorkerEvent('failed') + onFailed(job: Job) { + if (job.name === 'lifecycle-failed-breadcrumb-test') { + Sentry.addBreadcrumb({ + message: 'leaked-breadcrumb-from-failed-event', + level: 'info', + }); + } + } + + @OnWorkerEvent('progress') + onProgress(job: Job) { + if (job.name === 'lifecycle-progress-breadcrumb-test') { + Sentry.addBreadcrumb({ + message: 'leaked-breadcrumb-from-progress-event', + level: 'info', + }); + } + } +} diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/main.ts b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/main.ts new file mode 100644 index 000000000000..71ce685f4d61 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/src/main.ts @@ -0,0 +1,15 @@ +// Import this first +import './instrument'; + +// Import other modules +import { NestFactory } from '@nestjs/core'; +import { AppModule } from './app.module'; + +const PORT = 3030; + +async function bootstrap() { + const app = await NestFactory.create(AppModule); + await app.listen(PORT); +} + +bootstrap(); diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/start-event-proxy.mjs b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/start-event-proxy.mjs new file mode 100644 index 000000000000..fe8225afa969 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/start-event-proxy.mjs @@ -0,0 +1,6 @@ +import { startEventProxyServer } from '@sentry-internal/test-utils'; + +startEventProxyServer({ + port: 3031, + proxyServerName: 'nestjs-bullmq', +}); diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/tests/bullmq.test.ts b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/tests/bullmq.test.ts new file mode 100644 index 000000000000..e49ebd80488c --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/tests/bullmq.test.ts @@ -0,0 +1,176 @@ +import { expect, test } from '@playwright/test'; +import { waitForError, waitForTransaction } from '@sentry-internal/test-utils'; + +test('Sends exception to Sentry on error in @Processor process method', async ({ baseURL }) => { + const errorEventPromise = waitForError('nestjs-bullmq', event => { + return ( + !event.type && + event.exception?.values?.[0]?.value === 'Test error from BullMQ processor' && + event.exception?.values?.[0]?.mechanism?.type === 'auto.queue.nestjs.bullmq' + ); + }); + + // Enqueue a job that will fail + await fetch(`${baseURL}/enqueue/fail`); + + const errorEvent = await errorEventPromise; + + expect(errorEvent.exception?.values).toHaveLength(1); + expect(errorEvent.exception?.values?.[0]?.mechanism).toEqual({ + handled: false, + type: 'auto.queue.nestjs.bullmq', + }); +}); + +test('Creates a transaction for successful job processing', async ({ baseURL }) => { + const transactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => { + return transactionEvent.contexts?.trace?.op === 'queue.process'; + }); + + // Enqueue a job that will succeed + await fetch(`${baseURL}/enqueue/success`); + + const transaction = await transactionPromise; + + expect(transaction.transaction).toBe('test-queue process'); + expect(transaction.contexts?.trace?.op).toBe('queue.process'); + expect(transaction.contexts?.trace?.origin).toBe('auto.queue.nestjs.bullmq'); +}); + +test('BullMQ processor breadcrumbs do not leak into subsequent HTTP requests', async ({ baseURL }) => { + const processTransactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => { + return transactionEvent.contexts?.trace?.op === 'queue.process'; + }); + + // Enqueue a job that adds a breadcrumb during processing + await fetch(`${baseURL}/enqueue/breadcrumb-test`); + + await processTransactionPromise; + + const transactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => { + return transactionEvent.transaction === 'GET /check-isolation'; + }); + + await fetch(`${baseURL}/check-isolation`); + + const transaction = await transactionPromise; + + const leakedBreadcrumb = (transaction.breadcrumbs || []).find( + (b: any) => b.message === 'leaked-breadcrumb-from-bullmq-processor', + ); + expect(leakedBreadcrumb).toBeUndefined(); +}); + +// TODO: @OnWorkerEvent('completed') handlers run outside the isolation scope created by process(). +// They are registered via worker.on() (EventEmitter), so breadcrumbs/tags set there +// leak into the default isolation scope and appear on subsequent HTTP requests. +test('BullMQ @OnWorkerEvent completed lifecycle breadcrumbs currently leak into subsequent HTTP requests', async ({ + baseURL, +}) => { + const processTransactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => { + return transactionEvent.contexts?.trace?.op === 'queue.process'; + }); + + // Enqueue a job (the completed event fires right after the job is processed) + await fetch(`${baseURL}/enqueue/lifecycle-breadcrumb-test`); + + await processTransactionPromise; + + const transactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => { + return transactionEvent.transaction === 'GET /check-isolation'; + }); + + await fetch(`${baseURL}/check-isolation`); + + const transaction = await transactionPromise; + + const leakedBreadcrumb = (transaction.breadcrumbs || []).find( + (b: any) => b.message === 'leaked-breadcrumb-from-lifecycle-event', + ); + // This SHOULD be toBeUndefined() once lifecycle event isolation is implemented. + expect(leakedBreadcrumb).toBeDefined(); +}); + +// TODO: @OnWorkerEvent('active') handlers run outside the isolation scope created by process(). +// Breadcrumbs set there leak into the default isolation scope and appear on subsequent HTTP requests. +test('BullMQ @OnWorkerEvent active lifecycle breadcrumbs currently leak into subsequent HTTP requests', async ({ + baseURL, +}) => { + const processTransactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => { + return transactionEvent.contexts?.trace?.op === 'queue.process'; + }); + + await fetch(`${baseURL}/enqueue/lifecycle-active-breadcrumb-test`); + + await processTransactionPromise; + + const transactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => { + return transactionEvent.transaction === 'GET /check-isolation'; + }); + + await fetch(`${baseURL}/check-isolation`); + + const transaction = await transactionPromise; + + const leakedBreadcrumb = (transaction.breadcrumbs || []).find( + (b: any) => b.message === 'leaked-breadcrumb-from-active-event', + ); + // This SHOULD be toBeUndefined() once lifecycle event isolation is implemented. + expect(leakedBreadcrumb).toBeDefined(); +}); + +// TODO: @OnWorkerEvent('failed') handlers run outside the isolation scope created by process(). +// Breadcrumbs set there leak into the default isolation scope and appear on subsequent HTTP requests. +test('BullMQ @OnWorkerEvent failed lifecycle breadcrumbs currently leak into subsequent HTTP requests', async ({ + baseURL, +}) => { + const processTransactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => { + return transactionEvent.contexts?.trace?.op === 'queue.process'; + }); + + await fetch(`${baseURL}/enqueue/lifecycle-failed-breadcrumb-test`); + + await processTransactionPromise; + + const transactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => { + return transactionEvent.transaction === 'GET /check-isolation'; + }); + + await fetch(`${baseURL}/check-isolation`); + + const transaction = await transactionPromise; + + const leakedBreadcrumb = (transaction.breadcrumbs || []).find( + (b: any) => b.message === 'leaked-breadcrumb-from-failed-event', + ); + // This SHOULD be toBeUndefined() once lifecycle event isolation is implemented. + expect(leakedBreadcrumb).toBeDefined(); +}); + +// The 'progress' event does NOT leak breadcrumbs — unlike 'active', 'completed', and 'failed', +// BullMQ emits it inside the process() call (via job.updateProgress()), so it runs within +// the isolation scope already established by the instrumentation. +test('BullMQ @OnWorkerEvent progress lifecycle breadcrumbs do not leak into subsequent HTTP requests', async ({ + baseURL, +}) => { + const processTransactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => { + return transactionEvent.contexts?.trace?.op === 'queue.process'; + }); + + await fetch(`${baseURL}/enqueue/lifecycle-progress-breadcrumb-test`); + + await processTransactionPromise; + + const transactionPromise = waitForTransaction('nestjs-bullmq', transactionEvent => { + return transactionEvent.transaction === 'GET /check-isolation'; + }); + + await fetch(`${baseURL}/check-isolation`); + + const transaction = await transactionPromise; + + const leakedBreadcrumb = (transaction.breadcrumbs || []).find( + (b: any) => b.message === 'leaked-breadcrumb-from-progress-event', + ); + expect(leakedBreadcrumb).toBeUndefined(); +}); diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/tsconfig.build.json b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/tsconfig.build.json new file mode 100644 index 000000000000..26c30d4eddf2 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/tsconfig.build.json @@ -0,0 +1,4 @@ +{ + "extends": "./tsconfig.json", + "exclude": ["node_modules", "test", "dist"] +} diff --git a/dev-packages/e2e-tests/test-applications/nestjs-bullmq/tsconfig.json b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/tsconfig.json new file mode 100644 index 000000000000..cf79f029c781 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/nestjs-bullmq/tsconfig.json @@ -0,0 +1,22 @@ +{ + "compilerOptions": { + "module": "commonjs", + "declaration": true, + "removeComments": true, + "emitDecoratorMetadata": true, + "experimentalDecorators": true, + "allowSyntheticDefaultImports": true, + "target": "ES2021", + "sourceMap": true, + "outDir": "./dist", + "baseUrl": "./", + "incremental": true, + "skipLibCheck": true, + "strictNullChecks": false, + "noImplicitAny": false, + "strictBindCallApply": false, + "forceConsistentCasingInFileNames": false, + "noFallthroughCasesInSwitch": false, + "moduleResolution": "Node16" + } +} diff --git a/packages/nestjs/src/integrations/helpers.ts b/packages/nestjs/src/integrations/helpers.ts index beb1ebb669dd..d8b50957f979 100644 --- a/packages/nestjs/src/integrations/helpers.ts +++ b/packages/nestjs/src/integrations/helpers.ts @@ -63,6 +63,26 @@ export function getEventSpanOptions(event: string): { }; } +/** + * Returns span options for nest bullmq process spans. + */ +export function getBullMQProcessSpanOptions(queueName: string): { + name: string; + attributes: Record; + forceTransaction: boolean; +} { + return { + name: `${queueName} process`, + attributes: { + [SEMANTIC_ATTRIBUTE_SENTRY_OP]: 'queue.process', + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.queue.nestjs.bullmq', + 'messaging.system': 'bullmq', + 'messaging.destination.name': queueName, + }, + forceTransaction: true, + }; +} + /** * Adds instrumentation to a js observable and attaches the span to an active parent span. */ diff --git a/packages/nestjs/src/integrations/nest.ts b/packages/nestjs/src/integrations/nest.ts index 7534ba7aef03..330c76f319cb 100644 --- a/packages/nestjs/src/integrations/nest.ts +++ b/packages/nestjs/src/integrations/nest.ts @@ -1,6 +1,7 @@ import { NestInstrumentation as NestInstrumentationCore } from '@opentelemetry/instrumentation-nestjs-core'; import { defineIntegration } from '@sentry/core'; import { generateInstrumentOnce } from '@sentry/node'; +import { SentryNestBullMQInstrumentation } from './sentry-nest-bullmq-instrumentation'; import { SentryNestEventInstrumentation } from './sentry-nest-event-instrumentation'; import { SentryNestInstrumentation } from './sentry-nest-instrumentation'; import { SentryNestScheduleInstrumentation } from './sentry-nest-schedule-instrumentation'; @@ -23,12 +24,17 @@ const instrumentNestSchedule = generateInstrumentOnce(`${INTEGRATION_NAME}.Sched return new SentryNestScheduleInstrumentation(); }); +const instrumentNestBullMQ = generateInstrumentOnce(`${INTEGRATION_NAME}.BullMQ`, () => { + return new SentryNestBullMQInstrumentation(); +}); + export const instrumentNest = Object.assign( (): void => { instrumentNestCore(); instrumentNestCommon(); instrumentNestEvent(); instrumentNestSchedule(); + instrumentNestBullMQ(); }, { id: INTEGRATION_NAME }, ); diff --git a/packages/nestjs/src/integrations/sentry-nest-bullmq-instrumentation.ts b/packages/nestjs/src/integrations/sentry-nest-bullmq-instrumentation.ts new file mode 100644 index 000000000000..b18bab1dc07c --- /dev/null +++ b/packages/nestjs/src/integrations/sentry-nest-bullmq-instrumentation.ts @@ -0,0 +1,115 @@ +import type { InstrumentationConfig } from '@opentelemetry/instrumentation'; +import { + InstrumentationBase, + InstrumentationNodeModuleDefinition, + InstrumentationNodeModuleFile, + isWrapped, +} from '@opentelemetry/instrumentation'; +import { captureException, SDK_VERSION, startSpan, withIsolationScope } from '@sentry/core'; +import { getBullMQProcessSpanOptions } from './helpers'; +import type { ProcessorDecoratorTarget } from './types'; + +const supportedVersions = ['>=10.0.0']; +const COMPONENT = '@nestjs/bullmq'; + +/** + * Custom instrumentation for nestjs bullmq module. + * + * This hooks into the `@Processor` class decorator, which is applied on queue processor classes. + * It wraps the `process` method on the decorated class to fork the isolation scope for each job + * invocation, create a span, and capture errors. + */ +export class SentryNestBullMQInstrumentation extends InstrumentationBase { + public constructor(config: InstrumentationConfig = {}) { + super('sentry-nestjs-bullmq', SDK_VERSION, config); + } + + /** + * Initializes the instrumentation by defining the modules to be patched. + */ + public init(): InstrumentationNodeModuleDefinition { + const moduleDef = new InstrumentationNodeModuleDefinition(COMPONENT, supportedVersions); + + moduleDef.files.push(this._getProcessorFileInstrumentation(supportedVersions)); + return moduleDef; + } + + /** + * Wraps the @Processor decorator. + */ + private _getProcessorFileInstrumentation(versions: string[]): InstrumentationNodeModuleFile { + return new InstrumentationNodeModuleFile( + '@nestjs/bullmq/dist/decorators/processor.decorator.js', + versions, + (moduleExports: { Processor: ProcessorDecoratorTarget }) => { + if (isWrapped(moduleExports.Processor)) { + this._unwrap(moduleExports, 'Processor'); + } + this._wrap(moduleExports, 'Processor', this._createWrapProcessor()); + return moduleExports; + }, + (moduleExports: { Processor: ProcessorDecoratorTarget }) => { + this._unwrap(moduleExports, 'Processor'); + }, + ); + } + + /** + * Creates a wrapper function for the @Processor class decorator. + */ + private _createWrapProcessor() { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return function wrapProcessor(original: any) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return function wrappedProcessor(...decoratorArgs: any[]) { + // Extract queue name from decorator args + // @Processor('queueName') or @Processor({ name: 'queueName' }) + const queueName = + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + typeof decoratorArgs[0] === 'string' ? decoratorArgs[0] : decoratorArgs[0]?.name || 'unknown'; + + // Get the original class decorator + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + const classDecorator = original(...decoratorArgs); + + // Return a new class decorator that wraps the process method + return function (target: ProcessorDecoratorTarget) { + const originalProcess = target.prototype.process; + + if ( + originalProcess && + typeof originalProcess === 'function' && + !target.__SENTRY_INTERNAL__ && + !originalProcess.__SENTRY_INSTRUMENTED__ + ) { + target.prototype.process = new Proxy(originalProcess, { + apply: (originalProcessFn, thisArg, args) => { + return withIsolationScope(() => { + return startSpan(getBullMQProcessSpanOptions(queueName), async () => { + try { + return await originalProcessFn.apply(thisArg, args); + } catch (error) { + captureException(error, { + mechanism: { + handled: false, + type: 'auto.queue.nestjs.bullmq', + }, + }); + throw error; + } + }); + }); + }, + }); + + target.prototype.process.__SENTRY_INSTRUMENTED__ = true; + } + + // Apply the original class decorator + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + return classDecorator(target); + }; + }; + }; + } +} diff --git a/packages/nestjs/src/integrations/types.ts b/packages/nestjs/src/integrations/types.ts index 88ab09c913e8..6dd00caa8cc1 100644 --- a/packages/nestjs/src/integrations/types.ts +++ b/packages/nestjs/src/integrations/types.ts @@ -91,7 +91,6 @@ export interface CatchTarget { */ export interface OnEventTarget { name: string; - sentryPatched?: boolean; __SENTRY_INTERNAL__?: boolean; } @@ -100,10 +99,20 @@ export interface OnEventTarget { */ export interface ScheduleDecoratorTarget { name: string; - sentryPatched?: boolean; __SENTRY_INTERNAL__?: boolean; } +/** + * Represents a target class in NestJS annotated with @Processor (BullMQ). + */ +export interface ProcessorDecoratorTarget { + name: string; + __SENTRY_INTERNAL__?: boolean; + prototype: { + process?: ((...args: any[]) => Promise) & { __SENTRY_INSTRUMENTED__?: boolean }; + }; +} + /** * Represents an express NextFunction. */ diff --git a/packages/nestjs/test/integrations/bullmq.test.ts b/packages/nestjs/test/integrations/bullmq.test.ts new file mode 100644 index 000000000000..349a0c1b8e43 --- /dev/null +++ b/packages/nestjs/test/integrations/bullmq.test.ts @@ -0,0 +1,155 @@ +import 'reflect-metadata'; +import * as core from '@sentry/core'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { SentryNestBullMQInstrumentation } from '../../src/integrations/sentry-nest-bullmq-instrumentation'; + +describe('BullMQInstrumentation', () => { + let instrumentation: SentryNestBullMQInstrumentation; + + beforeEach(() => { + instrumentation = new SentryNestBullMQInstrumentation(); + vi.spyOn(core, 'captureException'); + vi.spyOn(core, 'withIsolationScope').mockImplementation(callback => { + return (callback as () => unknown)(); + }); + vi.spyOn(core, 'startSpan').mockImplementation((_, callback) => { + return (callback as () => unknown)(); + }); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + describe('Processor decorator wrapping', () => { + let wrappedDecorator: any; + let mockClassDecorator: vi.Mock; + let mockProcessor: any; + + beforeEach(() => { + mockClassDecorator = vi.fn().mockImplementation(() => { + return (target: any) => target; + }); + + const moduleDef = instrumentation.init(); + const file = moduleDef.files[0]; + const moduleExports = { Processor: mockClassDecorator }; + file?.patch(moduleExports); + wrappedDecorator = moduleExports.Processor; + }); + + it('should call withIsolationScope and startSpan on process execution', async () => { + const originalProcess = vi.fn().mockResolvedValue('result'); + + mockProcessor = class TestProcessor { + process = originalProcess; + }; + mockProcessor.prototype.process = originalProcess; + + const classDecoratorFn = wrappedDecorator('test-queue'); + classDecoratorFn(mockProcessor); + + await mockProcessor.prototype.process(); + + expect(core.withIsolationScope).toHaveBeenCalled(); + expect(core.startSpan).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'test-queue process', + forceTransaction: true, + attributes: expect.objectContaining({ + 'sentry.op': 'queue.process', + 'sentry.origin': 'auto.queue.nestjs.bullmq', + 'messaging.system': 'bullmq', + 'messaging.destination.name': 'test-queue', + }), + }), + expect.any(Function), + ); + expect(originalProcess).toHaveBeenCalled(); + }); + + it('should capture async exceptions and rethrow', async () => { + const error = new Error('Test error'); + const originalProcess = vi.fn().mockRejectedValue(error); + + mockProcessor = class TestProcessor {}; + mockProcessor.prototype.process = originalProcess; + + const classDecoratorFn = wrappedDecorator('test-queue'); + classDecoratorFn(mockProcessor); + + await expect(mockProcessor.prototype.process()).rejects.toThrow(error); + expect(core.captureException).toHaveBeenCalledWith(error, { + mechanism: { + handled: false, + type: 'auto.queue.nestjs.bullmq', + }, + }); + }); + + it('should skip wrapping when __SENTRY_INTERNAL__ is set', async () => { + const originalProcess = vi.fn().mockResolvedValue('result'); + + mockProcessor = class TestProcessor {}; + mockProcessor.prototype.process = originalProcess; + mockProcessor.__SENTRY_INTERNAL__ = true; + + const classDecoratorFn = wrappedDecorator('test-queue'); + classDecoratorFn(mockProcessor); + + // process should not be wrapped + expect(mockProcessor.prototype.process).toBe(originalProcess); + }); + + it('should not double-wrap process method', async () => { + const originalProcess = vi.fn().mockResolvedValue('result'); + + mockProcessor = class TestProcessor {}; + mockProcessor.prototype.process = originalProcess; + + const classDecoratorFn = wrappedDecorator('test-queue'); + classDecoratorFn(mockProcessor); + + const wrappedProcess = mockProcessor.prototype.process; + expect(wrappedProcess).not.toBe(originalProcess); + + // Apply decorator again + const classDecoratorFn2 = wrappedDecorator('test-queue'); + classDecoratorFn2(mockProcessor); + + // Should still be the same wrapped function (not double-wrapped) + expect(mockProcessor.prototype.process).toBe(wrappedProcess); + }); + + it('should extract queue name from ProcessorOptions object', async () => { + const originalProcess = vi.fn().mockResolvedValue('result'); + + mockProcessor = class TestProcessor {}; + mockProcessor.prototype.process = originalProcess; + + const classDecoratorFn = wrappedDecorator({ name: 'my-queue' }); + classDecoratorFn(mockProcessor); + + await mockProcessor.prototype.process(); + + expect(core.startSpan).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'my-queue process', + }), + expect.any(Function), + ); + }); + + it('should apply the original class decorator', () => { + const originalProcess = vi.fn().mockResolvedValue('result'); + + mockProcessor = class TestProcessor {}; + mockProcessor.prototype.process = originalProcess; + + const classDecoratorFn = wrappedDecorator('test-queue'); + classDecoratorFn(mockProcessor); + + expect(mockClassDecorator).toHaveBeenCalledWith('test-queue'); + }); + }); +});