@globalart/nestjs-typeorm-outbox

NestJS module for implementing the Outbox pattern with TypeORM. Provides reliable message delivery to Kafka through a transactional outbox table.

Installation

npm install @globalart/nestjs-typeorm-outbox @nestjs/typeorm typeorm kafkajs

Overview

The @globalart/nestjs-typeorm-outbox module implements the Transactional Outbox pattern to ensure reliable message delivery to Kafka. Messages are saved to the database within the same transaction as business data, and then asynchronously sent to Kafka via a cron job.
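
To make the guarantee concrete, here is a minimal in-memory sketch of the idea. It is not the module's implementation: `InMemoryDb` and `placeOrder` are hypothetical names, and a real application would rely on a TypeORM transaction instead of copying arrays.

```typescript
// Hypothetical in-memory stand-in for a database with two tables.
type Row = Record<string, unknown>;

class InMemoryDb {
  orders: Row[] = [];
  outbox: Row[] = [];

  // Runs `work` against staged copies and publishes them only if `work`
  // returns normally, which mimics commit/rollback semantics.
  transaction<T>(work: (tx: { orders: Row[]; outbox: Row[] }) => T): T {
    const tx = { orders: [...this.orders], outbox: [...this.outbox] };
    const result = work(tx); // a throw here leaves the real tables untouched
    this.orders = tx.orders;
    this.outbox = tx.outbox;
    return result;
  }
}

function placeOrder(db: InMemoryDb, order: { id: string; total: number }): void {
  db.transaction((tx) => {
    tx.orders.push(order);
    if (order.total < 0) {
      throw new Error("invalid total"); // simulated business failure
    }
    tx.outbox.push({
      destinationTopic: "order.created",
      payload: { orderId: order.id, total: order.total },
    });
  });
}

const db = new InMemoryDb();
placeOrder(db, { id: "o-1", total: 42 });
try {
  placeOrder(db, { id: "o-2", total: -1 });
} catch {
  // Rolled back: neither the order nor its outbox message was stored.
}
console.log(db.orders.length, db.outbox.length); // prints "1 1"
```

The point of the sketch is that the business row and the outbox row are either both stored or both discarded, so a message is never queued for an order that does not exist.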

Key Features

  • Transactional Guarantee - Messages are saved to the database within the same transaction
  • Automatic Processing - Cron job automatically sends messages to Kafka
  • Database-Level Locking - Uses PostgreSQL advisory locks to prevent duplicate processing
  • Flexible Configuration - Supports async configuration via ConfigService
  • Type Safety - Full TypeScript support

Quick Start

Basic Setup

import { Module } from "@nestjs/common";
import { ConfigModule, ConfigService } from "@nestjs/config";
import { Transport } from "@nestjs/microservices";
import { TypeOrmModule } from "@nestjs/typeorm";
import { TypeormOutboxModule, TypeormOutboxEntity } from "@globalart/nestjs-typeorm-outbox";

@Module({
  imports: [
    // Makes ConfigService injectable in the factory below
    ConfigModule.forRoot({ isGlobal: true }),
    TypeOrmModule.forRoot({
      type: 'postgres',
      host: 'localhost',
      port: 5432,
      username: 'postgres',
      password: 'postgres',
      database: 'postgres',
      synchronize: true, // convenient for local development; prefer migrations in production
      entities: [TypeormOutboxEntity],
    }),
    TypeormOutboxModule.forRoot({
      typeOrmConnectionName: 'default',
    }),
    TypeormOutboxModule.registerCronAsync({
      useFactory: (configService: ConfigService) => ({
        kafkaConfig: {
          transport: Transport.KAFKA,
          options: {
            client: {
              clientId: 'kafka-client',
              brokers: ['localhost:9092'],
            },
          },
        },
        typeOrmConnectionName: 'default',
      }),
      inject: [ConfigService],
    }),
  ],
})
export class AppModule {}

Using the Service

import { Injectable } from "@nestjs/common";
import { DataSource } from "typeorm";
import {
  TypeormOutboxService,
  InjectTypeormOutboxService
} from "@globalart/nestjs-typeorm-outbox";

// Order and OrderData are application-defined types.
@Injectable()
export class OrderService {
  constructor(
    private readonly dataSource: DataSource,
    @InjectTypeormOutboxService()
    private readonly outboxService: TypeormOutboxService
  ) {}

  async createOrder(orderData: OrderData) {
    return await this.dataSource.transaction(async (manager) => {
      const order = await manager.save(Order, orderData);

      await this.outboxService.create({
        destinationTopic: 'order.created',
        payload: {
          orderId: order.id,
          userId: order.userId,
          total: order.total,
        },
        headers: {
          'event-type': 'order.created',
          'version': '1.0',
        },
        keys: {
          orderId: order.id,
        },
      });

      return order;
    });
  }
}

Configuration

TypeormOutboxModule.forRoot

Registers the base module for working with outbox.

TypeormOutboxModule.forRoot({
  typeOrmConnectionName?: string; // TypeORM connection name (default: 'default')
})

TypeormOutboxModule.forRootAsync

Async module registration.

TypeormOutboxModule.forRootAsync({
  imports?: Type[];
  inject?: InjectionToken[];
  useFactory?: (...args: any[]) => Promise<TypeormOutboxModuleOptions> | TypeormOutboxModuleOptions;
})

TypeormOutboxModule.registerCronAsync

Registers a cron job to process outbox messages.

TypeormOutboxModule.registerCronAsync({
  imports?: Type[];
  inject?: any[];
  useFactory?: (...args: any[]) => Promise<TypeormOutboxRegisterCronModuleOptions> | TypeormOutboxRegisterCronModuleOptions;
})

Configuration Parameters:

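| Parameter             | Type         | Description                                  |
| --------------------- | ------------ | -------------------------------------------- |
| typeOrmConnectionName | string       | TypeORM connection name (default: 'default') |
| kafkaConfig           | KafkaOptions | Kafka client configuration                   |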
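
The factory passed to registerCronAsync will often read broker addresses from the environment rather than hard-coding them. The sketch below shows one way to turn a comma-separated string into the `options.client` part of `kafkaConfig`. The helper `buildKafkaClientConfig` and the variable name `KAFKA_BROKERS` are assumptions for illustration and are not part of the module.

```typescript
// Shape of the `options.client` object used in the examples in this document.
interface KafkaClientConfig {
  clientId: string;
  brokers: string[];
}

// Hypothetical helper: parses "host1:9092, host2:9092" into a broker list.
function buildKafkaClientConfig(
  rawBrokers: string | undefined,
  clientId = "kafka-client",
): KafkaClientConfig {
  const brokers = (rawBrokers ?? "")
    .split(",")
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  if (brokers.length === 0) {
    // Fail at startup rather than letting messages pile up in the outbox table.
    throw new Error("KAFKA_BROKERS must list at least one broker");
  }
  return { clientId, brokers };
}

const client = buildKafkaClientConfig("localhost:9092, kafka-2:9092");
console.log(client.brokers.length); // prints 2
```

Inside a factory, the first argument could be something like `configService.get<string>('KAFKA_BROKERS')`, with the result placed under `kafkaConfig.options.client`.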

API Reference

TypeormOutboxService

Service for creating outbox messages.

Methods

create(options: CreateOutboxOptions): Promise<void>

Creates a new outbox message.

Parameters:

interface CreateOutboxOptions {
  destinationTopic: string; // Kafka topic for message delivery
  payload: Record<string, unknown>; // Message payload
  headers?: Record<string, string>; // Message headers (optional)
  keys?: Record<string, unknown>; // Keys for partitioning (optional)
}

Example:

await outboxService.create({
  destinationTopic: 'user.created',
  payload: {
    userId: '123',
    email: 'user@example.com',
    name: 'John Doe',
  },
  headers: {
    'event-type': 'user.created',
    'version': '1.0',
  },
  keys: {
    userId: '123',
  },
});
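
When several call sites publish the same event, a small builder function can keep the topic, headers, and keys consistent. The following is a sketch only: `orderCreatedMessage` and `OrderCreated` are hypothetical names, and `CreateOutboxOptions` is re-declared locally from the interface above so that the snippet stands alone.

```typescript
// Local copy of the interface documented above.
interface CreateOutboxOptions {
  destinationTopic: string;
  payload: Record<string, unknown>;
  headers?: Record<string, string>;
  keys?: Record<string, unknown>;
}

// Hypothetical domain event.
interface OrderCreated {
  orderId: string;
  userId: string;
  total: number;
}

// Builds the outbox message for an order.created event in one place.
function orderCreatedMessage(order: OrderCreated): CreateOutboxOptions {
  return {
    destinationTopic: "order.created",
    payload: { ...order },
    headers: { "event-type": "order.created", version: "1.0" },
    keys: { orderId: order.orderId },
  };
}

const outboxMessage = orderCreatedMessage({ orderId: "o-1", userId: "u-7", total: 99 });
console.log(outboxMessage.destinationTopic); // prints "order.created"
```

A call site would then reduce to `await outboxService.create(orderCreatedMessage(order))`.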

TypeormOutboxEntity

Entity for storing outbox messages.

@Entity('outbox')
export class TypeormOutboxEntity {
  @PrimaryGeneratedColumn("uuid")
  id!: string;

  @CreateDateColumn({ name: 'created_at' })
  createdAt!: Date;

  @UpdateDateColumn({ name: 'updated_at' })
  updatedAt!: Date;

  @Column("character varying", { name: 'destination_topic' })
  destinationTopic!: string;

  @Column("jsonb", { nullable: true })
  headers!: Record<string, string>;

  @Column("jsonb", { nullable: true })
  keys!: Record<string, unknown>;

  @Column("jsonb")
  payload!: Record<string, unknown>;
}

Usage Examples

Complete Example with Configuration

import { Module } from "@nestjs/common";
import { TypeOrmModule } from "@nestjs/typeorm";
import { ConfigModule, ConfigService, registerAs } from "@nestjs/config";
import {
  TypeormOutboxModule,
  TypeormOutboxEntity
} from "@globalart/nestjs-typeorm-outbox";
import { KafkaOptions, Transport } from "@nestjs/microservices";

const kafkaConfig = registerAs('kafka', (): KafkaOptions => ({
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'kafka-client',
      brokers: ['localhost:9092'],
    },
  },
}));

@Module({
  imports: [
    TypeOrmModule.forRoot({
      type: 'postgres',
      host: 'localhost',
      port: 5432,
      username: 'postgres',
      password: 'postgres',
      database: 'postgres',
      synchronize: true,
      entities: [TypeormOutboxEntity],
    }),
    ConfigModule.forRoot({
      isGlobal: true,
      load: [kafkaConfig],
    }),
    TypeormOutboxModule.forRoot({
      typeOrmConnectionName: 'default',
    }),
    TypeormOutboxModule.registerCronAsync({
      useFactory: (configService: ConfigService) => ({
        kafkaConfig: configService.get<KafkaOptions>('kafka'),
        typeOrmConnectionName: 'default',
      }),
      inject: [ConfigService],
    }),
  ],
})
export class AppModule {}

Using in a Controller

import { Controller, Post, Body } from "@nestjs/common";
import {
  TypeormOutboxService,
  InjectTypeormOutboxService
} from "@globalart/nestjs-typeorm-outbox";
// OrderService and CreateOrderDto are application-defined; the path is illustrative.
import { OrderService } from "./order.service";

@Controller('orders')
export class OrdersController {
  constructor(
    private readonly orderService: OrderService,
    @InjectTypeormOutboxService()
    private readonly outboxService: TypeormOutboxService
  ) {}

  @Post()
  async createOrder(@Body() orderData: CreateOrderDto) {
    const order = await this.orderService.create(orderData);

    // Note: this call runs after the order has been saved, outside its transaction.
    // For atomicity, prefer the approach in "Using in a Transaction".
    await this.outboxService.create({
      destinationTopic: 'order.created',
      payload: {
        orderId: order.id,
        userId: order.userId,
        total: order.total,
        items: order.items,
      },
      headers: {
        'event-type': 'order.created',
        'version': '1.0',
      },
      keys: {
        orderId: order.id,
      },
    });

    return order;
  }
}

Using in a Transaction

import { Injectable } from "@nestjs/common";
import { DataSource } from "typeorm";
import {
  TypeormOutboxService,
  InjectTypeormOutboxService
} from "@globalart/nestjs-typeorm-outbox";

@Injectable()
export class PaymentService {
  constructor(
    private readonly dataSource: DataSource,
    @InjectTypeormOutboxService()
    private readonly outboxService: TypeormOutboxService
  ) {}

  async processPayment(paymentData: PaymentData) {
    return await this.dataSource.transaction(async (manager) => {
      const payment = await manager.save(Payment, paymentData);

      await this.outboxService.create({
        destinationTopic: 'payment.processed',
        payload: {
          paymentId: payment.id,
          amount: payment.amount,
          currency: payment.currency,
          status: payment.status,
        },
        headers: {
          'event-type': 'payment.processed',
          'version': '1.0',
        },
        keys: {
          paymentId: payment.id,
        },
      });

      return payment;
    });
  }
}

How It Works

Transactional Outbox Pattern

  1. Message Creation: When executing a business operation, a message is saved to the outbox table within the same transaction
  2. Cron Job Processing: A cron job (every 10 seconds) retrieves unprocessed messages from the table
  3. Locking: PostgreSQL advisory locks ensure that only one worker processes the outbox at a time, so concurrent runs do not send the same message twice
  4. Kafka Delivery: Messages are sent to Kafka with the specified topic, keys, and headers
  5. Deletion: After successful delivery, the message is deleted from the table
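
The steps above can be sketched as a single polling pass. This is an illustrative in-memory model, not the module's code: `OutboxRow`, `Relay`, and `processOutboxOnce` are hypothetical names, and a real implementation would query the outbox table and take a PostgreSQL advisory lock (for example with `pg_try_advisory_lock`) instead of using the stand-ins shown here.

```typescript
interface OutboxRow {
  id: string;
  destinationTopic: string;
  payload: Record<string, unknown>;
}

interface Relay {
  tryLock(): Promise<boolean>; // stands in for acquiring an advisory lock
  unlock(): Promise<void>;
  send(row: OutboxRow): Promise<void>; // stands in for a Kafka producer
}

// One cron tick: resolves to the number of rows delivered and deleted.
async function processOutboxOnce(table: OutboxRow[], relay: Relay): Promise<number> {
  if (!(await relay.tryLock())) {
    return 0; // another worker holds the lock, so skip this tick
  }
  let delivered = 0;
  try {
    for (const row of [...table]) {
      await relay.send(row); // a throw keeps this row and all later rows in the table
      table.splice(table.indexOf(row), 1); // delete only after a successful send
      delivered++;
    }
  } finally {
    await relay.unlock();
  }
  return delivered;
}

async function demo(): Promise<OutboxRow[]> {
  const table: OutboxRow[] = [
    { id: "1", destinationTopic: "order.created", payload: { orderId: "o-1" } },
    { id: "2", destinationTopic: "order.created", payload: { orderId: "o-2" } },
  ];
  let locked = false;
  const relay: Relay = {
    tryLock: async () => {
      if (locked) return false;
      locked = true;
      return true;
    },
    unlock: async () => {
      locked = false;
    },
    send: async (row) => {
      if (row.id === "2") throw new Error("broker unavailable"); // simulated outage
    },
  };
  await processOutboxOnce(table, relay).catch(() => undefined);
  return table; // row "2" is still pending and would be retried on the next tick
}
```

Deleting a row only after its send succeeds is what makes a broker outage safe: undelivered rows simply stay in the table until a later pass.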

Message Structure in Kafka

{
  key: Record<string, unknown>; // Keys for partitioning
  value: Record<string, unknown>; // Message payload
  headers: Record<string, string>; // Message headers
}
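
Kafka itself carries keys and values as bytes, so the object-shaped `key` and `value` above have to be serialized before they are sent. How the module does this is not stated here. The sketch below assumes JSON serialization, and `toKafkaMessage`, `OutboxRecord`, and `WireMessage` are hypothetical names.

```typescript
interface OutboxRecord {
  keys?: Record<string, unknown> | null;
  payload: Record<string, unknown>;
  headers?: Record<string, string> | null;
}

// Wire-level shape: strings instead of objects.
interface WireMessage {
  key: string | null;
  value: string;
  headers: Record<string, string>;
}

function toKafkaMessage(record: OutboxRecord): WireMessage {
  return {
    // Assumption: JSON is the serialization format for both key and value.
    key: record.keys ? JSON.stringify(record.keys) : null,
    value: JSON.stringify(record.payload),
    headers: record.headers ?? {},
  };
}

const wire = toKafkaMessage({
  keys: { orderId: "o-1" },
  payload: { orderId: "o-1", total: 42 },
  headers: { "event-type": "order.created" },
});
console.log(wire.key); // prints {"orderId":"o-1"}
console.log(wire.value); // prints {"orderId":"o-1","total":42}
```

Under this assumption, key objects should be built with a consistent property order, because `JSON.stringify` preserves insertion order and two differently ordered keys would serialize to different strings.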

Cron Job Configuration

By default, the cron job runs every 10 seconds (CronExpression.EVERY_10_SECONDS). This can be changed by overriding the controller:

import { Controller } from "@nestjs/common";
import { Cron, CronExpression } from "@nestjs/schedule";
import { TypeormOutboxController } from "@globalart/nestjs-typeorm-outbox";

@Controller()
export class CustomOutboxController extends TypeormOutboxController {
  @Cron(CronExpression.EVERY_5_SECONDS)
  async handleOutboxCron() {
    await this.executeCronJob();
  }
}

Best Practices

  • Use Transactions: Always create outbox messages within the same transaction as business data
  • Specify Keys: Use keys to ensure proper partitioning in Kafka
  • Add Headers: Use headers for event metadata (event type, version, etc.)
  • Monitoring: Monitor the size of the outbox table to detect processing issues
  • Error Handling: Ensure the Kafka broker is available; otherwise undelivered messages will accumulate in the outbox table
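
For the monitoring recommendation, one option is to sample the row count and the age of the oldest row, then compare both with thresholds. The sketch below covers only the evaluation step. `OutboxStats`, `checkOutboxHealth`, and the threshold values are assumptions, and the statistics would come from a query such as `SELECT count(*), min(created_at) FROM outbox` against the entity's table.

```typescript
interface OutboxStats {
  pendingCount: number;
  oldestCreatedAt: Date | null; // null when the table is empty
}

interface HealthThresholds {
  maxPending: number;
  maxAgeMs: number;
}

type Health = "ok" | "backlog" | "stalled";

function checkOutboxHealth(
  stats: OutboxStats,
  limits: HealthThresholds,
  now: Date = new Date(),
): Health {
  if (stats.oldestCreatedAt !== null) {
    const ageMs = now.getTime() - stats.oldestCreatedAt.getTime();
    // An old row suggests the cron job is not draining the table.
    if (ageMs > limits.maxAgeMs) return "stalled";
  }
  // Many fresh rows suggest delivery is slower than production.
  if (stats.pendingCount > limits.maxPending) return "backlog";
  return "ok";
}

const thresholds: HealthThresholds = { maxPending: 1000, maxAgeMs: 60_000 };
const sampledAt = new Date("2024-01-01T00:10:00Z");

const fresh = checkOutboxHealth(
  { pendingCount: 3, oldestCreatedAt: new Date("2024-01-01T00:09:55Z") },
  thresholds,
  sampledAt,
);
const stale = checkOutboxHealth(
  { pendingCount: 3, oldestCreatedAt: new Date("2024-01-01T00:00:00Z") },
  thresholds,
  sampledAt,
);
console.log(fresh, stale); // prints "ok stalled"
```

Checking the age of the oldest row as well as the count helps distinguish a temporary burst from a relay that has stopped delivering.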

Requirements

  • PostgreSQL (for advisory locks)
  • Kafka broker
  • NestJS application with configured TypeORM

Troubleshooting

Messages Not Being Sent

  1. Check that the Kafka broker is accessible and the configuration is correct
  2. Ensure the cron job is registered via registerCronAsync
  3. Check the logs for errors when sending messages

Duplicate Messages

The module uses PostgreSQL advisory locks to prevent parallel processing. If you see duplicates:

  1. Ensure PostgreSQL is being used
  2. Check that multiple application instances are not running with the same lock ID
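
Because a message is deleted only after it has been sent, a crash between the send and the delete can cause the same message to be sent again on the next run. Consumers are therefore commonly written to tolerate redelivery. A minimal sketch, assuming each message carries a unique identifier in a hypothetical `event-id` header (the examples in this document do not set one) and using hypothetical names `ConsumedMessage` and `IdempotentHandler`:

```typescript
interface ConsumedMessage {
  headers: Record<string, string>;
  value: Record<string, unknown>;
}

class IdempotentHandler {
  // In-memory for illustration; a real consumer would use a durable store.
  private readonly seen = new Set<string>();

  constructor(private readonly handle: (value: Record<string, unknown>) => void) {}

  // Returns true if the message was processed, false if it was a duplicate.
  receive(message: ConsumedMessage): boolean {
    const id = message.headers["event-id"];
    if (id === undefined) {
      throw new Error("missing event-id header");
    }
    if (this.seen.has(id)) return false;
    this.handle(message.value);
    this.seen.add(id); // recorded only after the handler succeeded
    return true;
  }
}

let handledCount = 0;
const handler = new IdempotentHandler(() => {
  handledCount++;
});
const incoming: ConsumedMessage = {
  headers: { "event-id": "e-1" },
  value: { orderId: "o-1" },
};
const first = handler.receive(incoming);
const second = handler.receive(incoming); // redelivery of the same message is ignored
console.log(handledCount); // prints 1
```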

Messages Accumulating in Table

If messages are accumulating in the outbox table:

  1. Check that the Kafka broker is available
  2. Check the logs for send errors
  3. Ensure the cron job is running (check the startup logs)