
@globalart/nestjs-kafka

NestJS microservice transport and client for Apache Kafka, built on top of @confluentinc/kafka-javascript (librdkafka).

Prerequisites

This package uses @confluentinc/kafka-javascript, which requires librdkafka to be installed on the system.

Ubuntu / Debian

```shell
sudo mkdir -p /etc/apt/keyrings
wget -qO - https://packages.confluent.io/deb/7.8/archive.key | gpg --dearmor | sudo tee /etc/apt/keyrings/confluent.gpg > /dev/null
echo "deb [signed-by=/etc/apt/keyrings/confluent.gpg] https://packages.confluent.io/deb/7.8 stable main" | sudo tee /etc/apt/sources.list.d/confluent.list
sudo apt-get update
sudo apt-get install -y librdkafka-dev
```

Installation

```shell
npm install @globalart/nestjs-kafka @confluentinc/kafka-javascript
```

Overview

The package provides two complementary pieces:

| Piece | Purpose |
| --- | --- |
| Transport (KafkaStrategy) | Consume messages; wired up via createKafkaMicroservice |
| Client (KafkaClient) | Produce messages and make RPC calls; injected via KafkaClientsModule |

Both share the same KafkaOptions configuration object.


Quick start

1. Connect the transport

```typescript
// main.ts
import { NestFactory } from "@nestjs/core";
import { createKafkaMicroservice } from "@globalart/nestjs-kafka";
import { AppModule } from "./app.module";

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.enableShutdownHooks();

  app.connectMicroservice(
    createKafkaMicroservice({
      options: {
        brokers: ["kafka:9092"],
        clientId: "my-service",
        groupId: "my-service",
        maxRetries: 3,
        deadLetterTopic: "my-service.dlq",
        shutdownTimeoutMs: 5000,
      },
    }),
  );

  await app.startAllMicroservices();
  await app.listen(3000);
}

void bootstrap();
```

2. Register a client

```typescript
// app.module.ts
import { Module } from "@nestjs/common";
import { KafkaClientsModule } from "@globalart/nestjs-kafka";

export const KAFKA_CLIENT = "KAFKA_CLIENT";

@Module({
  imports: [
    KafkaClientsModule.register([
      {
        name: KAFKA_CLIENT,
        options: {
          brokers: ["kafka:9092"],
          clientId: "my-service",
          groupId: "my-service",
        },
      },
    ]),
  ],
})
export class AppModule {}
```

3. Handle events

```typescript
// events.controller.ts
import { Controller } from "@nestjs/common";
import { EventPattern, Payload } from "@nestjs/microservices";
import { KafkaAck, KafkaKey, KafkaTopic } from "@globalart/nestjs-kafka";

@Controller()
export class EventsController {
  @EventPattern("orders.created")
  async handleOrder(
    @Payload() order: CreateOrderDto,
    @KafkaKey() key: string,
    @KafkaTopic() topic: string,
    @KafkaAck() ack: () => Promise<void>,
  ) {
    await processOrder(order);
    await ack();
  }
}
```

4. Emit and RPC

```typescript
// app.service.ts
import { Injectable, Inject, OnModuleInit } from "@nestjs/common";
import { firstValueFrom } from "rxjs";
import { KafkaClient } from "@globalart/nestjs-kafka";
import { KAFKA_CLIENT } from "./app.module";

@Injectable()
export class AppService implements OnModuleInit {
  constructor(@Inject(KAFKA_CLIENT) private readonly kafka: KafkaClient) {}

  onModuleInit() {
    // Must be called before the client connects to subscribe to the reply topic
    this.kafka.subscribeToResponseOf("orders.get");
  }

  async emitCreated(order: unknown) {
    await firstValueFrom(this.kafka.emit("orders.created", order));
  }

  async getOrder(id: string): Promise<Order> {
    return firstValueFrom(this.kafka.send("orders.get", { id }));
  }
}
```

Configuration

KafkaOptions

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| brokers | string[] | | Broker addresses, e.g. ["kafka:9092"] |
| clientId | string | | Client identifier (postfix appended automatically) |
| groupId | string | | Consumer group ID (postfix appended automatically) |
| postfixId | string | -server / -client | Suffix appended to clientId and groupId |
| ssl | boolean | | Enable TLS. Auto-detected when rdKafka has ssl* fields |
| sasl | KafkaJS.SASLOptions | | SASL authentication (plain, scram, oauthbearer, etc.) |
| consumer | Partial<KafkaJS.ConsumerConfig> | | KafkaJS-level consumer options |
| consumerRdKafka | KafkaConsumerRdKafkaConfig | | librdkafka consumer options (camelCase) |
| producer | Partial<KafkaJS.ProducerConfig> | | KafkaJS-level producer options |
| producerRdKafka | KafkaProducerRdKafkaConfig | | librdkafka producer options (camelCase) |
| rdKafka | KafkaRdKafkaConfig | | librdkafka global options (SSL certs, socket tuning, etc.) |
| producerOnlyMode | boolean | false | Skip consumer creation |
| maxRetries | number | Infinity | Max handler retries before sending to DLQ |
| deadLetterTopic | string | | Topic for failed messages after max retries |
| shutdownTimeoutMs | number | | Graceful shutdown timeout in ms |
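The postfix behavior is worth spelling out: with the default postfixId, the transport side gets -server appended and the client side gets -client, so a service that runs both ends up with distinct effective IDs. A minimal sketch of the documented behavior (applyPostfix is an illustrative helper, not part of the package's API):

```typescript
// Illustrative helper (not part of the package) showing how the
// documented postfix is appended to clientId and groupId.
function applyPostfix(base: string, postfix: string): string {
  return `${base}${postfix}`;
}

applyPostfix("my-service", "-server"); // "my-service-server"
applyPostfix("my-service", "-client"); // "my-service-client"
```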

Async registration

Use registerAsync when options come from a config service.

```typescript
KafkaClientsModule.registerAsync([
  {
    name: KAFKA_CLIENT,
    imports: [ConfigModule],
    useFactory: (config: ConfigService) => ({
      brokers: config.getOrThrow<string>("KAFKA_BROKERS").split(","),
      clientId: "my-service",
      groupId: "my-service",
    }),
    inject: [ConfigService],
  },
])
```

Reading brokers from environment

```typescript
export function getKafkaBrokers(env: NodeJS.ProcessEnv): string[] {
  const list = env.KAFKA_BROKERS ?? env.KAFKA_BOOTSTRAP_SERVERS;
  if (list) return list.split(",").map((s) => s.trim()).filter(Boolean);
  return [env.KAFKA_BROKER ?? "localhost:9092"];
}
```
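For instance, comma-separated lists are split and trimmed, and the localhost fallback applies when nothing is set (the helper is restated inline so the snippet stands alone; Record<string, string | undefined> stands in for NodeJS.ProcessEnv):

```typescript
// getKafkaBrokers restated from above so this snippet is self-contained.
function getKafkaBrokers(env: Record<string, string | undefined>): string[] {
  const list = env.KAFKA_BROKERS ?? env.KAFKA_BOOTSTRAP_SERVERS;
  if (list) return list.split(",").map((s) => s.trim()).filter(Boolean);
  return [env.KAFKA_BROKER ?? "localhost:9092"];
}

getKafkaBrokers({ KAFKA_BROKERS: "kafka-1:9092, kafka-2:9092" });
// → ["kafka-1:9092", "kafka-2:9092"]
getKafkaBrokers({}); // → ["localhost:9092"]
```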

SSL / TLS

Pass certificate paths or PEM strings via rdKafka. SSL is auto-enabled when any ssl* field is present — no need to set ssl: true explicitly.

```typescript
rdKafka: {
  sslCaLocation: "/etc/certs/ca.crt",
  sslCertificateLocation: "/etc/certs/client.crt",
  sslKeyLocation: "/etc/certs/client.key",
}
```

Or as inline PEM strings:

```typescript
rdKafka: {
  sslCaPem: fs.readFileSync("/etc/certs/ca.crt", "utf8"),
  sslCertificatePem: fs.readFileSync("/etc/certs/client.crt", "utf8"),
  sslKeyPem: fs.readFileSync("/etc/certs/client.key", "utf8"),
}
```

To disable hostname verification (not recommended for production):

```typescript
rdKafka: {
  sslCaLocation: "/etc/certs/ca.crt",
  sslEndpointIdentificationAlgorithm: "none",
}
```

All SSL fields in rdKafka

| Field | rdkafka property |
| --- | --- |
| sslCaLocation | ssl.ca.location |
| sslCaPem | ssl.ca.pem |
| sslCertificateLocation | ssl.certificate.location |
| sslCertificatePem | ssl.certificate.pem |
| sslKeyLocation | ssl.key.location |
| sslKeyPem | ssl.key.pem |
| sslKeyPassword | ssl.key.password |
| sslCipherSuites | ssl.cipher.suites |
| sslCurvesList | ssl.curves.list |
| sslCrlLocation | ssl.crl.location |
| sslKeystoreLocation | ssl.keystore.location |
| sslKeystorePassword | ssl.keystore.password |
| enableSslCertificateVerification | enable.ssl.certificate.verification |
| sslEndpointIdentificationAlgorithm | ssl.endpoint.identification.algorithm |

SASL

PLAIN / SCRAM

```typescript
sasl: {
  mechanism: "plain", // or "scram-sha-256", "scram-sha-512"
  username: "user",
  password: "secret",
},
ssl: true,
```

OAuth 2.0 / OIDC (client credentials)

```typescript
rdKafka: {
  securityProtocol: "sasl_ssl",
  saslOauthbearerMethod: "oidc",
  saslOauthbearerClientId: "my-client-id",
  saslOauthbearerClientSecret: "my-client-secret",
  saslOauthbearerTokenEndpointUrl: "https://auth.example.com/token",
  saslOauthbearerScope: "kafka",
}
```

Kerberos (GSSAPI)

```typescript
sasl: { mechanism: "gssapi" },
rdKafka: {
  saslKerberosServiceName: "kafka",
  saslKerberosPrincipal: "kafkaclient",
  saslKerberosKeytab: "/etc/krb5/client.keytab",
}
```

Consuming messages

Decorators

| Decorator | Returns | Description |
| --- | --- | --- |
| @KafkaKey() | string \| null | Message key |
| @KafkaTopic() | string | Topic name |
| @KafkaPartition() | number | Partition number |
| @KafkaMessageHeaders() | Map<string, string> | All headers |
| @KafkaHeader(name) | string \| undefined | Single header by name |
| @KafkaAck() | () => Promise<void> | Manually commit offset |
| @KafkaNack() | (delayMs?: number) => void | Nack with optional retry delay |

Auto-commit (simple handlers)

When the handler returns without calling nack(), the offset is committed automatically.

```typescript
@EventPattern("payments.processed")
async handlePayment(@Payload() payment: PaymentDto) {
  await this.paymentsService.record(payment);
  // offset committed automatically on return
}
```

Manual ack/nack

Inject @KafkaAck() and @KafkaNack() to take full control.

```typescript
@EventPattern("orders.created")
async handleOrder(
  @Payload() order: OrderDto,
  @KafkaAck() ack: () => Promise<void>,
  @KafkaNack() nack: (delayMs?: number) => void,
) {
  try {
    await this.ordersService.process(order);
    await ack();
  } catch (err) {
    if (isRetryable(err)) {
      nack(3000); // retry in 3s
    } else {
      await ack(); // skip poison message
    }
  }
}
```
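Since nack() accepts a delay, callers can escalate the wait between attempts instead of retrying on a fixed schedule. One way to pick the delay (computeBackoff is an illustrative helper, not part of the package; the attempt counter is assumed to be tracked by the application):

```typescript
// Illustrative helper: exponential backoff with a cap, usable as the
// delayMs argument to nack(). attempt starts at 0.
function computeBackoff(attempt: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

computeBackoff(0); // 1000
computeBackoff(3); // 8000
computeBackoff(10); // capped at 30000
```

In a handler like the one above, nack(computeBackoff(attempt)) would replace the fixed nack(3000).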

KafkaContext

Inject the full context via @Ctx() when you need everything at once.

```typescript
import { Ctx } from "@nestjs/microservices";
import { KafkaContext } from "@globalart/nestjs-kafka";

@EventPattern("events.raw")
async handleRaw(@Payload() data: unknown, @Ctx() ctx: KafkaContext) {
  const topic = ctx.getTopic();
  const partition = ctx.getPartition();
  const msg = ctx.getMessage(); // KafkaJS.KafkaMessage (key already string)
  const headers = ctx.getHeaders(); // Map<string, string>
  const traceId = headers.get("x-trace-id");

  await this.process(data);
  await ctx.commit();
}
```

| Method | Returns |
| --- | --- |
| getMessage() | KafkaJS.KafkaMessage (key normalised to string) |
| getTopic() | string |
| getPartition() | number |
| getHeaders() | Map<string, string> |
| commit() | Promise<void> |
| nack(delayMs?) | void |

RPC handlers

```typescript
@MessagePattern("users.findById")
async findUser(@Payload() { id }: { id: string }): Promise<UserDto> {
  return this.usersService.findById(id);
}
```

The return value is serialised as JSON and sent to the caller's reply topic.
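A practical consequence is that the return value must survive a JSON round-trip: fields JSON cannot represent are transformed or dropped. The snippet below (plain TypeScript, independent of the package) shows the two most common surprises:

```typescript
// What a JSON round-trip does to an RPC reply: Date instances become
// ISO strings, and properties holding undefined are dropped entirely.
const reply = { id: "42", createdAt: new Date(0), note: undefined };
const received = JSON.parse(JSON.stringify(reply));

received.createdAt; // "1970-01-01T00:00:00.000Z" (a string, not a Date)
"note" in received; // false
```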


Producing messages

Fire-and-forget

```typescript
await firstValueFrom(this.kafka.emit("orders.created", order));
```

Batch emit

```typescript
await firstValueFrom(
  this.kafka.emitBatch("orders.created", {
    messages: [order1, order2, order3],
  }),
);
```

RPC (request–reply)

Call subscribeToResponseOf in onModuleInit before the client connects.

```typescript
onModuleInit() {
  this.kafka.subscribeToResponseOf("users.findById");
}

async findUser(id: string): Promise<UserDto> {
  return firstValueFrom(this.kafka.send("users.findById", { id }));
}
```

Access to the underlying producer

```typescript
const [, producer] = this.kafka.unwrap<[KafkaJS.Consumer, KafkaJS.Producer]>();

await producer.send({
  topic: "orders.created",
  messages: [{ key: order.id, value: JSON.stringify(order) }],
});
```

Dead-letter queue (DLQ)

Set maxRetries and deadLetterTopic in KafkaOptions. After maxRetries failed attempts the message is forwarded to the DLQ and the original offset is committed.

```typescript
createKafkaMicroservice({
  options: {
    brokers: ["kafka:9092"],
    clientId: "my-service",
    groupId: "my-service",
    maxRetries: 5,
    deadLetterTopic: "my-service.dlq",
  },
})
```

The DLQ message carries additional headers:

| Header | Value |
| --- | --- |
| x-original-topic | Source topic |
| x-original-partition | Source partition |
| x-original-offset | Source offset |
| x-failure-count | Number of attempts |
| x-error | Error message |
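A DLQ consumer will usually want these headers as a typed value. A sketch, assuming the Map<string, string> shape returned by getHeaders() (parseDlqHeaders and DlqInfo are illustrative, not exported by the package):

```typescript
interface DlqInfo {
  originalTopic?: string;
  originalPartition?: number;
  originalOffset?: string;
  failureCount?: number;
  error?: string;
}

// Illustrative helper: lift the documented DLQ headers out of the
// Map<string, string> exposed by @KafkaMessageHeaders() / getHeaders().
function parseDlqHeaders(headers: Map<string, string>): DlqInfo {
  const partition = headers.get("x-original-partition");
  const failures = headers.get("x-failure-count");
  return {
    originalTopic: headers.get("x-original-topic"),
    originalPartition: partition === undefined ? undefined : Number(partition),
    originalOffset: headers.get("x-original-offset"),
    failureCount: failures === undefined ? undefined : Number(failures),
    error: headers.get("x-error"),
  };
}

const info = parseDlqHeaders(
  new Map([
    ["x-original-topic", "orders.created"],
    ["x-original-partition", "2"],
    ["x-failure-count", "5"],
  ]),
);
// info.originalTopic === "orders.created", info.failureCount === 5
```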

Advanced rdkafka tuning

All librdkafka config properties are available in camelCase via rdKafka, consumerRdKafka, and producerRdKafka.

```typescript
rdKafka: {
  // Networking
  socketTimeoutMs: 30000,
  socketKeepaliveEnable: true,
  reconnectBackoffMs: 200,
  reconnectBackoffMaxMs: 10000,
  // Message limits
  messageMaxBytes: 10485760, // 10 MB
  receiveMessageMaxBytes: 104857600, // 100 MB
  // Observability
  debug: "broker,topic",
  statisticsIntervalMs: 30000,
  // Misc
  allowAutoCreateTopics: false,
  clientRack: "az-eu-west-1a",
},
consumerRdKafka: {
  groupInstanceId: "worker-1", // static membership
  maxPollIntervalMs: 300000,
  sessionTimeoutMs: 30000,
  heartbeatIntervalMs: 3000,
  fetchMaxBytes: 52428800, // 50 MB
  autoOffsetReset: "earliest",
  isolationLevel: "read_committed",
},
producerRdKafka: {
  enableIdempotence: true,
  lingerMs: 5,
  compressionCodec: "lz4",
  batchSize: 1000000,
  messageSendMaxRetries: 5,
}
```
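The camelCase names translate mechanically into librdkafka's dotted property names listed in the tables below: a dot is inserted before each capital letter and the letter is lowercased. An illustrative sketch of that convention (not the package's actual implementation):

```typescript
// Sketch of the camelCase -> dotted librdkafka name convention:
// insert a dot before each uppercase letter, then lowercase it.
function camelToDotted(field: string): string {
  return field.replace(/[A-Z]/g, (ch) => `.${ch.toLowerCase()}`);
}

camelToDotted("socketTimeoutMs"); // "socket.timeout.ms"
camelToDotted("sslCaLocation"); // "ssl.ca.location"
camelToDotted("enableIdempotence"); // "enable.idempotence"
```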

rdKafka — global options

| Field | rdkafka property | Description |
| --- | --- | --- |
| socketTimeoutMs | socket.timeout.ms | Network request timeout |
| socketKeepaliveEnable | socket.keepalive.enable | TCP keep-alives |
| reconnectBackoffMs | reconnect.backoff.ms | Initial reconnect backoff |
| reconnectBackoffMaxMs | reconnect.backoff.max.ms | Max reconnect backoff |
| messageMaxBytes | message.max.bytes | Max message size |
| receiveMessageMaxBytes | receive.message.max.bytes | Max response size |
| metadataMaxAgeMs | metadata.max.age.ms | Metadata cache TTL |
| topicMetadataRefreshIntervalMs | topic.metadata.refresh.interval.ms | Metadata refresh interval |
| debug | debug | Debug contexts (broker,topic,msg,cgrp,...) |
| statisticsIntervalMs | statistics.interval.ms | Stats emit interval |
| enableMetricsPush | enable.metrics.push | Push client metrics to broker |
| clientRack | client.rack | Rack ID for locality-aware assignment |
| allowAutoCreateTopics | allow.auto.create.topics | Auto-create topics on subscribe |
| securityProtocol | security.protocol | plaintext \| ssl \| sasl_plaintext \| sasl_ssl |

consumerRdKafka — consumer options

| Field | rdkafka property | Description |
| --- | --- | --- |
| groupInstanceId | group.instance.id | Static membership (requires broker ≥ 2.3) |
| sessionTimeoutMs | session.timeout.ms | Session timeout |
| heartbeatIntervalMs | heartbeat.interval.ms | Heartbeat interval |
| maxPollIntervalMs | max.poll.interval.ms | Max time between polls |
| autoOffsetReset | auto.offset.reset | earliest \| latest \| error |
| fetchMaxBytes | fetch.max.bytes | Max bytes per fetch |
| fetchMinBytes | fetch.min.bytes | Min bytes per fetch |
| fetchWaitMaxMs | fetch.wait.max.ms | Max fetch wait |
| fetchMessageMaxBytes | fetch.message.max.bytes | Max bytes per partition |
| isolationLevel | isolation.level | read_committed \| read_uncommitted |
| partitionAssignmentStrategy | partition.assignment.strategy | Partition assignor |
| groupProtocol | group.protocol | classic \| consumer |

producerRdKafka — producer options

| Field | rdkafka property | Description |
| --- | --- | --- |
| enableIdempotence | enable.idempotence | Idempotent producer |
| transactionalId | transactional.id | Enable transactions |
| transactionTimeoutMs | transaction.timeout.ms | Transaction timeout |
| lingerMs | linger.ms | Batch accumulation time |
| batchSize | batch.size | Max batch size in bytes |
| batchNumMessages | batch.num.messages | Max messages per batch |
| compressionCodec | compression.codec | none \| gzip \| snappy \| lz4 \| zstd |
| messageSendMaxRetries | message.send.max.retries | Delivery retry count |
| messageTimeoutMs | message.timeout.ms | Max time to deliver a message |
| requestTimeoutMs | request.timeout.ms | Broker request timeout |
| requestRequiredAcks | request.required.acks | 0 \| 1 \| -1 (all) |

Health checks

KafkaHealthIndicator integrates with @nestjs/terminus.

```typescript
import { Controller, Get } from "@nestjs/common";
import { HealthCheckService, HealthCheck } from "@nestjs/terminus";
import { KafkaHealthIndicator, KafkaStrategy } from "@globalart/nestjs-kafka";

@Controller("health")
export class HealthController {
  constructor(
    private readonly health: HealthCheckService,
    private readonly kafkaHealth: KafkaHealthIndicator,
    private readonly strategy: KafkaStrategy, // or inject KafkaClient
  ) {}

  @Get()
  @HealthCheck()
  check() {
    return this.health.check([
      () => this.kafkaHealth.isHealthy("kafka", this.strategy),
    ]);
  }
}
```

Both KafkaStrategy and KafkaClient expose getStatus(): KafkaStatus and satisfy the KafkaHealthCheckable interface.

```typescript
enum KafkaStatus {
  CONNECTED = "connected",
  DISCONNECTED = "disconnected",
  FAILED = "failed",
}
```