@globalart/nestjs-kafka
NestJS microservice transport and client for Apache Kafka, built on top of @confluentinc/kafka-javascript (librdkafka).
Prerequisites
This package uses @confluentinc/kafka-javascript, which requires librdkafka to be installed on the system.
Ubuntu / Debian
sudo mkdir -p /etc/apt/keyrings
wget -qO - https://packages.confluent.io/deb/7.8/archive.key | gpg --dearmor | sudo tee /etc/apt/keyrings/confluent.gpg > /dev/null
# register the Confluent apt repository (adjust the version path if needed)
echo "deb [signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/deb/7.8 stable main" | sudo tee /etc/apt/sources.list.d/confluent.list
sudo apt-get update
sudo apt-get install -y librdkafka-dev
Installation
npm install @globalart/nestjs-kafka @confluentinc/kafka-javascript
Overview
The package provides two complementary pieces:
| Piece | Purpose |
|---|---|
| Transport (KafkaStrategy) | Consume messages — wired up via createKafkaMicroservice |
| Client (KafkaClient) | Produce messages and make RPC calls — injected via KafkaClientsModule |
Both share the same KafkaOptions configuration object.
Quick start
1. Connect the transport
// main.ts
import { NestFactory } from "@nestjs/core";
import { createKafkaMicroservice } from "@globalart/nestjs-kafka";
import { AppModule } from "./app.module";
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.enableShutdownHooks();
app.connectMicroservice(
createKafkaMicroservice({
options: {
brokers: ["kafka:9092"],
clientId: "my-service",
groupId: "my-service",
maxRetries: 3,
deadLetterTopic: "my-service.dlq",
shutdownTimeoutMs: 5000,
},
}),
);
await app.startAllMicroservices();
await app.listen(3000);
}
void bootstrap();
2. Register a client
// app.module.ts
import { Module } from "@nestjs/common";
import { KafkaClientsModule } from "@globalart/nestjs-kafka";
export const KAFKA_CLIENT = "KAFKA_CLIENT";
@Module({
imports: [
KafkaClientsModule.register([
{
name: KAFKA_CLIENT,
options: {
brokers: ["kafka:9092"],
clientId: "my-service",
groupId: "my-service",
},
},
]),
],
})
export class AppModule {}
3. Handle events
// events.controller.ts
import { Controller } from "@nestjs/common";
import { EventPattern, Payload } from "@nestjs/microservices";
import { KafkaAck, KafkaKey, KafkaTopic } from "@globalart/nestjs-kafka";
@Controller()
export class EventsController {
@EventPattern("orders.created")
async handleOrder(
@Payload() order: CreateOrderDto,
@KafkaKey() key: string,
@KafkaTopic() topic: string,
@KafkaAck() ack: () => Promise<void>,
) {
await processOrder(order);
await ack();
}
}
4. Emit and RPC
// app.service.ts
import { Injectable, Inject, OnModuleInit } from "@nestjs/common";
import { firstValueFrom } from "rxjs";
import { KafkaClient } from "@globalart/nestjs-kafka";
import { KAFKA_CLIENT } from "./constants";
@Injectable()
export class AppService implements OnModuleInit {
constructor(@Inject(KAFKA_CLIENT) private readonly kafka: KafkaClient) {}
onModuleInit() {
// Must be called before the client connects so that the reply topic is subscribed
this.kafka.subscribeToResponseOf("orders.get");
}
async emitCreated(order: unknown) {
await firstValueFrom(this.kafka.emit("orders.created", order));
}
async getOrder(id: string): Promise<Order> {
return firstValueFrom(this.kafka.send("orders.get", { id }));
}
}
Configuration
KafkaOptions
| Field | Type | Default | Description |
|---|---|---|---|
| brokers | string[] | — | Broker addresses, e.g. ["kafka:9092"] |
| clientId | string | — | Client identifier (postfix appended automatically) |
| groupId | string | — | Consumer group ID (postfix appended automatically) |
| postfixId | string | -server / -client | Suffix appended to clientId and groupId |
| ssl | boolean | — | Enable TLS. Auto-detected when rdKafka has ssl* fields |
| sasl | KafkaJS.SASLOptions | — | SASL authentication (plain, scram, oauthbearer, etc.) |
| consumer | Partial<KafkaJS.ConsumerConfig> | — | KafkaJS-level consumer options |
| consumerRdKafka | KafkaConsumerRdKafkaConfig | — | librdkafka consumer options (camelCase) |
| producer | Partial<KafkaJS.ProducerConfig> | — | KafkaJS-level producer options |
| producerRdKafka | KafkaProducerRdKafkaConfig | — | librdkafka producer options (camelCase) |
| rdKafka | KafkaRdKafkaConfig | — | librdkafka global options — SSL certs, socket tuning, etc. |
| producerOnlyMode | boolean | false | Skip consumer creation |
| maxRetries | number | Infinity | Max handler retries before sending to DLQ |
| deadLetterTopic | string | — | Topic for failed messages after max retries |
| shutdownTimeoutMs | number | — | Graceful shutdown timeout in ms |
Async registration
Use registerAsync when options come from a config service.
KafkaClientsModule.registerAsync([
{
name: KAFKA_CLIENT,
imports: [ConfigModule],
useFactory: (config: ConfigService) => ({
brokers: config.getOrThrow<string>("KAFKA_BROKERS").split(","),
clientId: "my-service",
groupId: "my-service",
}),
inject: [ConfigService],
},
])
Reading brokers from environment
export function getKafkaBrokers(env: NodeJS.ProcessEnv): string[] {
const list = env.KAFKA_BROKERS ?? env.KAFKA_BOOTSTRAP_SERVERS;
if (list) return list.split(",").map((s) => s.trim()).filter(Boolean);
return [env.KAFKA_BROKER ?? "localhost:9092"];
}
SSL / TLS
Pass certificate paths or PEM strings via rdKafka. SSL is auto-enabled when any ssl* field is present — no need to set ssl: true explicitly.
rdKafka: {
sslCaLocation: "/etc/certs/ca.crt",
sslCertificateLocation: "/etc/certs/client.crt",
sslKeyLocation: "/etc/certs/client.key",
}
Or as inline PEM strings:
rdKafka: {
sslCaPem: fs.readFileSync("/etc/certs/ca.crt", "utf8"),
sslCertificatePem: fs.readFileSync("/etc/certs/client.crt", "utf8"),
sslKeyPem: fs.readFileSync("/etc/certs/client.key", "utf8"),
}
To disable hostname verification (not recommended for production):
rdKafka: {
sslCaLocation: "/etc/certs/ca.crt",
sslEndpointIdentificationAlgorithm: "none",
}
All SSL fields in rdKafka
| Field | rdkafka property |
|---|---|
| sslCaLocation | ssl.ca.location |
| sslCaPem | ssl.ca.pem |
| sslCertificateLocation | ssl.certificate.location |
| sslCertificatePem | ssl.certificate.pem |
| sslKeyLocation | ssl.key.location |
| sslKeyPem | ssl.key.pem |
| sslKeyPassword | ssl.key.password |
| sslCipherSuites | ssl.cipher.suites |
| sslCurvesList | ssl.curves.list |
| sslCrlLocation | ssl.crl.location |
| sslKeystoreLocation | ssl.keystore.location |
| sslKeystorePassword | ssl.keystore.password |
| enableSslCertificateVerification | enable.ssl.certificate.verification |
| sslEndpointIdentificationAlgorithm | ssl.endpoint.identification.algorithm |
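When certificate paths come from the environment, a small helper can translate them into the rdKafka fields above. A minimal sketch — the env variable names (KAFKA_SSL_CA_LOCATION, etc.) are illustrative and not read by the package itself:

```typescript
// Map hypothetical env vars onto the rdKafka SSL fields listed above.
interface SslRdKafkaConfig {
  sslCaLocation?: string;
  sslCertificateLocation?: string;
  sslKeyLocation?: string;
  sslKeyPassword?: string;
}

export function sslFromEnv(env: NodeJS.ProcessEnv): SslRdKafkaConfig {
  const config: SslRdKafkaConfig = {};
  if (env.KAFKA_SSL_CA_LOCATION) config.sslCaLocation = env.KAFKA_SSL_CA_LOCATION;
  if (env.KAFKA_SSL_CERT_LOCATION) config.sslCertificateLocation = env.KAFKA_SSL_CERT_LOCATION;
  if (env.KAFKA_SSL_KEY_LOCATION) config.sslKeyLocation = env.KAFKA_SSL_KEY_LOCATION;
  if (env.KAFKA_SSL_KEY_PASSWORD) config.sslKeyPassword = env.KAFKA_SSL_KEY_PASSWORD;
  return config;
}
```

Because SSL is auto-detected from the presence of ssl* fields, spreading the result into rdKafka is enough — no explicit ssl: true needed.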
SASL
PLAIN / SCRAM
sasl: {
mechanism: "plain", // or "scram-sha-256", "scram-sha-512"
username: "user",
password: "secret",
},
ssl: true,
OAuth 2.0 / OIDC (client credentials)
rdKafka: {
securityProtocol: "sasl_ssl",
saslOauthbearerMethod: "oidc",
saslOauthbearerClientId: "my-client-id",
saslOauthbearerClientSecret: "my-client-secret",
saslOauthbearerTokenEndpointUrl: "https://auth.example.com/token",
saslOauthbearerScope: "kafka",
}
Kerberos (GSSAPI)
sasl: { mechanism: "gssapi" },
rdKafka: {
saslKerberosServiceName: "kafka",
saslKerberosPrincipal: "kafkaclient",
saslKerberosKeytab: "/etc/krb5/client.keytab",
}
Consuming messages
Decorators
| Decorator | Returns | Description |
|---|---|---|
| @KafkaKey() | string \| null | Message key |
| @KafkaTopic() | string | Topic name |
| @KafkaPartition() | number | Partition number |
| @KafkaMessageHeaders() | Map<string, string> | All headers |
| @KafkaHeader(name) | string \| undefined | Single header by name |
| @KafkaAck() | () => Promise<void> | Manually commit offset |
| @KafkaNack() | (delayMs?: number) => void | Nack with optional retry delay |
Auto-commit (simple handlers)
When the handler returns without calling nack(), the offset is committed automatically.
@EventPattern("payments.processed")
async handlePayment(@Payload() payment: PaymentDto) {
await this.paymentsService.record(payment);
// offset committed automatically on return
}
Manual ack/nack
Inject @KafkaAck() and @KafkaNack() to take full control.
@EventPattern("orders.created")
async handleOrder(
@Payload() order: OrderDto,
@KafkaAck() ack: () => Promise<void>,
@KafkaNack() nack: (delayMs?: number) => void,
) {
try {
await this.ordersService.process(order);
await ack();
} catch (err) {
if (isRetryable(err)) {
nack(3000); // retry in 3s
} else {
await ack(); // skip poison message
}
}
}
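Since nack(delayMs) takes the retry delay directly, backoff policy lives in the handler. A minimal exponential-backoff helper (illustrative, not part of the package; the attempt count would come from your own tracking or a header):

```typescript
// Exponential backoff with a ceiling: 1s, 2s, 4s, ... capped at maxMs.
export function backoffDelayMs(attempt: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** Math.max(0, attempt - 1));
}

// e.g. nack(backoffDelayMs(failureCount));
```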
KafkaContext
Inject the full context via @Ctx() when you need everything at once.
import { Ctx } from "@nestjs/microservices";
import { KafkaContext } from "@globalart/nestjs-kafka";
@EventPattern("events.raw")
async handleRaw(@Payload() data: unknown, @Ctx() ctx: KafkaContext) {
const topic = ctx.getTopic();
const partition = ctx.getPartition();
const msg = ctx.getMessage(); // KafkaJS.KafkaMessage (key already string)
const headers = ctx.getHeaders(); // Map<string, string>
const traceId = headers.get("x-trace-id");
await this.process(data);
await ctx.commit();
}
| Method | Returns |
|---|---|
| getMessage() | KafkaJS.KafkaMessage (key normalised to string) |
| getTopic() | string |
| getPartition() | number |
| getHeaders() | Map<string, string> |
| commit() | Promise<void> |
| nack(delayMs?) | void |
RPC handlers
@MessagePattern("users.findById")
async findUser(@Payload() { id }: { id: string }): Promise<UserDto> {
return this.usersService.findById(id);
}
The return value is serialised as JSON and sent to the caller's reply topic.
Producing messages
Fire-and-forget
await firstValueFrom(this.kafka.emit("orders.created", order));
Batch emit
await firstValueFrom(
this.kafka.emitBatch("orders.created", {
messages: [order1, order2, order3],
}),
);
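When a producer buffers thousands of records, you may want to cap the size of each emitBatch call. A generic chunking helper (a sketch, not part of the package):

```typescript
// Split an array into consecutive chunks of at most `size` elements.
export function chunk<T>(items: readonly T[], size: number): T[][] {
  if (size < 1) throw new RangeError("size must be >= 1");
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}
```

Each chunk can then be passed as the messages array of a separate emitBatch call.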
RPC (request–reply)
Call subscribeToResponseOf in onModuleInit before the client connects.
onModuleInit() {
this.kafka.subscribeToResponseOf("users.findById");
}
async findUser(id: string): Promise<UserDto> {
return firstValueFrom(this.kafka.send("users.findById", { id }));
}
Access to the underlying producer
const [, producer] = this.kafka.unwrap<[KafkaJS.Consumer, KafkaJS.Producer]>();
await producer.send({
topic: "orders.created",
messages: [{ key: order.id, value: JSON.stringify(order) }],
});
Dead-letter queue (DLQ)
Set maxRetries and deadLetterTopic in KafkaOptions. After maxRetries failed attempts the message is forwarded to the DLQ and the original offset is committed.
createKafkaMicroservice({
options: {
brokers: ["kafka:9092"],
clientId: "my-service",
groupId: "my-service",
maxRetries: 5,
deadLetterTopic: "my-service.dlq",
},
})
The DLQ message carries additional headers:
| Header | Value |
|---|---|
| x-original-topic | Source topic |
| x-original-partition | Source partition |
| x-original-offset | Source offset |
| x-failure-count | Number of attempts |
| x-error | Error message |
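In a DLQ consumer those headers arrive as a Map<string, string> (e.g. via @KafkaMessageHeaders()); a small helper can lift them into a typed object. A sketch — the helper and its field names are illustrative, not part of the package:

```typescript
interface DlqMetadata {
  originalTopic?: string;
  originalPartition?: number;
  originalOffset?: string;
  failureCount?: number;
  error?: string;
}

// Parse the x-* headers the DLQ forwarder attaches to each failed message.
export function parseDlqHeaders(headers: Map<string, string>): DlqMetadata {
  const int = (v?: string) => (v === undefined ? undefined : Number.parseInt(v, 10));
  return {
    originalTopic: headers.get("x-original-topic"),
    originalPartition: int(headers.get("x-original-partition")),
    originalOffset: headers.get("x-original-offset"),
    failureCount: int(headers.get("x-failure-count")),
    error: headers.get("x-error"),
  };
}
```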
Advanced rdkafka tuning
All librdkafka config properties are available in camelCase via rdKafka, consumerRdKafka, and producerRdKafka.
rdKafka: {
// Networking
socketTimeoutMs: 30000,
socketKeepaliveEnable: true,
reconnectBackoffMs: 200,
reconnectBackoffMaxMs: 10000,
// Message limits
messageMaxBytes: 10485760, // 10 MB
receiveMessageMaxBytes: 104857600, // 100 MB
// Observability
debug: "broker,topic",
statisticsIntervalMs: 30000,
// Misc
allowAutoCreateTopics: false,
clientRack: "az-eu-west-1a",
},
consumerRdKafka: {
groupInstanceId: "worker-1", // static membership
maxPollIntervalMs: 300000,
sessionTimeoutMs: 30000,
heartbeatIntervalMs: 3000,
fetchMaxBytes: 52428800, // 50 MB
autoOffsetReset: "earliest",
isolationLevel: "read_committed",
},
producerRdKafka: {
enableIdempotence: true,
lingerMs: 5,
compressionCodec: "lz4",
batchSize: 1000000,
messageSendMaxRetries: 5,
}
rdKafka — global options
| Field | rdkafka property | Description |
|---|---|---|
| socketTimeoutMs | socket.timeout.ms | Network request timeout |
| socketKeepaliveEnable | socket.keepalive.enable | TCP keep-alives |
| reconnectBackoffMs | reconnect.backoff.ms | Initial reconnect backoff |
| reconnectBackoffMaxMs | reconnect.backoff.max.ms | Max reconnect backoff |
| messageMaxBytes | message.max.bytes | Max message size |
| receiveMessageMaxBytes | receive.message.max.bytes | Max response size |
| metadataMaxAgeMs | metadata.max.age.ms | Metadata cache TTL |
| topicMetadataRefreshIntervalMs | topic.metadata.refresh.interval.ms | Metadata refresh interval |
| debug | debug | Debug contexts (broker,topic,msg,cgrp,...) |
| statisticsIntervalMs | statistics.interval.ms | Stats emit interval |
| enableMetricsPush | enable.metrics.push | Push client metrics to broker |
| clientRack | client.rack | Rack ID for locality-aware assignment |
| allowAutoCreateTopics | allow.auto.create.topics | Auto-create topics on subscribe |
| securityProtocol | security.protocol | plaintext \| ssl \| sasl_plaintext \| sasl_ssl |
consumerRdKafka — consumer options
| Field | rdkafka property | Description |
|---|---|---|
| groupInstanceId | group.instance.id | Static membership (requires broker ≥ 2.3) |
| sessionTimeoutMs | session.timeout.ms | Session timeout |
| heartbeatIntervalMs | heartbeat.interval.ms | Heartbeat interval |
| maxPollIntervalMs | max.poll.interval.ms | Max time between polls |
| autoOffsetReset | auto.offset.reset | earliest \| latest \| error |
| fetchMaxBytes | fetch.max.bytes | Max bytes per fetch |
| fetchMinBytes | fetch.min.bytes | Min bytes per fetch |
| fetchWaitMaxMs | fetch.wait.max.ms | Max fetch wait |
| fetchMessageMaxBytes | fetch.message.max.bytes | Max bytes per partition |
| isolationLevel | isolation.level | read_committed \| read_uncommitted |
| partitionAssignmentStrategy | partition.assignment.strategy | Partition assignor |
| groupProtocol | group.protocol | classic \| consumer |
producerRdKafka — producer options
| Field | rdkafka property | Description |
|---|---|---|
| enableIdempotence | enable.idempotence | Idempotent producer |
| transactionalId | transactional.id | Enable transactions |
| transactionTimeoutMs | transaction.timeout.ms | Transaction timeout |
| lingerMs | linger.ms | Batch accumulation time |
| batchSize | batch.size | Max batch size in bytes |
| batchNumMessages | batch.num.messages | Max messages per batch |
| compressionCodec | compression.codec | none \| gzip \| snappy \| lz4 \| zstd |
| messageSendMaxRetries | message.send.max.retries | Delivery retry count |
| messageTimeoutMs | message.timeout.ms | Max time to deliver a message |
| requestTimeoutMs | request.timeout.ms | Broker request timeout |
| requestRequiredAcks | request.required.acks | 0 \| 1 \| -1 (all) |
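The naming in these tables is mechanical: a camelCase field splits into librdkafka's dotted notation at each case boundary. A converter sketch for illustration (the package ships its own mapping; this is just the rule made explicit):

```typescript
// socketTimeoutMs -> socket.timeout.ms
// enableSslCertificateVerification -> enable.ssl.certificate.verification
export function toRdKafkaProperty(camel: string): string {
  return camel.replace(/([a-z0-9])([A-Z])/g, "$1.$2").toLowerCase();
}
```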
Health checks
KafkaHealthIndicator integrates with @nestjs/terminus.
import { HealthCheckService, HealthCheck } from "@nestjs/terminus";
import { KafkaHealthIndicator, KafkaStrategy } from "@globalart/nestjs-kafka";
@Controller("health")
export class HealthController {
constructor(
private readonly health: HealthCheckService,
private readonly kafkaHealth: KafkaHealthIndicator,
private readonly strategy: KafkaStrategy, // or inject KafkaClient
) {}
@Get()
@HealthCheck()
check() {
return this.health.check([
() => this.kafkaHealth.isHealthy("kafka", this.strategy),
]);
}
}
Both KafkaStrategy and KafkaClient expose getStatus(): KafkaStatus and satisfy the KafkaHealthCheckable interface.
enum KafkaStatus {
CONNECTED = "connected",
DISCONNECTED = "disconnected",
FAILED = "failed",
}
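Outside Terminus, a plain readiness predicate can sit on top of getStatus(). A sketch — isReady is not exported by the package, and the enum is restated here only to keep the snippet self-contained (in an app you would import KafkaStatus from @globalart/nestjs-kafka):

```typescript
// Mirrors the KafkaStatus enum above; only "connected" counts as ready.
enum KafkaStatus {
  CONNECTED = "connected",
  DISCONNECTED = "disconnected",
  FAILED = "failed",
}

export function isReady(status: KafkaStatus): boolean {
  return status === KafkaStatus.CONNECTED;
}

// e.g. in a readiness probe: isReady(this.strategy.getStatus())
```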