@globalart/nestjs-etcd

A NestJS module for integrating etcd3 distributed coordination services. This package provides leader election and distributed locking capabilities for NestJS applications.

Installation

npm install @globalart/nestjs-etcd etcd3

Overview

The @globalart/nestjs-etcd package provides seamless integration with etcd3, enabling distributed coordination features in your NestJS applications. It offers two main features:

  • Leader Election: Automatically elect a leader among multiple application instances
  • Distributed Locking: Acquire and release distributed locks across multiple instances

Key Features

  • Leader Election - Automatic leader selection with failover support
  • Distributed Locking - Reliable distributed locks with TTL support
  • Type-safe - Full TypeScript support with proper type definitions
  • Feature-based - Enable only the features you need
  • Global Module - Available throughout your application

Quick Start

Basic Setup

Import EtcdModule in your root module:

import { Module } from "@nestjs/common";
import { EtcdModule } from "@globalart/nestjs-etcd";

@Module({
  imports: [
    EtcdModule.forRoot({
      features: ["leaderElection", "distributedLock"],
      leaderElectionKey: "my-app-leader",
      etcdOptions: {
        hosts: ["localhost:2379"],
      },
    }),
  ],
})
export class AppModule {}

Async Configuration

For dynamic configuration, use forRootAsync:

import { Module } from "@nestjs/common";
import { EtcdModule } from "@globalart/nestjs-etcd";
import { ConfigModule, ConfigService } from "@nestjs/config";

@Module({
  imports: [
    EtcdModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: (configService: ConfigService) => ({
        features: ["leaderElection", "distributedLock"],
        leaderElectionKey: configService.get("ETCD_LEADER_KEY"),
        etcdOptions: {
          hosts: configService.get("ETCD_HOSTS").split(","),
        },
      }),
      inject: [ConfigService],
    }),
  ],
})
export class AppModule {}

Configuration

EtcdModuleOptions

| Option | Type | Required | Description |
| --- | --- | --- | --- |
| features | EtcdFeature[] | No | Array of features to enable: 'leaderElection', 'distributedLock' |
| leaderElectionKey | string | No | Key used for leader election (default: 'etcd') |
| etcdOptions | IOptions | Yes | etcd3 client configuration options |

EtcdFeature

  • 'leaderElection': Enables leader election functionality
  • 'distributedLock': Enables distributed locking functionality

etcd3 Options

The etcdOptions parameter accepts all configuration options from the etcd3 library. Common options include:

  • hosts: Array of etcd server addresses (e.g., ['localhost:2379'])
  • auth: Authentication credentials
  • namespace: Key namespace prefix
  • grpcOptions: gRPC client options

Refer to the etcd3 documentation for complete configuration options.
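As a sketch of a fuller configuration, assuming these options are forwarded unchanged to the etcd3 client (all host names and credentials below are placeholders):

import { Module } from "@nestjs/common";
import { EtcdModule } from "@globalart/nestjs-etcd";

@Module({
  imports: [
    EtcdModule.forRoot({
      features: ["distributedLock"],
      etcdOptions: {
        // Addresses of the etcd cluster members
        hosts: ["etcd-1:2379", "etcd-2:2379", "etcd-3:2379"],
        // Username/password authentication, as supported by etcd3
        auth: {
          username: "app-user",
          password: "app-password",
        },
      },
    }),
  ],
})
export class AppModule {}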

Leader Election

The leader election service allows you to determine if the current instance is the leader among multiple instances.

How It Works

  1. When the module initializes, each instance attempts to become the leader
  2. Only one instance will be elected as the leader
  3. If the leader instance goes down, etcd automatically elects a new leader
  4. The isLeader() method returns true only for the elected leader instance

Basic Usage

import { Controller, Get } from "@nestjs/common";
import {
  InjectEtcdLeaderElectionService,
  LeaderElectionService,
} from "@globalart/nestjs-etcd";

@Controller()
export class AppController {
  constructor(
    @InjectEtcdLeaderElectionService()
    private readonly leaderElectionService: LeaderElectionService
  ) {}

  @Get("is-leader")
  isLeader() {
    return {
      isLeader: this.leaderElectionService.isLeader(),
    };
  }

  @Get("leader-only-task")
  async leaderOnlyTask() {
    if (!this.leaderElectionService.isLeader()) {
      return { message: "Not the leader, skipping task" };
    }

    // Perform leader-only operations
    return { message: "Task executed by leader" };
  }
}

Use Cases

  • Scheduled Tasks: Only the leader instance runs scheduled cron jobs (see the sketch after this list)
  • Resource Management: Leader manages shared resources
  • Health Checks: Leader performs cluster-wide health monitoring
  • Cache Warming: Leader preloads cache data
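For instance, combining leader election with @nestjs/schedule keeps a cron job from running on every instance at once. A minimal sketch, assuming the schedule module is installed and registered (the TasksService name and cron expression are illustrative):

import { Injectable } from "@nestjs/common";
import { Cron } from "@nestjs/schedule";
import {
  InjectEtcdLeaderElectionService,
  LeaderElectionService,
} from "@globalart/nestjs-etcd";

@Injectable()
export class TasksService {
  constructor(
    @InjectEtcdLeaderElectionService()
    private readonly leaderElectionService: LeaderElectionService
  ) {}

  // Every instance fires the cron, but only the current leader does the work.
  // Checking isLeader() on each tick means the job follows failover automatically.
  @Cron("*/5 * * * *")
  async syncReports() {
    if (!this.leaderElectionService.isLeader()) {
      return;
    }
    // Leader-only work goes here
  }
}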

Leader Election Service API

interface LeaderElectionService {
  isLeader(): boolean;
}

Distributed Locking

The distributed lock service allows you to acquire and release locks across multiple instances, ensuring only one instance can execute a critical section at a time. The service works directly with etcd storage, without maintaining local state.

How It Works

  1. When acquire() is called, it creates a lease with TTL and attempts to acquire the lock
  2. If the lock is already held by another instance, EtcdLockFailedError is thrown
  3. The lock is automatically released when the lease expires (based on TTL)
  4. The release() method removes the lock key directly from etcd
  5. The isLocked() method checks the lock status by querying etcd

Basic Usage

import { Controller, Post, Body } from "@nestjs/common";
import {
  InjectEtcdDistributedLockService,
  DistributedLockService,
} from "@globalart/nestjs-etcd";

@Controller()
export class AppController {
  constructor(
    @InjectEtcdDistributedLockService()
    private readonly distributedLockService: DistributedLockService
  ) {}

  @Post("process")
  async processResource(@Body() data: { resourceId: string }) {
    const lockKey = `process:${data.resourceId}`;

    try {
      await this.distributedLockService.acquire(lockKey, {
        ttl: 30, // Lock expires after 30 seconds
      });
    } catch (error) {
      if (error.name === "EtcdLockFailedError") {
        // Another instance holds the lock - don't release it here
        return {
          message: "Resource is locked, try later",
          resourceId: data.resourceId,
        };
      }
      throw error;
    }

    try {
      // Critical section - only one instance can execute this
      await this.processResourceData(data.resourceId);

      return { message: "Processed successfully", resourceId: data.resourceId };
    } finally {
      // Release only after a successful acquire
      await this.distributedLockService.release(lockKey);
    }
  }

  @Post("check-lock")
  async checkLock(@Body() data: { resourceId: string }) {
    const lockKey = `process:${data.resourceId}`;
    const isLocked = await this.distributedLockService.isLocked(lockKey);

    return {
      resourceId: data.resourceId,
      isLocked,
    };
  }

  private async processResourceData(resourceId: string) {
    // Your business logic here
  }
}

Lock Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| ttl | number | 10 | Time-to-live in seconds. Lock automatically expires after this time if not released |

Lock Methods

acquire(key: string, options?: LockOptions): Promise<Lock>

Acquires a lock for the given key. This is a single attempt - if the lock is already held, an error is thrown immediately.

Parameters:

  • key (string): The lock key identifier
  • options (optional): Lock configuration options

Returns: Promise resolving to a Lock object from etcd3

Throws:

  • EtcdLockFailedError if the lock cannot be acquired (already locked)
  • Error if the distributed lock feature is not enabled

try {
  const lock = await distributedLockService.acquire("my-resource", {
    ttl: 60, // Lock expires after 60 seconds
  });
  // Lock acquired successfully
} catch (error) {
  // Lock acquisition failed - resource is already locked
  console.error("Failed to acquire lock:", error.message);
}

release(key: string): Promise<void>

Releases the lock for the given key by removing it from etcd. Safe to call even if the lock doesn't exist; however, because it deletes the key unconditionally, only call it for locks this instance acquired.

Parameters:

  • key (string): The lock key identifier

Returns: Promise that resolves when the lock is released

await distributedLockService.release("my-resource");

isLocked(key: string): Promise<boolean>

Checks if a lock exists for the given key by querying etcd storage.

Parameters:

  • key (string): The lock key identifier

Returns: Promise resolving to true if the lock exists, false otherwise

const locked = await distributedLockService.isLocked("my-resource");
if (locked) {
  // Resource is currently locked
  console.log("Resource is locked by another instance");
} else {
  // Resource is available
  console.log("Resource is available");
}

Lock Key Format

Locks are stored in etcd with the prefix acquire/ followed by your key. For example:

  • Input key: process:123
  • Stored in etcd as: acquire/process:123
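This matters if you ever inspect lock state with the raw etcd client (see Advanced Usage below): read the prefixed key, not the input key. A small sketch, assuming the acquire/ prefix described above:

import { Injectable } from "@nestjs/common";
import { InjectEtcdClient } from "@globalart/nestjs-etcd";
import { Etcd3 } from "etcd3";

@Injectable()
export class LockInspector {
  constructor(
    @InjectEtcdClient()
    private readonly etcd: Etcd3
  ) {}

  // Reads the raw lock key; resolves to null when no lock is held
  async rawLockValue(key: string): Promise<string | null> {
    return this.etcd.get(`acquire/${key}`).string();
  }
}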

Use Cases

  • Resource Processing: Ensure only one instance processes a specific resource
  • Database Migrations: Prevent concurrent migration execution (a bootstrap sketch follows this list)
  • Cache Invalidation: Coordinate cache updates across instances
  • Rate Limiting: Implement distributed rate limiting
  • Critical Operations: Protect critical sections that must run exclusively
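To illustrate the migrations use case: a bootstrap hook can take the lock before migrating, and instances that lose the race skip the step. A sketch under those assumptions (MigrationRunner and runMigrations are hypothetical names, and your migration logic should still be idempotent):

import { Injectable, OnApplicationBootstrap } from "@nestjs/common";
import {
  InjectEtcdDistributedLockService,
  DistributedLockService,
} from "@globalart/nestjs-etcd";

@Injectable()
export class MigrationRunner implements OnApplicationBootstrap {
  constructor(
    @InjectEtcdDistributedLockService()
    private readonly locks: DistributedLockService
  ) {}

  async onApplicationBootstrap() {
    try {
      // TTL should comfortably exceed the expected migration time
      await this.locks.acquire("db-migrations", { ttl: 120 });
    } catch {
      // Another instance holds the lock and is migrating; skip
      return;
    }
    try {
      await this.runMigrations();
    } finally {
      await this.locks.release("db-migrations");
    }
  }

  private async runMigrations() {
    // Your migration logic here
  }
}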

Best Practices

  • Always use try/finally: Ensure locks are released even if errors occur (a reusable helper is sketched after this list)
  • Set appropriate TTL: Locks should expire to prevent deadlocks if a process crashes
  • Use descriptive keys: Include resource identifiers in lock keys (e.g., process:${resourceId})
  • Handle acquisition failures: Implement proper error handling for lock acquisition failures
  • Check lock status before acquiring: Use isLocked() to check availability before attempting to acquire
  • Release locks promptly: Don't hold locks longer than necessary
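The first three practices can be folded into one small helper that acquires a lock, runs a callback, and always releases. A sketch; withLock is not part of this package:

import { DistributedLockService } from "@globalart/nestjs-etcd";

// Runs fn under the named lock and guarantees release afterwards.
// EtcdLockFailedError (and anything fn throws) propagates to the caller.
export async function withLock<T>(
  locks: DistributedLockService,
  key: string,
  ttl: number,
  fn: () => Promise<T>
): Promise<T> {
  await locks.acquire(key, { ttl });
  try {
    return await fn();
  } finally {
    await locks.release(key);
  }
}

Usage: await withLock(this.distributedLockService, `process:${resourceId}`, 30, () => this.processResourceData(resourceId));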

API Reference

Decorators

@InjectEtcdLeaderElectionService()

Injects the LeaderElectionService instance.

constructor(
  @InjectEtcdLeaderElectionService()
  private readonly leaderService: LeaderElectionService
) {}

@InjectEtcdDistributedLockService()

Injects the DistributedLockService instance.

constructor(
  @InjectEtcdDistributedLockService()
  private readonly lockService: DistributedLockService
) {}

@InjectEtcdClient()

Injects the raw etcd3 client instance for advanced usage.

import { Etcd3 } from "etcd3";

constructor(
  @InjectEtcdClient()
  private readonly etcd: Etcd3
) {}

@InjectEtcdId()

Injects the unique instance ID (UUID) generated for this application instance.

constructor(
  @InjectEtcdId()
  private readonly instanceId: string
) {}

Services

LeaderElectionService

interface LeaderElectionService {
  isLeader(): boolean;
}

DistributedLockService

interface DistributedLockService {
  acquire(key: string, options?: LockOptions): Promise<Lock>;
  release(key: string): Promise<void>;
  isLocked(key: string): Promise<boolean>;
}

Note: The service works directly with etcd storage. It does not maintain local state, making it suitable for distributed environments where instances may not share memory.

LockOptions

interface LockOptions {
  ttl?: number;
}

Advanced Usage

Custom etcd Operations

If you need direct access to etcd functionality, inject the etcd client:

import { Injectable } from "@nestjs/common";
import { InjectEtcdClient } from "@globalart/nestjs-etcd";
import { Etcd3 } from "etcd3";

@Injectable()
export class CustomService {
  constructor(
    @InjectEtcdClient()
    private readonly etcd: Etcd3
  ) {}

  async getValue(key: string) {
    return await this.etcd.get(key).string();
  }

  async setValue(key: string, value: string) {
    await this.etcd.put(key).value(value);
  }

  async watchKey(key: string, callback: (value: string) => void) {
    const watcher = await this.etcd.watch().key(key).create();
    watcher.on("put", (res) => {
      callback(res.value.toString());
    });
  }
}

Instance Identification

Each application instance gets a unique ID that can be used for logging or debugging:

import { Injectable } from "@nestjs/common";
import { InjectEtcdId } from "@globalart/nestjs-etcd";

@Injectable()
export class MyService {
  constructor(
    @InjectEtcdId()
    private readonly instanceId: string
  ) {
    console.log(`Instance ID: ${instanceId}`);
  }
}

Error Handling

Leader Election Errors

Leader election automatically retries on errors. If an error occurs, the service will:

  1. Set isLeader() to false
  2. Log the error
  3. Attempt to rejoin the election

Lock Acquisition Failures

If a lock cannot be acquired (because it's already held by another instance), an EtcdLockFailedError is thrown:

try {
  await distributedLockService.acquire("resource", { ttl: 30 });
} catch (error) {
  if (error.name === "EtcdLockFailedError") {
    // Lock is already held by another instance
    console.error("Failed to acquire lock: resource is already locked");
  } else {
    // Other error occurred
    console.error("Unexpected error:", error);
  }
}

Lock Release Errors

Lock release errors are logged but don't throw by default. To handle them explicitly:

try {
  await distributedLockService.release("resource");
} catch (error) {
  // Handle release error
  console.error("Failed to release lock:", error);
}

Examples

See the examples directory for complete working examples.

Troubleshooting

Connection Issues

If you're experiencing connection issues:

  1. Verify etcd is running and accessible
  2. Check the hosts configuration in etcdOptions
  3. Ensure network connectivity between your application and etcd cluster
  4. Check etcd logs for errors

Lock Not Releasing

If locks are not releasing properly:

  1. Check if the TTL is too long - locks will auto-expire based on TTL
  2. Verify the lock key is correct (remember the acquire/ prefix is added automatically)
  3. Ensure release() is called in a finally block
  4. Check etcd cluster health and connectivity
  5. Verify the lock key matches between acquire() and release() calls

Leader Not Electing

If leader election is not working:

  1. Verify the leaderElection feature is enabled
  2. Check the leaderElectionKey configuration
  3. Ensure multiple instances are running
  4. Check etcd cluster connectivity

Best Practices

  • Enable only needed features: Only enable features you actually use
  • Use descriptive lock keys: Include resource identifiers in lock keys
  • Set appropriate TTLs: Prevent deadlocks with reasonable TTL values
  • Handle errors gracefully: Implement proper error handling for all operations
  • Monitor etcd cluster: Ensure etcd cluster health for reliable operations
  • Use async configuration: Use forRootAsync for environment-based configuration