@globalart/platformatic-kafka

A NestJS microservice transport on top of @platformatic/kafka: a server strategy for consuming messages and a ClientProxy for publishing, with exponential-backoff reconnection, per-partition processing queues on the server, and an optional DLQ.

Installation

npm install @globalart/platformatic-kafka @platformatic/kafka

For the health indicator, also install @nestjs/terminus.

Features

  • Strategy: PlatformaticKafkaStrategy as a CustomTransportStrategy for connectMicroservice
  • Client: PlatformaticKafkaClient (ClientProxy) with emit, emitBatch, and request–reply via send when subscribed to replies
  • Reconnection: shared backoff for strategy and client on broker failures
  • Producer-only mode: producerOnlyMode skips creating a consumer on the client
  • Batch publish: emitBatch uses a single send round-trip
  • DLQ and retries: strategy-level maxRetries / deadLetterTopic; after handler errors and nack, messages can go to the DLQ when retries are exhausted
  • Manual commit: if consumeOptions sets autocommit: false, call commit() from the context after successful handling
  • Types: options and messages align with @platformatic/kafka types

Quick start: server

main.ts

import { NestFactory } from "@nestjs/core";
import { Transport } from "@nestjs/microservices";
import {
  createPlatformaticKafkaMicroservice,
  PlatformaticKafkaStrategy,
} from "@globalart/platformatic-kafka";
import { AppModule } from "./app.module";

async function bootstrap() {
  const app = await NestFactory.create(AppModule);

  app.connectMicroservice(
    createPlatformaticKafkaMicroservice({
      transport: Transport.KAFKA,
      options: {
        brokers: ["localhost:9092"],
        clientId: "orders-service",
        groupId: "orders-consumers",
      },
    }),
  );

  await app.startAllMicroservices();
  await app.listen(3000);
}
bootstrap();

Equivalent without the factory:

app.connectMicroservice({
  strategy: new PlatformaticKafkaStrategy({
    brokers: ["localhost:9092"],
    clientId: "orders-service",
    groupId: "orders-consumers",
  }),
});

Event handler

The topic matches the @EventPattern string. Payloads are parsed as JSON; invalid JSON on an event topic is skipped with a log warning.

import { Controller } from "@nestjs/common";
import { EventPattern, Payload, Ctx } from "@nestjs/microservices";
import {
  PlatformaticKafkaContext,
  KafkaTopic,
  KafkaPartition,
  KafkaAck,
  KafkaNack,
} from "@globalart/platformatic-kafka";

@Controller()
export class OrdersController {
  @EventPattern("orders.created")
  async onOrderCreated(
    @Payload() data: unknown,
    @Ctx() ctx: PlatformaticKafkaContext,
    @KafkaTopic() topic: string,
    @KafkaPartition() partition: number,
  ) {
    void topic;
    void partition;
  }
}
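
The JSON handling described above for event topics (parse the value, skip and warn on invalid JSON) can be sketched as follows. This is only an illustration of the behavior, not the package's internal code: parseEventValue and handleRaw are hypothetical names, and the warning is collected in an array instead of going to a real logger.

```typescript
// Hypothetical helpers, not part of @globalart/platformatic-kafka.
type ParseResult =
  | { status: "parsed"; value: unknown }
  | { status: "skipped"; reason: string };

// Parse a raw message value; report invalid JSON instead of throwing.
function parseEventValue(raw: string): ParseResult {
  try {
    return { status: "parsed", value: JSON.parse(raw) };
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    return { status: "skipped", reason };
  }
}

// Return the parsed payload, or undefined (plus a warning) when the
// message should be skipped.
function handleRaw(raw: string, warnings: string[]): unknown {
  const result = parseEventValue(raw);
  if (result.status === "skipped") {
    warnings.push(`skipping message with invalid JSON: ${result.reason}`);
    return undefined;
  }
  return result.value;
}
```

A handler therefore only ever sees payloads that parsed successfully; malformed messages do not reach @EventPattern methods.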

PlatformaticKafkaContext:

  • getMessage() — parsed message (value / key / headers with JSON parsing where applicable)
  • getTopic(), getPartition(), getHeaders()
  • commit() — commit the offset (relevant when consumeOptions.autocommit === false)
  • nack(delayMs?) — defer reprocessing (default delay 5000 ms)

The KafkaKey, KafkaHeader, KafkaHeaders, KafkaAck, and KafkaNack parameter decorators read the same data from the RPC context.
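
With autocommit disabled, a common pattern is to commit only after the handler's work succeeds and to nack otherwise. The sketch below shows that flow under stated assumptions: AckContext is a local stand-in for the commit() and nack(delayMs?) methods listed above (in an application you would pass the PlatformaticKafkaContext itself), and handleWithAck is a hypothetical wrapper, not a package export.

```typescript
// Local stand-in for the two context methods used here (assumption:
// both may be sync or async, so they are awaited either way).
interface AckContext {
  commit(): unknown;
  nack(delayMs?: number): unknown;
}

// Hypothetical wrapper: nack with a delay if the work fails,
// otherwise commit the offset.
async function handleWithAck<T>(
  ctx: AckContext,
  payload: T,
  work: (payload: T) => Promise<void>,
  retryDelayMs = 5000, // matches the documented default nack delay
): Promise<"committed" | "nacked"> {
  try {
    await work(payload);
  } catch {
    await ctx.nack(retryDelayMs);
    return "nacked";
  }
  await ctx.commit();
  return "committed";
}
```

Inside an @EventPattern method this would be called as `await handleWithAck(ctx, data, (d) => this.process(d))`, where process is whatever business logic the handler runs.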

Request–reply (@MessagePattern)

Supported, but in-flight replies can be lost during consumer group rebalances due to @platformatic/kafka limitations. Prefer @EventPattern for event streams.

Client

Module with named clients

import { Module } from "@nestjs/common";
import { PlatformaticKafkaClientsModule } from "@globalart/platformatic-kafka";

@Module({
  imports: [
    PlatformaticKafkaClientsModule.register([
      {
        name: "ORDERS_KAFKA",
        options: {
          brokers: ["localhost:9092"],
          clientId: "orders-api",
          groupId: "orders-api",
          producerOnlyMode: true,
        },
      },
    ]),
  ],
  exports: [PlatformaticKafkaClientsModule],
})
export class KafkaModule {}

Async registration: PlatformaticKafkaClientsModule.registerAsync with useFactory and inject.

Using ClientsModule

import { ClientsModule } from "@nestjs/microservices";
import { createPlatformaticKafkaClientBroker } from "@globalart/platformatic-kafka";

ClientsModule.register([
  {
    name: "KAFKA",
    ...createPlatformaticKafkaClientBroker({
      brokers: ["localhost:9092"],
      clientId: "bff",
      producerOnlyMode: true,
    }),
  },
]);

Service

import { Inject, Injectable, OnModuleInit } from "@nestjs/common";
import { ClientProxy } from "@nestjs/microservices";
import { firstValueFrom } from "rxjs";

@Injectable()
export class OrdersApiService implements OnModuleInit {
  constructor(@Inject("KAFKA") private readonly kafka: ClientProxy) {}

  async onModuleInit() {
    await this.kafka.connect();
  }

  publishCreated(order: unknown) {
    this.kafka.emit("orders.created", order);
  }

  async publishCreatedBatch(orders: unknown[]) {
    await firstValueFrom(
      this.kafka.emit("orders.created", { messages: orders } as never),
    );
  }
}

On PlatformaticKafkaClient directly you also have emitBatch(pattern, { messages }), getStatus(), consumer / producer getters after connect(), and unwrap() for low-level access to @platformatic/kafka instances.

For request–reply, call subscribeToResponseOf("topic") on the client before send so the consumer subscribes to ${pattern}.reply.

PlatformaticKafkaOptions

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| brokers | string[] \| Broker[] | | Required. Bootstrap brokers |
| clientId | string | Nest default (KAFKA_DEFAULT_CLIENT) | Kafka client id |
| groupId | string | Nest default (KAFKA_DEFAULT_GROUP) | Consumer group id |
| postfixId | string | server "-server", client "-client" | Suffix appended to clientId and resolved groupId |
| forceClose | boolean | false | Force-close on dispose (mainly for the strategy) |
| connection | ConnectionOptions | | TLS, SASL, and other broker connection settings |
| consumer | partial ConsumerOptions | | Excluding clientId, bootstrapBrokers, groupId |
| producer | partial ProducerOptions | | Excluding clientId, bootstrapBrokers, groupId |
| consumeOptions | partial ConsumeOptions | | Merged over internal stream defaults (committed / latest mode, session timeouts, etc.) |
| produceOptions | ProduceOptions | | Default arguments for producer.send |
| producerOnlyMode | boolean | false | Producer only (no consumer on the client) |
| reconnect | ReconnectConfig | see below | Reconnection backoff |
| maxRetries | number | unlimited | Retry cap on handler failure / nack for events |
| deadLetterTopic | string | | DLQ topic after maxRetries is exceeded |
| shutdownTimeoutMs | number | no timeout | Wait for partition queues (strategy) or send queue (client) before closing |
| idempotentProducer | boolean | false | Enables idempotent producer in @platformatic/kafka |
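
To make the interaction between maxRetries and deadLetterTopic concrete, here is a minimal sketch of the decision taken after a handler failure or nack. It is an illustration under assumptions, not the strategy's actual code: nextActionAfterFailure is a hypothetical function, and the "drop" outcome when no DLQ topic is configured is a guess, since the options above do not say what happens in that case.

```typescript
// Local mirror of the two documented options.
interface RetryPolicy {
  maxRetries?: number;      // undefined = unlimited (documented default)
  deadLetterTopic?: string; // undefined = no DLQ configured
}

type FailureAction =
  | { kind: "retry"; attempt: number }
  | { kind: "dead-letter"; topic: string }
  | { kind: "drop" }; // assumption: behavior without a DLQ is not documented

// Decide what to do after a failure, given how many retries already ran.
function nextActionAfterFailure(policy: RetryPolicy, retriesSoFar: number): FailureAction {
  const cap = policy.maxRetries;
  if (cap === undefined || retriesSoFar < cap) {
    return { kind: "retry", attempt: retriesSoFar + 1 };
  }
  if (policy.deadLetterTopic !== undefined) {
    return { kind: "dead-letter", topic: policy.deadLetterTopic };
  }
  return { kind: "drop" };
}
```

For example, with `{ maxRetries: 3, deadLetterTopic: "orders.dlq" }` (a made-up topic name) the first three failures are retried and the fourth is routed to the DLQ topic.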

ReconnectConfig

| Field | Default |
| --- | --- |
| initialDelayMs | 1000 |
| maxDelayMs | 30000 |
| multiplier | 2 |
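
With these defaults, a capped exponential schedule looks like the sketch below. The exact formula the package uses is not documented here, so this assumes the common form delay = min(initialDelayMs × multiplier^attempt, maxDelayMs) with no jitter; ReconnectSettings and backoffDelayMs are illustrative names, not package exports.

```typescript
// Local mirror of the documented reconnect fields.
interface ReconnectSettings {
  initialDelayMs: number;
  maxDelayMs: number;
  multiplier: number;
}

const DEFAULT_RECONNECT: ReconnectSettings = {
  initialDelayMs: 1000,
  maxDelayMs: 30000,
  multiplier: 2,
};

// Delay before reconnect attempt number `attempt` (0-based), capped at maxDelayMs.
function backoffDelayMs(
  attempt: number,
  settings: ReconnectSettings = DEFAULT_RECONNECT,
): number {
  const uncapped = settings.initialDelayMs * Math.pow(settings.multiplier, attempt);
  return Math.min(uncapped, settings.maxDelayMs);
}

const schedule = [0, 1, 2, 3, 4, 5, 6].map((attempt) => backoffDelayMs(attempt));
// schedule: [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

Under this assumption the cap is reached on the sixth attempt, after which every retry waits 30 seconds.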

Health (Terminus)

import { Controller, Get } from "@nestjs/common";
import { HealthCheck, HealthCheckService, TerminusModule } from "@nestjs/terminus";
import {
  PlatformaticKafkaHealthIndicator,
  PlatformaticKafkaStrategy,
} from "@globalart/platformatic-kafka";

@Controller("health")
export class HealthController {
  constructor(
    private health: HealthCheckService,
    private kafkaHealth: PlatformaticKafkaHealthIndicator,
    private kafka: PlatformaticKafkaStrategy,
  ) {}

  @Get()
  @HealthCheck()
  check() {
    return this.health.check([
      () => this.kafkaHealth.isHealthy("kafka", this.kafka),
    ]);
  }
}

The module that provides the indicator must have access to the strategy or any object implementing getStatus(): PlatformaticKafkaStatus.
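
Because the indicator only needs something with getStatus(), a small stub is enough for unit tests. The sketch below is a stand-alone illustration: KafkaStatus assumes the status is a plain string union (it may be an enum in the package), FakeStatusSource and describeHealth are hypothetical, and the returned shape merely follows the general Terminus convention of `{ key: { status: "up" | "down" } }` rather than the indicator's documented output.

```typescript
// Assumed shape of the status type; the package exports PlatformaticKafkaStatus.
type KafkaStatus = "connected" | "disconnected" | "failed";

interface StatusSource {
  getStatus(): KafkaStatus;
}

// Test double that lets a spec flip the reported status.
class FakeStatusSource implements StatusSource {
  private status: KafkaStatus = "connected";

  setStatus(next: KafkaStatus): void {
    this.status = next;
  }

  getStatus(): KafkaStatus {
    return this.status;
  }
}

// Map a status source to a Terminus-style result keyed by `key`.
function describeHealth(
  key: string,
  source: StatusSource,
): Record<string, { status: "up" | "down"; kafka: KafkaStatus }> {
  const kafka = source.getStatus();
  return { [key]: { status: kafka === "connected" ? "up" : "down", kafka } };
}
```

In a spec, an instance of FakeStatusSource could be passed where the example above passes this.kafka.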

Other exports

  • brokerHostnameFromBootstrap — host from a broker string or Broker object
  • DEFAULT_POSTFIX_SERVER, DEFAULT_POSTFIX_CLIENT, DEFAULT_KAFKA_METADATA_MAX_AGE_MS
  • Types: PlatformaticKafkaMessage, ParsedKafkaMessage, KafkaSubscribeOptions, KafkaSubscribeMetadata, ConnectionOptions, SASLOptions (re-exported from @platformatic/kafka)
  • PlatformaticKafkaStatus: connected | disconnected | failed
  • @KafkaSubscribe(topic, options?) attaches KafkaSubscribeOptions metadata; wiring handlers to the transport is done with Nest microservices @EventPattern / @MessagePattern

Behavior and limitations

  • Topics may be auto-created on produce (autocreateTopics: true on send calls).
  • Event payloads are serialized as a JSON string in value.
  • The strategy processes messages with per-partition queues; use getQueueMetrics() on PlatformaticKafkaStrategy for queue depth.
  • Client close() waits for the internal SerialQueue to drain; with shutdownTimeoutMs, that wait is bounded.
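
The idea behind per-partition queues is that messages from the same partition are handled strictly in order while different partitions proceed independently. The sketch below illustrates that idea only: PartitionQueues is a hypothetical class, not the strategy's implementation, and its depth() method is a simplified stand-in for what getQueueMetrics() might report.

```typescript
// Hypothetical illustration of per-partition serial processing.
class PartitionQueues {
  // Tail promise of each partition's chain of tasks.
  private readonly tails = new Map<string, Promise<void>>();
  // Number of tasks queued or running per partition.
  private readonly depths = new Map<string, number>();

  enqueue(topic: string, partition: number, task: () => Promise<void>): Promise<void> {
    const key = `${topic}:${partition}`;
    const previous = this.tails.get(key) ?? Promise.resolve();
    this.depths.set(key, (this.depths.get(key) ?? 0) + 1);

    const run = previous
      .then(task)
      .catch(() => undefined) // a failed task must not block later messages
      .then(() => {
        this.depths.set(key, (this.depths.get(key) ?? 1) - 1);
      });

    this.tails.set(key, run);
    return run;
  }

  depth(topic: string, partition: number): number {
    return this.depths.get(`${topic}:${partition}`) ?? 0;
  }
}
```

A slow handler on partition 0 delays only later messages from partition 0; work queued for partition 1 starts immediately.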