@globalart/nestjs-typeorm-outbox
NestJS module for implementing the Outbox pattern with TypeORM. Provides reliable message delivery to Kafka through a transactional outbox table.
Installation
npm install @globalart/nestjs-typeorm-outbox @nestjs/typeorm typeorm kafkajs
Overview
The @globalart/nestjs-typeorm-outbox module implements the Transactional Outbox pattern to ensure reliable message delivery to Kafka. Messages are saved to the database within the same transaction as business data, and then asynchronously sent to Kafka via a cron job.
Key Features
- Transactional Guarantee - Messages are saved to the database within the same transaction
- Automatic Processing - Cron job automatically sends messages to Kafka
- Database-Level Locking - Uses PostgreSQL advisory locks to prevent duplicate processing
- Flexible Configuration - Supports async configuration via ConfigService
- Type Safety - Full TypeScript support
Quick Start
Basic Setup
import { Module } from "@nestjs/common";
import { TypeOrmModule } from "@nestjs/typeorm";
import { TypeormOutboxModule, TypeormOutboxEntity } from "@globalart/nestjs-typeorm-outbox";
@Module({
imports: [
TypeOrmModule.forRoot({
type: 'postgres',
host: 'localhost',
port: 5432,
username: 'postgres',
password: 'postgres',
database: 'postgres',
synchronize: true,
entities: [TypeormOutboxEntity],
}),
TypeormOutboxModule.forRoot({
typeOrmConnectionName: 'default',
}),
TypeormOutboxModule.registerCronAsync({
useFactory: (configService: ConfigService) => ({
kafkaConfig: {
transport: Transport.KAFKA,
options: {
client: {
clientId: 'kafka-client',
brokers: ['localhost:9092'],
},
},
},
typeOrmConnectionName: 'default',
}),
inject: [ConfigService],
}),
],
})
export class AppModule {}
Using the Service
import { Injectable } from "@nestjs/common";
import {
TypeormOutboxService,
InjectTypeormOutboxService
} from "@globalart/nestjs-typeorm-outbox";
@Injectable()
export class OrderService {
constructor(
@InjectTypeormOutboxService()
private readonly outboxService: TypeormOutboxService
) {}
async createOrder(orderData: OrderData) {
return await this.dataSource.transaction(async (manager) => {
const order = await manager.save(Order, orderData);
await this.outboxService.create({
destinationTopic: 'order.created',
payload: {
orderId: order.id,
userId: order.userId,
total: order.total,
},
headers: {
'event-type': 'order.created',
'version': '1.0',
},
keys: {
orderId: order.id,
},
});
return order;
});
}
}
Configuration
TypeormOutboxModule.forRoot
Registers the base module for working with outbox.
TypeormOutboxModule.forRoot({
typeOrmConnectionName?: string; // TypeORM connection name (default: 'default')
})
TypeormOutboxModule.forRootAsync
Async module registration.
TypeormOutboxModule.forRootAsync({
imports?: Type[];
inject?: InjectionToken[];
useFactory?: (...args: any[]) => Promise<TypeormOutboxModuleOptions> | TypeormOutboxModuleOptions;
})
TypeormOutboxModule.registerCronAsync
Registers a cron job to process outbox messages.
TypeormOutboxModule.registerCronAsync({
imports?: Type[];
inject?: any[];
useFactory?: (...args: any[]) => Promise<TypeormOutboxRegisterCronModuleOptions> | TypeormOutboxRegisterCronModuleOptions;
})
Configuration Parameters:
| Parameter | Type | Description |
|---|---|---|
typeOrmConnectionName | string | TypeORM connection name (default: 'default') |
kafkaConfig | KafkaOptions | Kafka client configuration |
API Reference
TypeormOutboxService
Service for creating outbox messages.
Methods
create(options: CreateOutboxOptions): Promise<void>
Creates a new outbox message.
Parameters:
interface CreateOutboxOptions {
destinationTopic: string; // Kafka topic for message delivery
payload: Record<string, unknown>; // Message payload
headers?: Record<string, string>; // Message headers (optional)
keys?: Record<string, unknown>; // Keys for partitioning (optional)
}
Example:
await outboxService.create({
destinationTopic: 'user.created',
payload: {
userId: '123',
email: 'user@example.com',
name: 'John Doe',
},
headers: {
'event-type': 'user.created',
'version': '1.0',
},
keys: {
userId: '123',
},
});
TypeormOutboxEntity
Entity for storing outbox messages.
@Entity('outbox')
export class TypeormOutboxEntity {
@PrimaryGeneratedColumn("uuid")
id!: string;
@CreateDateColumn({ name: 'created_at' })
createdAt!: Date;
@UpdateDateColumn({ name: 'updated_at' })
updatedAt!: Date;
@Column("character varying", { name: 'destination_topic' })
destinationTopic!: string;
@Column("jsonb", { nullable: true })
headers!: Record<string, string>;
@Column("jsonb", { nullable: true })
keys!: Record<string, unknown>;
@Column("jsonb")
payload!: Record<string, unknown>;
}
Usage Examples
Complete Example with Configuration
import { Module } from "@nestjs/common";
import { TypeOrmModule } from "@nestjs/typeorm";
import { ConfigModule, ConfigService } from "@nestjs/config";
import {
TypeormOutboxModule,
TypeormOutboxEntity
} from "@globalart/nestjs-typeorm-outbox";
import { KafkaOptions, Transport } from "@nestjs/microservices";
import { registerAs } from "@nestjs/config";
const kafkaConfig = registerAs('kafka', (): KafkaOptions => ({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'kafka-client',
brokers: ['localhost:9092'],
},
},
}));
@Module({
imports: [
TypeOrmModule.forRoot({
type: 'postgres',
host: 'localhost',
port: 5432,
username: 'postgres',
password: 'postgres',
database: 'postgres',
synchronize: true,
entities: [TypeormOutboxEntity],
}),
ConfigModule.forRoot({
isGlobal: true,
load: [kafkaConfig],
}),
TypeormOutboxModule.forRoot({
typeOrmConnectionName: 'default',
}),
TypeormOutboxModule.registerCronAsync({
useFactory: (configService: ConfigService) => ({
kafkaConfig: configService.get<KafkaOptions>('kafka'),
typeOrmConnectionName: 'default',
}),
inject: [ConfigService],
}),
],
})
export class AppModule {}
Using in a Controller
import { Controller, Post, Body } from "@nestjs/common";
import {
TypeormOutboxService,
InjectTypeormOutboxService
} from "@globalart/nestjs-typeorm-outbox";
@Controller('orders')
export class OrdersController {
constructor(
@InjectTypeormOutboxService()
private readonly outboxService: TypeormOutboxService
) {}
@Post()
async createOrder(@Body() orderData: CreateOrderDto) {
const order = await this.orderService.create(orderData);
await this.outboxService.create({
destinationTopic: 'order.created',
payload: {
orderId: order.id,
userId: order.userId,
total: order.total,
items: order.items,
},
headers: {
'event-type': 'order.created',
'version': '1.0',
},
keys: {
orderId: order.id,
},
});
return order;
}
}
Using in a Transaction
import { Injectable } from "@nestjs/common";
import { DataSource } from "typeorm";
import {
TypeormOutboxService,
InjectTypeormOutboxService
} from "@globalart/nestjs-typeorm-outbox";
@Injectable()
export class PaymentService {
constructor(
private readonly dataSource: DataSource,
@InjectTypeormOutboxService()
private readonly outboxService: TypeormOutboxService
) {}
async processPayment(paymentData: PaymentData) {
return await this.dataSource.transaction(async (manager) => {
const payment = await manager.save(Payment, paymentData);
await this.outboxService.create({
destinationTopic: 'payment.processed',
payload: {
paymentId: payment.id,
amount: payment.amount,
currency: payment.currency,
status: payment.status,
},
headers: {
'event-type': 'payment.processed',
'version': '1.0',
},
keys: {
paymentId: payment.id,
},
});
return payment;
});
}
}
How It Works
Transactional Outbox Pattern
- Message Creation: When executing a business operation, a message is saved to the
outboxtable within the same transaction - Cron Job Processing: A cron job (every 10 seconds) retrieves unprocessed messages from the table
- Locking: PostgreSQL advisory locks are used to prevent parallel processing by the same process
- Kafka Delivery: Messages are sent to Kafka with the specified topic, keys, and headers
- Deletion: After successful delivery, the message is deleted from the table
Message Structure in Kafka
{
key: Record<string, unknown>; // Keys for partitioning
value: Record<string, unknown>; // Message payload
headers: Record<string, string>; // Message headers
}
Cron Job Configuration
By default, the cron job runs every 10 seconds (CronExpression.EVERY_10_SECONDS). This can be changed by overriding the controller:
import { Controller } from "@nestjs/common";
import { Cron, CronExpression } from "@nestjs/schedule";
import { TypeormOutboxController } from "@globalart/nestjs-typeorm-outbox";
@Controller()
export class CustomOutboxController extends TypeormOutboxController {
@Cron(CronExpression.EVERY_5_SECONDS)
async handleOutboxCron() {
await this.executeCronJob();
}
}
Best Practices
- Use Transactions: Always create outbox messages within the same transaction as business data
- Specify Keys: Use
keysto ensure proper partitioning in Kafka - Add Headers: Use
headersfor event metadata (event type, version, etc.) - Monitoring: Monitor the size of the
outboxtable to detect processing issues - Error Handling: Ensure Kafka broker is available, otherwise messages will accumulate in the table
Requirements
- PostgreSQL (for advisory locks)
- Kafka broker
- NestJS application with configured TypeORM
Troubleshooting
Messages Not Being Sent
- Check that Kafka broker is accessible and configuration is correct
- Ensure cron job is registered via
registerCronAsync - Check logs for errors when sending messages
Duplicate Messages
The module uses PostgreSQL advisory locks to prevent parallel processing. If you see duplicates:
- Ensure PostgreSQL is being used
- Check that multiple application instances are not running with the same lock ID
Messages Accumulating in Table
If messages are accumulating in the outbox table:
- Check Kafka broker availability
- Check logs for errors
- Ensure cron job is running (check startup logs)