Workers Architecture

Background workers handle asynchronous processing outside the request path. For scheduled (cron) jobs that run inside the API or BlogAutoGenerationWorker, see Scheduled Jobs.

Worker Types

Bellamy Book uses multiple specialized workers (in Src/backend/):

Kafka-based workers (consume events from Kafka topics):

  • GraphWorker: Social graph processing (Neo4j), friend suggestions, mutual friends
  • ScoringWorker: Content scoring, relevance scores, content rankings; optional content-scores recalculation
  • InteractionWorker: Interaction and engagement processing
  • TrendingWorker: Trending content calculation, trending decay (periodic background job)
  • HashtagWorker: Hashtag extraction and indexing from post/story content
  • ElasticsearchSyncWorker: Syncs posts/blogs to Elasticsearch/OpenSearch for full-text search
  • MediaProcessingWorker: Media processing (images, thumbnails, video transcoding)
  • WebSocketWorker: Real-time notifications (SignalR)

RabbitMQ-based push workers (consume ChatMessage, IncomingCallPushMessage from RabbitMQ; API publishes when a message is sent or a call is started):

  • WebPushNotificationWorker: Web Push (browser) for chat and call notifications; consumes from RabbitMQ; sends push via Web Push API (VAPID).
  • ExpoPushNotificationWorker: Expo Push (mobile) for chat and call notifications; consumes from RabbitMQ; sends push via Expo Push Service.

Other workers:

  • ChatWorker: Hosts SignalR Chat hub at /hubs/chat and ChatOnline at /hubs/chat-online; real-time chat, online users, offline messages. The main API does not host the chat hub — the frontend connects to the ChatWorker URL for chat.
  • WebSocketWorker: Hosts SignalR Notification hub at /hubs/notification and OnlineUser hub at /hubs/onlineusers; in-app notifications and presence. The frontend uses the WebSocketWorker base URL for these hubs.
  • BlogAutoGenerationWorker: AI-powered blog generation (consumes generation requests, publishes blogs)

Worker Architecture

Kafka-Based Workers

Workers consume events from Kafka topics:

Event Published → Kafka Topic → Worker Consumes → Process → Update Database

Worker Structure

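using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using System.Text.Json;

public class GraphWorker : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly IGraphService _graphService;

    public GraphWorker(IConsumer<string, string> consumer, IGraphService graphService)
    {
        _consumer = consumer;
        _graphService = graphService;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume() blocks, so yield first to avoid stalling host startup.
        await Task.Yield();
        _consumer.Subscribe("user-events");
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var result = _consumer.Consume(stoppingToken);
                await ProcessEventAsync(result.Message.Value);
            }
        }
        catch (OperationCanceledException) { /* expected on shutdown */ }
        finally
        {
            _consumer.Close(); // leave the consumer group cleanly
        }
    }

    private async Task ProcessEventAsync(string eventData)
    {
        var @event = JsonSerializer.Deserialize<UserEvent>(eventData);
        if (@event is null) return; // skip payloads that deserialize to null
        await _graphService.UpdateGraphAsync(@event);
    }
}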

GraphWorker

Responsibilities

  • Update social graph
  • Calculate friend suggestions
  • Process friend relationships
  • Update mutual friends

Events Consumed

  • user.registered
  • friend.request.accepted
  • friend.request.declined
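
One way a worker could route the event types listed above to separate handlers is a small dispatcher keyed by event name. This is only a sketch: the `EventDispatcher` class and the JSON envelope with `type` and `payload` fields are assumptions made for illustration, not the actual message format used by the workers.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical dispatcher: maps an event type name to an async handler.
public sealed class EventDispatcher
{
    private readonly Dictionary<string, Func<JsonElement, Task>> _handlers =
        new Dictionary<string, Func<JsonElement, Task>>(StringComparer.Ordinal);

    public void Register(string eventType, Func<JsonElement, Task> handler) =>
        _handlers[eventType] = handler;

    // Returns true if a handler ran, false if the event type is missing or unknown.
    // Assumed envelope: { "type": "<event name>", "payload": { ... } }
    public async Task<bool> DispatchAsync(string eventJson)
    {
        using var doc = JsonDocument.Parse(eventJson);
        var root = doc.RootElement;
        if (root.ValueKind != JsonValueKind.Object)
            return false;
        if (!root.TryGetProperty("type", out var typeProp) ||
            typeProp.ValueKind != JsonValueKind.String)
            return false;

        var type = typeProp.GetString();
        if (type is null || !_handlers.TryGetValue(type, out var handler))
            return false;

        // A missing payload is passed as a JsonElement with ValueKind Undefined.
        var payload = root.TryGetProperty("payload", out var p) ? p : default;
        await handler(payload); // awaited before the JsonDocument is disposed
        return true;
    }
}
```

Unknown event types return `false` rather than throwing, so a worker subscribed to a shared topic can skip events it does not handle.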

ScoringWorker

Responsibilities

  • Score content quality
  • Detect spam
  • Calculate relevance scores
  • Update content rankings

Events Consumed

  • post.created
  • comment.created
  • reaction.added

InteractionWorker

Responsibilities

  • Process interactions
  • Update engagement metrics
  • Calculate user activity scores

Events Consumed

  • post.viewed
  • post.shared
  • comment.created
  • reaction.added

TrendingWorker

Responsibilities

  • Calculate trending posts
  • Update trending topics
  • Track viral content

Processing

Runs periodically (every 5 minutes) to:

  1. Analyze recent engagement
  2. Calculate trending scores
  3. Update trending lists
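
The three steps above could be sketched roughly as follows. The scoring formula (interaction count halved every `halfLife`) is an assumption chosen to illustrate trending decay, and `EngagementSample`, `TrendingSketch`, and their members are hypothetical names, not the worker's real types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical shape for one post's recent engagement (step 1 would load these).
public record EngagementSample(string PostId, int Interactions, DateTimeOffset LastActivity);

public static class TrendingSketch
{
    // Step 2: interactions decayed by age, halving every halfLife (assumed formula).
    public static double Score(EngagementSample sample, DateTimeOffset now, TimeSpan halfLife)
    {
        var age = now - sample.LastActivity;
        if (age < TimeSpan.Zero) age = TimeSpan.Zero; // guard against clock skew
        return sample.Interactions * Math.Pow(0.5, age.TotalHours / halfLife.TotalHours);
    }

    // Step 3: ids of the highest-scoring posts, best first.
    public static List<string> TopTrending(
        IEnumerable<EngagementSample> samples, DateTimeOffset now, TimeSpan halfLife, int count) =>
        samples.OrderByDescending(s => Score(s, now, halfLife))
               .Take(count)
               .Select(s => s.PostId)
               .ToList();

    // Runs one pass per interval until cancelled (PeriodicTimer requires .NET 6 or later).
    public static async Task RunEveryAsync(
        TimeSpan interval, Func<CancellationToken, Task> pass, CancellationToken ct)
    {
        using var timer = new PeriodicTimer(interval);
        try
        {
            while (await timer.WaitForNextTickAsync(ct))
                await pass(ct);
        }
        catch (OperationCanceledException)
        {
            // Expected when the host shuts down.
        }
    }
}
```

Inside a `BackgroundService`, the loop might be started with `TrendingSketch.RunEveryAsync(TimeSpan.FromMinutes(5), RecalculateAsync, stoppingToken)`, where `RecalculateAsync` is a placeholder for the pass that loads engagement data and stores the new lists.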

MediaProcessingWorker

Responsibilities

  • Process uploaded images
  • Generate thumbnails
  • Transcode video
  • Optimize images

Events Consumed

  • media.uploaded

WebSocketWorker

Responsibilities

  • Send real-time notifications
  • Broadcast updates
  • Manage WebSocket connections

Events Consumed

  • notification.created
  • message.sent
  • post.created (for friends)

HashtagWorker

Responsibilities

  • Extract hashtags from post and story content
  • Normalize and index hashtags
  • Update hashtag–post relationships and post counts
  • Consumes content-created/updated/deleted and user-interaction events
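
As an illustration of the extraction and normalization steps, the sketch below pulls hashtags out of a piece of content with a regular expression and lower-cases them. The pattern (a `#` followed by letters, digits, or underscores) and the `HashtagExtractor` name are assumptions; the worker's actual rules may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public static class HashtagExtractor
{
    // Assumption: a hashtag is '#' followed by letters, digits, or underscores,
    // and the '#' must not directly follow a word character (so "abc#tag" is ignored).
    private static readonly Regex HashtagPattern =
        new Regex(@"(?<![\p{L}\p{N}_])#([\p{L}\p{N}_]+)", RegexOptions.Compiled);

    // Returns the distinct hashtags in the content, lower-cased and without the '#'.
    public static List<string> Extract(string content)
    {
        if (string.IsNullOrWhiteSpace(content))
            return new List<string>();

        return HashtagPattern.Matches(content)
            .Cast<Match>()
            .Select(m => m.Groups[1].Value.ToLowerInvariant())
            .Distinct()
            .ToList();
    }
}
```

Normalizing to lower case before indexing means `#World` and `#world` count toward the same hashtag.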

ElasticsearchSyncWorker

Responsibilities

  • Sync post and blog content to Elasticsearch/OpenSearch
  • Enables full-text search (Search API)
  • Consumes content-created/updated/deleted events

BlogAutoGenerationWorker

Responsibilities

  • Consumes AI blog generation requests (from admin or scheduled jobs)
  • Calls AI provider to generate blog content
  • Publishes created blogs; optional SEO and tagging

ChatWorker

Responsibilities

  • Hosts SignalR hub for real-time chat
  • Online user presence; offline message storage and delivery
  • Complements the main API chat endpoints (api/Chat, api/Call, api/StrangerChat)

WebPushNotificationWorker

Responsibilities

  • Consumes ChatMessage and IncomingCallPushMessage from RabbitMQ (same messages the API publishes when a user sends a chat message or starts a call)
  • Looks up the recipient’s web push subscriptions in MongoDB
  • Sends web push to the recipient’s browser via the Web Push API (VAPID)

Events Consumed

  • ChatMessage (from RabbitMQ)
  • IncomingCallPushMessage (from RabbitMQ)

Configuration

Uses the same RabbitMQ and MongoDB as the API; requires WebPush VAPID keys (PublicKey, PrivateKey, Subject) in configuration. See Web Push and Expo Push.

ExpoPushNotificationWorker

Responsibilities

  • Consumes ChatMessage and IncomingCallPushMessage from RabbitMQ
  • Looks up the recipient’s Expo push tokens in MongoDB
  • Sends push to the recipient’s mobile device via Expo Push Service
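
The final step, sending to the Expo Push Service, amounts to an HTTP POST of a JSON message addressed to an Expo push token. The sketch below shows one way that call might look. `ExpoPushSketch` and its method names are hypothetical, the `title` and `body` values stand in for whatever the worker derives from the consumed message, and the MongoDB token lookup is omitted.

```csharp
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

public static class ExpoPushSketch
{
    // Expo's public endpoint for sending push notifications.
    public const string SendUrl = "https://exp.host/--/api/v2/push/send";

    // Builds the JSON body for a single notification.
    public static string BuildPayload(string expoPushToken, string title, string body) =>
        JsonSerializer.Serialize(new { to = expoPushToken, title, body, sound = "default" });

    // Posts the payload and reports whether the HTTP call itself succeeded.
    // A successful response can still carry a per-message error in its body,
    // so a real worker would also inspect the returned ticket.
    public static async Task<bool> SendAsync(
        HttpClient http, string payloadJson, CancellationToken ct = default)
    {
        using var content = new StringContent(payloadJson, Encoding.UTF8, "application/json");
        using var response = await http.PostAsync(SendUrl, content, ct);
        return response.IsSuccessStatusCode;
    }
}
```

Keeping payload construction separate from the HTTP call makes the message shape easy to unit test without network access.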

Events Consumed

  • ChatMessage (from RabbitMQ)
  • IncomingCallPushMessage (from RabbitMQ)

Configuration

Uses the same RabbitMQ and MongoDB as the API. See Web Push and Expo Push.

Deployment

Docker Deployment

services:
  graph-worker:
    build: ./Workers/GraphWorker
    environment:
      - KAFKA_BROKER=kafka:9092
      - DATABASE_CONNECTION=...

Monitoring

Worker Health

Monitor worker health and processing rates:

_metrics.IncrementCounter("worker.events.processed");
_metrics.RecordGauge("worker.queue.length", queueLength);

Error Handling

Workers implement retry logic and dead letter queues for failed events.
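
A minimal sketch of that pattern is shown below, assuming a fixed number of attempts with exponential backoff. The `RetryingProcessor` helper, its parameters, and the default values are hypothetical. The `deadLetter` delegate stands in for whatever publishes the failed event to a dead letter topic or queue.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RetryingProcessor
{
    // Tries the handler up to maxAttempts times. After the final failure the
    // payload and last exception are handed to deadLetter instead of being lost.
    // Returns true if the handler eventually succeeded, false if dead-lettered.
    public static async Task<bool> ProcessWithRetryAsync(
        string payload,
        Func<string, Task> handler,
        Func<string, Exception, Task> deadLetter,
        int maxAttempts = 3,
        TimeSpan? baseDelay = null,
        CancellationToken ct = default)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        var delay = baseDelay ?? TimeSpan.FromMilliseconds(200);

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await handler(payload);
                return true;
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                if (attempt >= maxAttempts)
                {
                    await deadLetter(payload, ex);
                    return false;
                }
                // Exponential backoff: baseDelay, then 2x, 4x, ...
                var wait = TimeSpan.FromTicks(delay.Ticks * (1L << (attempt - 1)));
                await Task.Delay(wait, ct);
            }
        }
    }
}
```

Cancellation is deliberately not retried, so a worker that is shutting down stops promptly rather than routing in-flight events to the dead letter destination.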

Next Steps