🔄 Thmanyah Sync Worker

Background service that synchronizes data between the database and Elasticsearch to maintain search index consistency.

🎯 Purpose

The Sync Worker service is responsible for:

  • Consuming events from the message queue (Redis/BullMQ)
  • Synchronizing program data with Elasticsearch
  • Maintaining search index consistency
  • Handling data transformation and mapping
  • Providing reliable indexing with retry mechanisms

🏗️ Architecture

This service is part of the Thmanyah microservices architecture:

Example :
Queue (Redis/BullMQ) → Sync Worker → Elasticsearch

The Sync Worker ensures that changes made in the CMS API are reflected in the search index used by the Discovery API.

🛠️ Tech Stack

  • Framework: NestJS + TypeScript
  • Database: PostgreSQL + TypeORM
  • Queue: Redis + BullMQ
  • Search: Elasticsearch
  • Testing: Jest

📋 Features

Data Synchronization

  • ✅ Consumes events from Redis queue (queue name is fixed in code)
  • ✅ Synchronizes program data with Elasticsearch
  • ✅ Handles create, update, and delete operations
  • ✅ Implements retry mechanisms for failed operations
  • ✅ Maintains data consistency across services

Index Management

  • ✅ Creates and updates Elasticsearch documents
  • ✅ Handles bulk operations for performance
  • ✅ Manages index mappings and settings
  • ✅ Provides index health monitoring
  • ✅ Supports index reindexing operations
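
As an illustration of the bulk operations mentioned above, the sketch below builds the newline-delimited JSON body that the Elasticsearch Bulk API expects. The `IndexOperation` type and `buildBulkBody` helper are hypothetical names chosen for this example; the service's actual implementation may differ.

```typescript
// Hypothetical shape of a pending index operation; not taken from the service's code.
type IndexOperation =
  | { action: "index"; id: string; document: Record<string, unknown> }
  | { action: "delete"; id: string };

/**
 * Builds a Bulk API request body: one action line per operation, followed by
 * a source line for "index" actions. "delete" actions carry no source line.
 */
function buildBulkBody(indexName: string, operations: IndexOperation[]): string {
  const lines: string[] = [];
  for (const op of operations) {
    if (op.action === "index") {
      lines.push(JSON.stringify({ index: { _index: indexName, _id: op.id } }));
      lines.push(JSON.stringify(op.document));
    } else {
      lines.push(JSON.stringify({ delete: { _index: indexName, _id: op.id } }));
    }
  }
  // The Bulk API requires the body to end with a newline character.
  return lines.length > 0 ? lines.join("\n") + "\n" : "";
}

// Example: index one program and delete another in a single request body.
const bulkBody = buildBulkBody("programs", [
  { action: "index", id: "1", document: { id: "1", title: "Episode one" } },
  { action: "delete", id: "2" },
]);
```

Batching several operations into one request reduces round trips, which is the usual reason to prefer the Bulk API over per-document calls.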

🚀 Quick Start

Prerequisites

  • Node.js 18+
  • PostgreSQL database
  • Redis instance
  • Elasticsearch instance
  • Environment variables configured (see below)

Development

Example :
# Install dependencies (from root)
pnpm install

# Start in development mode
pnpm --filter sync-worker dev

# Or from the app directory
cd apps/sync-worker
pnpm dev

Environment Variables

Required environment variables (see root .env.example):

Example :
# Database
DATABASE_HOST=localhost
DATABASE_PORT=5432
DATABASE_USERNAME=postgres
DATABASE_PASSWORD=password
DATABASE_NAME=thmanyah

# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=

# Elasticsearch
ELASTICSEARCH_URL=http://localhost:9200
ELASTICSEARCH_USERNAME=elastic
ELASTICSEARCH_PASSWORD=changeme
ELASTICSEARCH_INDEX_NAME=programs

# Node environment
NODE_ENV=development
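
One way to fail fast on a misconfigured environment is to validate these variables at startup. The following is a minimal sketch under that assumption: the variable names come from the list above, while `SyncWorkerConfig`, `loadConfig`, and the helper functions are hypothetical and not part of the service's documented code.

```typescript
// Hypothetical configuration shape; only the variable names come from this README.
interface SyncWorkerConfig {
  database: { host: string; port: number; username: string; password: string; name: string };
  redis: { host: string; port: number; password: string | undefined };
  elasticsearch: { url: string; username: string; password: string; indexName: string };
}

type Env = Record<string, string | undefined>;

// Returns the value of a variable or throws if it is unset or empty.
function required(env: Env, key: string): string {
  const value = env[key];
  if (value === undefined || value === "") {
    throw new Error(`Missing required environment variable: ${key}`);
  }
  return value;
}

// Parses a variable as a TCP port number.
function port(env: Env, key: string): number {
  const parsed = Number(required(env, key));
  if (!Number.isInteger(parsed) || parsed < 1 || parsed > 65535) {
    throw new Error(`Invalid port in ${key}`);
  }
  return parsed;
}

function loadConfig(env: Env): SyncWorkerConfig {
  return {
    database: {
      host: required(env, "DATABASE_HOST"),
      port: port(env, "DATABASE_PORT"),
      username: required(env, "DATABASE_USERNAME"),
      password: required(env, "DATABASE_PASSWORD"),
      name: required(env, "DATABASE_NAME"),
    },
    redis: {
      host: required(env, "REDIS_HOST"),
      port: port(env, "REDIS_PORT"),
      // An empty REDIS_PASSWORD is treated as "no password".
      password: env["REDIS_PASSWORD"] || undefined,
    },
    elasticsearch: {
      url: required(env, "ELASTICSEARCH_URL"),
      username: required(env, "ELASTICSEARCH_USERNAME"),
      password: required(env, "ELASTICSEARCH_PASSWORD"),
      indexName: required(env, "ELASTICSEARCH_INDEX_NAME"),
    },
  };
}
```

In a Node.js process this would typically be called once as `loadConfig(process.env)` before the worker starts consuming jobs.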

🧪 Testing

Example :
# Unit tests
pnpm --filter sync-worker test

# E2E tests
pnpm --filter sync-worker test:e2e

# Test coverage
pnpm --filter sync-worker test:cov

# Watch mode
pnpm --filter sync-worker test:watch

🔧 Development

Available Scripts

Example :
# Development
pnpm dev                    # Start in development mode
pnpm start                  # Start the application
pnpm start:dev              # Start with watch mode
pnpm start:debug            # Start in debug mode
pnpm start:prod             # Start in production mode

# Testing
pnpm test                   # Run unit tests
pnpm test:watch             # Run tests in watch mode
pnpm test:cov               # Run tests with coverage
pnpm test:debug             # Run tests in debug mode
pnpm test:e2e               # Run e2e tests

# Building
pnpm build                  # Build the application
pnpm build:watch            # Build in watch mode

# Linting
pnpm lint                   # Run ESLint
pnpm lint:fix               # Fix ESLint issues

Project Structure

Example :
apps/sync-worker/
├── src/
│   ├── app.module.ts           # Main application module
│   ├── main.ts                 # Application entry point
│   ├── common/                 # Shared utilities
│   └── sync-worker/            # Sync worker logic
│       ├── sync-worker.service.ts
│       └── sync-worker.service.spec.ts
├── test/                       # E2E tests
└── package.json

🔄 Data Flow

  1. Event Consumption: The Sync Worker consumes an event from the Redis queue (the queue name is fixed in code)
  2. Data Retrieval: Fetches the program data referenced by the event from the database
  3. Data Transformation: Transforms the database entity into an Elasticsearch document
  4. Index Update: Writes the transformed document to the Elasticsearch index
  5. Status Update: Marks the job as completed or failed
  6. Retry Logic: Retries failed jobs with exponential backoff
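
The steps above can be sketched as a single handler with injected dependencies. Every name here (`SyncEvent`, `ProgramRepository`, `SearchIndex`, `processEvent`) is hypothetical and serves only to illustrate the flow; the event payload and the real service classes are not documented in this README.

```typescript
// Hypothetical event payload: which operation happened to which program.
interface SyncEvent {
  operation: "create" | "update" | "delete";
  programId: string;
}

interface Program {
  id: string;
  title: string;
}

// Abstractions over the database and the search index, so the flow can be
// exercised with in-memory fakes.
interface ProgramRepository {
  findById(id: string): Promise<Program | null>;
}

interface SearchIndex {
  upsert(id: string, document: Record<string, unknown>): Promise<void>;
  remove(id: string): Promise<void>;
}

async function processEvent(
  event: SyncEvent,
  repository: ProgramRepository,
  index: SearchIndex,
): Promise<"indexed" | "removed"> {
  if (event.operation === "delete") {
    await index.remove(event.programId);
    return "removed";
  }

  // Step 2: fetch the current row rather than trusting the event payload.
  const program = await repository.findById(event.programId);
  if (program === null) {
    // The row vanished between enqueue and processing, so drop it from the index too.
    await index.remove(event.programId);
    return "removed";
  }

  // Steps 3 and 4: transform the entity and write the document.
  await index.upsert(program.id, { id: program.id, title: program.title });
  return "indexed";
}
```

In this sketch any thrown error propagates to the caller, which lets the queue mark the job as failed and schedule a retry (steps 5 and 6).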

📊 Elasticsearch Index Schema

The service manages an Elasticsearch index with the following mapping:

Example :
{
  "mappings": {
    "properties": {
      "id": { "type": "keyword" },
      "title": {
        "type": "text",
        "analyzer": "standard",
        "fields": { "keyword": { "type": "keyword" } }
      },
      "description": { "type": "text", "analyzer": "standard" },
      "type": { "type": "keyword" },
      "language": { "type": "keyword" },
      "tags": { "type": "keyword" },
      "createdAt": { "type": "date" },
      "updatedAt": { "type": "date" }
    }
  },
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  }
}
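
To make the mapping concrete, here is a sketch of a TypeScript document type that mirrors it, together with a transformation from a database entity. The `ProgramEntity` shape (including its nullable fields) is an assumption, since the real entity is not shown in this README; `toProgramDocument` is likewise a hypothetical helper.

```typescript
// Document shape mirroring the field names and types in the mapping above.
interface ProgramDocument {
  id: string;
  title: string;
  description: string;
  type: string;
  language: string;
  tags: string[];
  createdAt: string; // ISO 8601, accepted by the default "date" field format
  updatedAt: string;
}

// Hypothetical database entity; the nullable fields are assumptions.
interface ProgramEntity {
  id: string;
  title: string;
  description: string | null;
  type: string;
  language: string;
  tags: string[] | null;
  createdAt: Date;
  updatedAt: Date;
}

function toProgramDocument(entity: ProgramEntity): ProgramDocument {
  return {
    id: entity.id,
    title: entity.title,
    // Normalize missing values so every document has the same set of fields.
    description: entity.description ?? "",
    type: entity.type,
    language: entity.language,
    tags: entity.tags ?? [],
    // Dates are serialized as ISO 8601 strings.
    createdAt: entity.createdAt.toISOString(),
    updatedAt: entity.updatedAt.toISOString(),
  };
}
```

Keeping the transformation in one pure function makes it easy to unit test without a running database or Elasticsearch instance.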

⚙️ Configuration

Redis Configuration

Example :
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=

Elasticsearch Configuration

Example :
ELASTICSEARCH_URL=http://localhost:9200
ELASTICSEARCH_USERNAME=elastic
ELASTICSEARCH_PASSWORD=changeme
ELASTICSEARCH_INDEX_NAME=programs

🔒 Reliability Features

Job Processing

  • Concurrent job processing (concurrency is fixed in code)
  • Job ordering and deduplication
  • Dead letter queue for failed jobs
  • Graceful shutdown handling

Error Handling

  • Exponential backoff for retries
  • Detailed error logging
  • Job failure tracking
  • Recovery mechanisms
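
The exponential backoff mentioned above can be illustrated with a short calculation. BullMQ accepts `attempts` and `backoff` job options of the shape shown below; the concrete values, the local `RetryOptions` type, and the `exponentialDelay` helper are assumptions made for this sketch, and the service's real settings live in its code. The helper assumes the delay doubles on each retry with no jitter.

```typescript
// Local type so the sketch compiles without BullMQ installed; it mirrors
// the `attempts` and `backoff` job options.
interface RetryOptions {
  attempts: number;
  backoff: { type: "exponential"; delay: number };
}

// Illustrative values only.
const retryOptions: RetryOptions = {
  attempts: 5,
  backoff: { type: "exponential", delay: 1000 },
};

/**
 * Delay in milliseconds before the retry that follows failed attempt number
 * `attemptsMade` (1-based), assuming the delay doubles each time.
 */
function exponentialDelay(baseDelayMs: number, attemptsMade: number): number {
  return Math.round(baseDelayMs * Math.pow(2, attemptsMade - 1));
}

// Five attempts in total means at most four retries.
const retrySchedule = Array.from(
  { length: retryOptions.attempts - 1 },
  (_, i) => exponentialDelay(retryOptions.backoff.delay, i + 1),
);
// retrySchedule is [1000, 2000, 4000, 8000]
```

With these example values a job that keeps failing is given up on after roughly 15 seconds of cumulative waiting, at which point it would be a candidate for the dead letter queue.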

📊 Monitoring

The service has no health check endpoint because it runs as a background service. Monitoring relies on logs and on queue tooling:

  • Job processing metrics (via logs)
  • Queue depth monitoring (via Redis/BullMQ tools)
  • Elasticsearch health checks and connection status (via logs)
  • Performance monitoring (via logs)
  • Error tracking and reporting (via logs)

🚀 Deployment

Docker

Example :
# Build image
docker build --build-arg TARGET_APP=sync-worker -t thmanyah/sync-worker .

# Run container
docker run --env-file .env thmanyah/sync-worker

Production

Example :
# Build for production
pnpm --filter sync-worker build

# Start production server
pnpm --filter sync-worker start:prod

🔧 Troubleshooting

Common Issues

  1. Jobs not being processed

    • Check Redis connectivity
    • Verify queue configuration (see code)
    • Check worker concurrency settings (see code)
    • Review job data format

  2. Elasticsearch errors

    • Check Elasticsearch connectivity
    • Verify index exists and has correct mapping
    • Check authentication credentials
    • Review index settings

  3. Performance issues

    • Monitor Elasticsearch and database performance
    • Review logs for slow operations
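
When checking Elasticsearch connectivity, the cluster health API (`GET /_cluster/health`) is a quick first probe. The sketch below wraps it in a helper that takes a fetch-like function as a parameter. `getClusterStatus` and the `HttpGet` type are hypothetical names for this example, and authentication is left to the supplied function.

```typescript
// Structural type covering only what this sketch needs from a fetch-like
// function, so it compiles without DOM or Node type definitions.
type HttpGet = (url: string) => Promise<{
  ok: boolean;
  status: number;
  json(): Promise<unknown>;
}>;

type ClusterStatus = "green" | "yellow" | "red";

async function getClusterStatus(baseUrl: string, httpGet: HttpGet): Promise<ClusterStatus> {
  // Strip trailing slashes so the path is joined cleanly.
  const response = await httpGet(`${baseUrl.replace(/\/+$/, "")}/_cluster/health`);
  if (!response.ok) {
    throw new Error(`Elasticsearch responded with HTTP ${response.status}`);
  }
  const body = (await response.json()) as { status?: unknown };
  const status = body.status;
  if (status === "green" || status === "yellow" || status === "red") {
    return status;
  }
  throw new Error("Unexpected cluster health response");
}
```

On Node.js 18+ the global `fetch` can be passed as `(url) => fetch(url)`; for a secured cluster the wrapper would also need to send the credentials from `ELASTICSEARCH_USERNAME` and `ELASTICSEARCH_PASSWORD`. An HTTP 401 response points to the credentials, while a rejected promise usually points to network connectivity.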

Logs

The service provides detailed logging for:

  • Job processing status
  • Elasticsearch operations
  • Error details
  • Performance metrics

🤝 Contributing

  1. Follow the project's coding standards
  2. Add tests for new features
  3. Update configuration documentation
  4. Ensure all tests pass before submitting
  5. Consider data consistency and performance implications

📄 License

This project is part of the Thmanyah backend system and is licensed under the MIT License.
