# @globalart/platformatic-kafka

A NestJS microservice transport on top of `@platformatic/kafka`: a server strategy for consuming messages and a `ClientProxy` for publishing, with exponential-backoff reconnection, per-partition processing queues on the server, and an optional dead-letter queue (DLQ).
## Installation

```bash
npm install @globalart/platformatic-kafka @platformatic/kafka
```

For the health indicator, also install `@nestjs/terminus`.
## Features

- **Strategy** — `PlatformaticKafkaStrategy` as a `CustomTransportStrategy` for `connectMicroservice`
- **Client** — `PlatformaticKafkaClient` (`ClientProxy`): `emit`, `emitBatch`, request–reply via `send` when subscribed to replies
- **Reconnection** — shared exponential backoff for strategy and client on broker failures
- **Producer-only mode** — `producerOnlyMode` skips creating a consumer on the client
- **Batch publish** — `emitBatch` uses a single `send` round-trip
- **DLQ and retries** — strategy-level `maxRetries` / `deadLetterTopic`; after handler errors and `nack`, messages go to the DLQ once retries are exhausted
- **Manual commit** — if `consumeOptions` sets `autocommit: false`, call `commit()` from the context after successful handling
- **Types** — options and messages align with `@platformatic/kafka` types
## Quick start: server

`main.ts`:

```typescript
import { NestFactory } from "@nestjs/core";
import { Transport } from "@nestjs/microservices";
import {
  createPlatformaticKafkaMicroservice,
  PlatformaticKafkaStrategy,
} from "@globalart/platformatic-kafka";
import { AppModule } from "./app.module";

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.connectMicroservice(
    createPlatformaticKafkaMicroservice({
      transport: Transport.KAFKA,
      options: {
        brokers: ["localhost:9092"],
        clientId: "orders-service",
        groupId: "orders-consumers",
      },
    }),
  );
  await app.startAllMicroservices();
  await app.listen(3000);
}
bootstrap();
```
Equivalent without the factory:

```typescript
app.connectMicroservice({
  strategy: new PlatformaticKafkaStrategy({
    brokers: ["localhost:9092"],
    clientId: "orders-service",
    groupId: "orders-consumers",
  }),
});
```
## Event handler

The topic matches the `@EventPattern` string. Payloads are parsed as JSON; invalid JSON on an event topic is skipped with a logged warning.
```typescript
import { Controller } from "@nestjs/common";
import { EventPattern, Payload, Ctx } from "@nestjs/microservices";
import {
  PlatformaticKafkaContext,
  KafkaTopic,
  KafkaPartition,
} from "@globalart/platformatic-kafka";

@Controller()
export class OrdersController {
  @EventPattern("orders.created")
  async onOrderCreated(
    @Payload() data: unknown,
    @Ctx() ctx: PlatformaticKafkaContext,
    @KafkaTopic() topic: string,
    @KafkaPartition() partition: number,
  ) {
    // handle the event; topic and partition are also available via ctx
    void topic;
    void partition;
  }
}
```
`PlatformaticKafkaContext` methods:

- `getMessage()` — the parsed message (`value` / `key` / `headers`, with JSON parsing where applicable)
- `getTopic()`, `getPartition()`, `getHeaders()`
- `commit()` — commit the offset (relevant when `consumeOptions.autocommit === false`)
- `nack(delayMs?)` — defer reprocessing (default delay 5000 ms)
The `KafkaKey`, `KafkaHeader`, `KafkaHeaders`, `KafkaAck`, and `KafkaNack` parameter decorators read the same data from the RPC context.
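The manual-commit flow with `autocommit: false` can be sketched as follows. This is a sketch, not the package's canonical example: the controller name and topic are illustrative, and `commit()` is assumed to be awaitable.

```typescript
import { Controller } from "@nestjs/common";
import { EventPattern, Payload, Ctx } from "@nestjs/microservices";
import { PlatformaticKafkaContext } from "@globalart/platformatic-kafka";

@Controller()
export class PaymentsController {
  @EventPattern("payments.settled")
  async onSettled(
    @Payload() data: unknown,
    @Ctx() ctx: PlatformaticKafkaContext,
  ) {
    try {
      // ... process the event ...
      // commit the offset only after successful handling
      await ctx.commit();
    } catch {
      // defer reprocessing; omitting delayMs uses the default 5000 ms delay
      ctx.nack();
    }
  }
}
```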
## Request–reply (@MessagePattern)

Supported, but in-flight replies can be lost during consumer group rebalances due to `@platformatic/kafka` limitations. Prefer `@EventPattern` for event streams.
## Client

### Module with named clients
```typescript
import { Module } from "@nestjs/common";
import { PlatformaticKafkaClientsModule } from "@globalart/platformatic-kafka";

@Module({
  imports: [
    PlatformaticKafkaClientsModule.register([
      {
        name: "ORDERS_KAFKA",
        options: {
          brokers: ["localhost:9092"],
          clientId: "orders-api",
          groupId: "orders-api",
          producerOnlyMode: true,
        },
      },
    ]),
  ],
  exports: [PlatformaticKafkaClientsModule],
})
export class KafkaModule {}
```
Async registration: `PlatformaticKafkaClientsModule.registerAsync` with `useFactory` and `inject`.
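A minimal sketch of async registration, assuming the async entries mirror the synchronous `register` shape (per-entry `imports`, `inject`, and `useFactory`; the `ConfigService`-based lookup is illustrative):

```typescript
import { Module } from "@nestjs/common";
import { ConfigModule, ConfigService } from "@nestjs/config";
import { PlatformaticKafkaClientsModule } from "@globalart/platformatic-kafka";

@Module({
  imports: [
    PlatformaticKafkaClientsModule.registerAsync([
      {
        name: "ORDERS_KAFKA",
        imports: [ConfigModule],
        inject: [ConfigService],
        // build the broker options from configuration at bootstrap time
        useFactory: (config: ConfigService) => ({
          brokers: (config.get<string>("KAFKA_BROKERS") ?? "localhost:9092").split(","),
          clientId: "orders-api",
          producerOnlyMode: true,
        }),
      },
    ]),
  ],
  exports: [PlatformaticKafkaClientsModule],
})
export class KafkaModule {}
```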
### Using ClientsModule

```typescript
import { ClientsModule } from "@nestjs/microservices";
import { createPlatformaticKafkaClientBroker } from "@globalart/platformatic-kafka";

ClientsModule.register([
  {
    name: "KAFKA",
    ...createPlatformaticKafkaClientBroker({
      brokers: ["localhost:9092"],
      clientId: "bff",
      producerOnlyMode: true,
    }),
  },
]);
```
### Service

```typescript
import { Inject, Injectable, OnModuleInit } from "@nestjs/common";
import { ClientProxy } from "@nestjs/microservices";
import { firstValueFrom } from "rxjs";

@Injectable()
export class OrdersApiService implements OnModuleInit {
  constructor(@Inject("KAFKA") private readonly kafka: ClientProxy) {}

  async onModuleInit() {
    await this.kafka.connect();
  }

  publishCreated(order: unknown) {
    this.kafka.emit("orders.created", order);
  }

  async publishCreatedBatch(orders: unknown[]) {
    await firstValueFrom(
      this.kafka.emit("orders.created", { messages: orders } as never),
    );
  }
}
```
On `PlatformaticKafkaClient` directly you also have `emitBatch(pattern, { messages })`, `getStatus()`, the `consumer` / `producer` getters after `connect()`, and `unwrap()` for low-level access to the underlying `@platformatic/kafka` instances.

For request–reply, call `subscribeToResponseOf("topic")` on the client before `send` so the consumer subscribes to `${pattern}.reply`.
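Putting that together, a request–reply caller might look like this. A sketch under stated assumptions: the topic `orders.get`, the injection token `"KAFKA"`, and the payload shape are illustrative, not part of the package.

```typescript
import { Inject, Injectable, OnModuleInit } from "@nestjs/common";
import { firstValueFrom } from "rxjs";
import { PlatformaticKafkaClient } from "@globalart/platformatic-kafka";

@Injectable()
export class OrdersQueryService implements OnModuleInit {
  constructor(
    @Inject("KAFKA") private readonly kafka: PlatformaticKafkaClient,
  ) {}

  async onModuleInit() {
    // subscribe to "orders.get.reply" before the first send
    this.kafka.subscribeToResponseOf("orders.get");
    await this.kafka.connect();
  }

  getOrder(id: string) {
    // resolves with the @MessagePattern("orders.get") handler's return value
    return firstValueFrom(this.kafka.send("orders.get", { id }));
  }
}
```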
## PlatformaticKafkaOptions

| Field | Type | Default | Description |
|---|---|---|---|
| `brokers` | `string[] \| Broker[]` | — | **Required.** Bootstrap brokers |
| `clientId` | `string` | Nest default (`KAFKA_DEFAULT_CLIENT`) | Kafka client id |
| `groupId` | `string` | Nest default (`KAFKA_DEFAULT_GROUP`) | Consumer group id |
| `postfixId` | `string` | server `"-server"`, client `"-client"` | Suffix appended to `clientId` and the resolved `groupId` |
| `forceClose` | `boolean` | `false` | Force-close on dispose (mainly for the strategy) |
| `connection` | `ConnectionOptions` | — | TLS, SASL, and other broker connection settings |
| `consumer` | partial `ConsumerOptions` | — | Excluding `clientId`, `bootstrapBrokers`, `groupId` |
| `producer` | partial `ProducerOptions` | — | Excluding `clientId`, `bootstrapBrokers`, `groupId` |
| `consumeOptions` | partial `ConsumeOptions` | — | Merged over internal stream defaults (committed/latest mode, session timeouts, etc.) |
| `produceOptions` | `ProduceOptions` | — | Default arguments for `producer.send` |
| `producerOnlyMode` | `boolean` | `false` | Producer only (no consumer on the client) |
| `reconnect` | `ReconnectConfig` | see below | Reconnection backoff |
| `maxRetries` | `number` | unlimited | Retry cap on handler failure / `nack` for events |
| `deadLetterTopic` | `string` | — | DLQ topic after `maxRetries` is exceeded |
| `shutdownTimeoutMs` | `number` | no timeout | Wait for partition queues (strategy) or send queue (client) before closing |
| `idempotentProducer` | `boolean` | `false` | Enables the idempotent producer in `@platformatic/kafka` |
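As an example, a strategy configured with manual commits, bounded retries, a DLQ, and a bounded shutdown wait might look like this (the topic names and timeout values are illustrative, not defaults):

```typescript
import { PlatformaticKafkaStrategy } from "@globalart/platformatic-kafka";

const strategy = new PlatformaticKafkaStrategy({
  brokers: ["localhost:9092"],
  clientId: "orders-service",
  groupId: "orders-consumers",
  consumeOptions: { autocommit: false }, // commit manually from the context
  maxRetries: 5,                         // after 5 failed attempts...
  deadLetterTopic: "orders.dlq",         // ...route the message to the DLQ
  shutdownTimeoutMs: 10_000,             // bound the partition-queue drain on close
  reconnect: { initialDelayMs: 500, maxDelayMs: 15_000, multiplier: 2 },
});
```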
### ReconnectConfig

| Field | Default |
|---|---|
| `initialDelayMs` | `1000` |
| `maxDelayMs` | `30000` |
| `multiplier` | `2` |
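With the defaults, reconnect delays grow as 1000, 2000, 4000, … ms and are capped at 30000 ms. A minimal sketch of that schedule (the helper name is ours, not an export of the package):

```typescript
// Exponential backoff with a cap, matching the ReconnectConfig defaults above
function reconnectDelayMs(
  attempt: number, // 0-based attempt index
  { initialDelayMs = 1000, maxDelayMs = 30000, multiplier = 2 } = {},
): number {
  return Math.min(initialDelayMs * multiplier ** attempt, maxDelayMs);
}

// First six delays: 1000, 2000, 4000, 8000, 16000, 30000 (capped)
const delays = Array.from({ length: 6 }, (_, i) => reconnectDelayMs(i));
```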
## Health (Terminus)

```typescript
import { Controller, Get } from "@nestjs/common";
import { HealthCheck, HealthCheckService } from "@nestjs/terminus";
import {
  PlatformaticKafkaHealthIndicator,
  PlatformaticKafkaStrategy,
} from "@globalart/platformatic-kafka";

@Controller("health")
export class HealthController {
  constructor(
    private readonly health: HealthCheckService,
    private readonly kafkaHealth: PlatformaticKafkaHealthIndicator,
    private readonly kafka: PlatformaticKafkaStrategy,
  ) {}

  @Get()
  @HealthCheck()
  check() {
    return this.health.check([
      () => this.kafkaHealth.isHealthy("kafka", this.kafka),
    ]);
  }
}
```
The module that provides the indicator must have access to the strategy or to any object implementing `getStatus(): PlatformaticKafkaStatus`.
## Other exports

- `brokerHostnameFromBootstrap` — host from a broker string or `Broker` object
- `DEFAULT_POSTFIX_SERVER`, `DEFAULT_POSTFIX_CLIENT`, `DEFAULT_KAFKA_METADATA_MAX_AGE_MS`
- Types: `PlatformaticKafkaMessage`, `ParsedKafkaMessage`, `KafkaSubscribeOptions`, `KafkaSubscribeMetadata`, `ConnectionOptions`, `SASLOptions` (re-exported from `@platformatic/kafka`)
- `PlatformaticKafkaStatus` — `connected` | `disconnected` | `failed`
- `@KafkaSubscribe(topic, options?)` — attaches `KafkaSubscribeOptions` metadata; wiring handlers to the transport is still done with the Nest microservices `@EventPattern` / `@MessagePattern` decorators
## Behavior and limitations

- Topics may be auto-created on produce (`autocreateTopics: true` on `send` calls).
- Event payloads are serialized as a JSON string in `value`.
- The strategy processes messages with per-partition queues; use `getQueueMetrics()` on `PlatformaticKafkaStrategy` to inspect queue depth.
- Client `close()` waits for the internal `SerialQueue` to drain; with `shutdownTimeoutMs`, that wait is bounded.
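A possible shutdown hook that inspects queue depth before closing the strategy. This is a sketch: the exact shape returned by `getQueueMetrics()` is package-defined and not assumed here, and `close()` is the standard `CustomTransportStrategy` teardown method.

```typescript
import { PlatformaticKafkaStrategy } from "@globalart/platformatic-kafka";

async function shutdown(strategy: PlatformaticKafkaStrategy): Promise<void> {
  // log per-partition queue depth before tearing down (shape is package-defined)
  console.log("queue metrics before close:", strategy.getQueueMetrics());
  // close() waits for partition queues to drain, bounded by shutdownTimeoutMs
  await strategy.close();
}
```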