Workers Architecture
Background workers for asynchronous processing. For scheduled (cron) jobs running inside the API or BlogAutoGenerationWorker, see Scheduled Jobs.
Worker Types
Bellamy Book uses multiple specialized workers (in Src/backend/):
Kafka-based workers (consume events from Kafka topics):
- GraphWorker: Social graph processing (Neo4j), friend suggestions, mutual friends
- ScoringWorker: Content scoring, relevance scores, content rankings; optional content-scores recalculation
- InteractionWorker: Interaction and engagement processing
- TrendingWorker: Trending content calculation, trending decay (periodic background job)
- HashtagWorker: Hashtag extraction and indexing from post/story content
- ElasticsearchSyncWorker: Syncs posts/blogs to Elasticsearch/OpenSearch for full-text search
- MediaProcessingWorker: Media processing (images, thumbnails, video transcoding)
- WebSocketWorker: Real-time notifications (SignalR)
RabbitMQ-based push workers (consume ChatMessage, IncomingCallPushMessage from RabbitMQ; API publishes when a message is sent or a call is started):
- WebPushNotificationWorker: Web Push (browser) for chat and call notifications; consumes from RabbitMQ; sends push via Web Push API (VAPID).
- ExpoPushNotificationWorker: Expo Push (mobile) for chat and call notifications; consumes from RabbitMQ; sends push via Expo Push Service.
Other workers:
- ChatWorker: Hosts SignalR Chat hub at
/hubs/chatand ChatOnline at/hubs/chat-online; real-time chat, online users, offline messages. The main API does not host the chat hub — the frontend connects to the ChatWorker URL for chat. - WebSocketWorker: Hosts SignalR Notification hub at
/hubs/notificationand OnlineUser hub at/hubs/onlineusers; in-app notifications and presence. The frontend uses the WebSocketWorker base URL for these hubs. - BlogAutoGenerationWorker: AI-powered blog generation (consumes generation requests, publishes blogs)
Worker Architecture
Kafka-Based Workers
Workers consume events from Kafka topics:
Event Published → Kafka Topic → Worker Consumes → Process → Update Database
Worker Structure
public class GraphWorker : BackgroundService
{
private readonly IConsumer<string, string> _consumer;
private readonly IGraphService _graphService;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_consumer.Subscribe("user-events");
while (!stoppingToken.IsCancellationRequested)
{
var result = _consumer.Consume(stoppingToken);
await ProcessEventAsync(result.Message.Value);
}
}
private async Task ProcessEventAsync(string eventData)
{
var @event = JsonSerializer.Deserialize<UserEvent>(eventData);
await _graphService.UpdateGraphAsync(@event);
}
}
GraphWorker
Responsibilities
- Update social graph
- Calculate friend suggestions
- Process friend relationships
- Update mutual friends
Events Consumed
user.registeredfriend.request.acceptedfriend.request.declined
ScoringWorker
Responsibilities
- Score content quality
- Detect spam
- Calculate relevance scores
- Update content rankings
Events Consumed
post.createdcomment.createdreaction.added
InteractionWorker
Responsibilities
- Process interactions
- Update engagement metrics
- Calculate user activity scores
Events Consumed
post.viewedpost.sharedcomment.createdreaction.added
TrendingWorker
Responsibilities
- Calculate trending posts
- Update trending topics
- Track viral content
Processing
Runs periodically (every 5 minutes) to:
- Analyze recent engagement
- Calculate trending scores
- Update trending lists
MediaProcessingWorker
Responsibilities
- Process uploaded images
- Generate thumbnails
- Video transcoding
- Image optimization
Events Consumed
media.uploaded
WebSocketWorker
Responsibilities
- Send real-time notifications
- Broadcast updates
- Manage WebSocket connections
Events Consumed
notification.createdmessage.sentpost.created(for friends)
HashtagWorker
Responsibilities
- Extract hashtags from post and story content
- Normalize and index hashtags
- Update hashtag–post relationships and post counts
- Consumes content-created/updated/deleted and user-interaction events
ElasticsearchSyncWorker
Responsibilities
- Sync post and blog content to Elasticsearch/OpenSearch
- Enables full-text search (Search API)
- Consumes content-created/updated/deleted events
BlogAutoGenerationWorker
Responsibilities
- Consumes AI blog generation requests (from admin or scheduled jobs)
- Calls AI provider to generate blog content
- Publishes created blogs; optional SEO and tagging
ChatWorker
Responsibilities
- Hosts SignalR hub for real-time chat
- Online user presence; offline message storage and delivery
- Complements the main API chat endpoints (
api/Chat,api/Call,api/StrangerChat)
WebPushNotificationWorker
Responsibilities
- Consumes ChatMessage and IncomingCallPushMessage from RabbitMQ (same messages the API publishes when a user sends a chat message or starts a call)
- Looks up the recipient’s web push subscriptions in MongoDB
- Sends web push to the recipient’s browser via the Web Push API (VAPID)
Events Consumed
ChatMessage(from RabbitMQ)IncomingCallPushMessage(from RabbitMQ)
Configuration
Uses the same RabbitMQ and MongoDB as the API; requires WebPush VAPID keys (PublicKey, PrivateKey, Subject) in configuration. See Web Push and Expo Push.
ExpoPushNotificationWorker
Responsibilities
- Consumes ChatMessage and IncomingCallPushMessage from RabbitMQ
- Looks up the recipient’s Expo push tokens in MongoDB
- Sends push to the recipient’s mobile device via Expo Push Service
Events Consumed
ChatMessage(from RabbitMQ)IncomingCallPushMessage(from RabbitMQ)
Configuration
Uses the same RabbitMQ and MongoDB as the API. See Web Push and Expo Push.
Deployment
Docker Deployment
services:
graph-worker:
build: ./Workers/GraphWorker
environment:
- KAFKA_BROKER=kafka:9092
- DATABASE_CONNECTION=...
Monitoring
Worker Health
Monitor worker health and processing rates:
_metrics.IncrementCounter("worker.events.processed");
_metrics.RecordGauge("worker.queue.length", queueLength);
Error Handling
Workers implement retry logic and dead letter queues for failed events.