Redis as a Central Broker for a Linear Data Pipeline: A Pragmatic Alternative to Kubernetes

by Sylvain Artois on Sep 7, 2025

Weasels at Play, 1911 - Franz Marc - www.nga.gov

The Architecture Dilemma

When building a first POC or MVP, the architectural consensus today is clear: start with a monolith. A microservices architecture often relies on Kubernetes, which is undeniably a nightmare for a solo developer.

But between a monolith and a Kubernetes infrastructure, variations are possible.

For my data/ML/NLP project, which relies on a linear pipeline of successive processing stages, I chose to architect the system using Redis as a central broker to coordinate different services: data extraction, SetFit classification, BERTopic modeling, trends building, re-classification, semantic search with Qdrant, and final Astro build.

Why Not a Monolith? Why Not Kubernetes?

I didn’t really consider the monolith option. I don’t see how a single monolith could combine a process that clusters NLP data, another that makes API calls to Mistral, and yet another that builds payloads for Qdrant searches.

I also didn’t consider Kubernetes. I’ve heard too many clients at my previous company complain about their infrastructure to impose that complexity on myself at this stage of the project.

I chose Redis because I know it well and it’s a proven technology.

Project Structure

Here’s how the AFK project is organized:

afk/
├── broker/                  # Infrastructure and orchestration
│   ├── migrations/          # Database migrations
│   ├── backups/             # Backup storage
│   └── logs/                # Service logs
├── data-acquisition/        # Data ingestion services
│   └── headlines/           # RSS/XML feed processing
├── gen-output/              # Processing and output services
│   ├── front/               # Astro web application
│   ├── api/                 # FastAPI backend
│   ├── semantic-matcher/    # Semantic search service
│   └── ...                  # other ML services
├── ml/                      # Machine learning components
│   ├── dataset-generator/   # Training data preparation
│   ├── fine-tuning/         # Model fine-tuning
│   ├── label-studio/        # Data annotation
│   └── setfit-classification/
└── shared/                  # Shared libraries and models
    └── models/              # Pydantic models

The Central Infrastructure

All containers are defined via Docker Compose, which I’m comfortable with and which greatly simplifies volume and network management. I have a Docker stack I call central-infrastructure, containing a PostgreSQL instance, Qdrant, Redis, debugging tools (Portainer, pgAdmin), and an Ofelia scheduler.

networks:
  central-net:
    name: central-net
  shared-net:
    name: shared-net
    external: true

services:
  redis:
    #...
    networks:
      - central-net
      - shared-net

  postgres:
    #...
    environment:
      POSTGRES_DB: afk

  qdrant:
    image: qdrant/qdrant
    #...

  scheduler:
    image: mcuadros/ofelia:latest
    container_name: central-scheduler
    volumes:
      - ./scheduler-config.ini:/etc/ofelia/config.ini

  # ...

The external network allows my entire local network to receive events, including a Jetson AGX Orin running Ollama.
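To give an idea, here is roughly what a subscriber running on the Jetson could look like — just a sketch, with a placeholder address, and it assumes the Redis port is published on the Docker host:

import json

import redis

# Placeholder address for the Docker host running the central-infrastructure stack
r = redis.Redis(host="192.168.0.42", port=6379, decode_responses=True)

# Pattern subscription: receive every event published on the bus
pubsub = r.pubsub()
pubsub.psubscribe("*")

for message in pubsub.listen():
    if message["type"] == "pmessage":
        data = json.loads(message["data"])
        print(f"[{message['channel']}] {data.get('event')}")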

Event-Driven Architecture with Redis

In this stack, I have a python-env container that runs various scripts, including a listener that subscribes to every channel:

def run(self):
    """Main loop for listening to all channels via pattern subscription."""
    self.logger.info("Starting Redis Flow Listener")
    self.logger.info(f"Monitoring pattern: {self.pattern}")

    # Subscribe to all channels using pattern
    self.pubsub = self.redis_client.pubsub()
    self.pubsub.psubscribe('*')

    # Main listening loop
    while self.running:
        try:
            message = self.pubsub.get_message(timeout=1.0)

            if message and message['type'] == 'pmessage':
                pattern = message['pattern']
                channel = message['channel']

                try:
                    # Parse JSON message
                    data = json.loads(message['data'])
                    self.logger.info(f"[{channel}] Received message: {json.dumps(data, indent=2)}")
                    self.flow_logger.log_event(channel, data, pattern)
                except json.JSONDecodeError:
                    self.logger.warning(f"[{channel}] Received non-JSON payload, skipping")

        except Exception as e:
            self.logger.error(f"Error in listening loop: {e}")

This logs every event published on the bus, which makes debugging a lot easier.
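The flow_logger used above is essentially an append-only journal of everything that transits on the bus. Its implementation isn’t shown here, so the following is only a sketch of the idea (file layout and naming are assumptions):

import json
from datetime import datetime, timezone
from pathlib import Path

class FlowLogger:
    """Append every event to a JSONL file so a full pipeline run can be replayed."""

    def __init__(self, log_dir: str = "logs"):
        self.log_file = Path(log_dir) / "flow_events.jsonl"
        self.log_file.parent.mkdir(parents=True, exist_ok=True)

    def log_event(self, channel: str, data: dict, pattern: str) -> None:
        record = {
            "received_at": datetime.now(timezone.utc).isoformat(),
            "channel": channel,
            "pattern": pattern,
            "data": data,
        }
        with self.log_file.open("a") as f:
            f.write(json.dumps(record) + "\n")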

Pipeline Scheduling

The scheduler mounts the Ofelia config as a volume. The configuration is straightforward and triggers the pipeline (Ofelia uses Go-style cron expressions, hence the leading seconds field):

[job-exec "trigger-xml-headlines-extractor-week"]
schedule = 0 0 7,10,13,16,19,22 * * 1,2,3,4,5,6
container = central-redis
command = redis-cli PUBLISH ofelia "{\"event\": \"trigger-xml-headlines-extractor\"}"
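On the receiving side, the xml-headlines-extractor reacts to that event and, once its work is done, publishes its own completion message for the next stage. This is a simplified sketch rather than the actual listener; the completion fields follow the batch_id/status/timestamp contract described further down:

import json
import uuid
from datetime import datetime, timezone

import redis

# "central-redis" is the container name, resolvable on the shared Docker network
r = redis.Redis(host="central-redis", port=6379, decode_responses=True)
pubsub = r.pubsub()
pubsub.subscribe("ofelia")  # channel published by the scheduler

for message in pubsub.listen():
    if message["type"] != "message":
        continue
    event = json.loads(message["data"])
    if event.get("event") != "trigger-xml-headlines-extractor":
        continue

    batch_id = str(uuid.uuid4())
    # ... run the actual RSS/XML extraction here ...

    # Publish completion so the next stage of the pipeline can start
    r.publish("xml-headlines-extractor", json.dumps({
        "event": "xml_processing_complete",
        "batch_id": batch_id,
        "status": "success",
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }))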

Shared Code and Multi-Channel Listeners

I have a shared folder that allows code sharing between services. Currently, I mount the volume directly in compose.yml, but if the project succeeds, I can easily create a private pip package (see this guide) to increase isolation.

I have a BaseRedisListenerMultiChannel class that allows a given service to listen to multiple channels (useful for my SetFit classifier that runs twice in the pipeline):

import os
from abc import ABC
from typing import Any, Dict, List, Optional


class BaseRedisListenerMultiChannel(ABC):
    """Base class for Redis event listeners that monitor multiple channels."""

    def __init__(self, service_name: str, channels: List[str],
                 processor_script: str, config: Optional[Dict[str, Any]] = None):
        self.service_name = service_name
        self.processor_script = processor_script

        # Apply environment prefix if set (for dev/staging environments)
        env_prefix = os.getenv("ENV", "")
        if env_prefix:
            self.channels = [f"{env_prefix}-{channel}" for channel in channels]
        else:
            self.channels = channels

Each service is typically launched by compose.yml with:

command: python3 redis_listener.py
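For illustration, here is what the idea boils down to without the base class: one listener subscribed to several upstream channels, with the optional environment prefix applied. The channel names are only examples, and the real services launch their processor script instead of printing:

import json
import os

import redis

ENV_PREFIX = os.getenv("ENV", "")
CHANNELS = ["xml-headlines-extractor", "trends-builder"]  # illustrative upstream channels
if ENV_PREFIX:
    CHANNELS = [f"{ENV_PREFIX}-{channel}" for channel in CHANNELS]

r = redis.Redis(host="central-redis", port=6379, decode_responses=True)
pubsub = r.pubsub()
pubsub.subscribe(*CHANNELS)  # one listener, several upstream channels

for message in pubsub.listen():
    if message["type"] != "message":
        continue
    data = json.loads(message["data"])
    # The real listener launches its processor script (e.g. via subprocess) here
    print(f"[{message['channel']}] would classify batch {data.get('batch_id')}")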

Message Normalization

Early on, I felt the need to normalize channel names and events. As documented in my README:

The system follows an event-driven pipeline with normalized Redis messaging:

  • Channel: Name of the emitting microservice
  • Message: JSON with an event property describing the completed task
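This convention fits in a small helper that every service can share. The function below is only a sketch of the idea, not the actual shared code:

import json
import os
from datetime import datetime, timezone

import redis

def publish_event(r: redis.Redis, service_name: str, event: str, **payload) -> None:
    """Publish a normalized message: the channel is the emitting service's name
    (optionally prefixed by the environment), and the body is JSON with an
    `event` property describing the completed task."""
    env_prefix = os.getenv("ENV", "")
    channel = f"{env_prefix}-{service_name}" if env_prefix else service_name
    message = {"event": event, "timestamp": datetime.now(timezone.utc).isoformat(), **payload}
    r.publish(channel, json.dumps(message))

# e.g. publish_event(r, "xml-headlines-extractor", "xml_processing_complete",
#                    batch_id=batch_id, status="success")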

Request-Response Pattern with Correlation IDs

When you need ping-pong communication between services — for example, when a user launches a search from the API and must wait for my semantic-matcher to encode the query and run it against Qdrant — I use correlation IDs:

import json
import uuid

# r (the shared redis.Redis client), ENV_PREFIX and REQUEST_CHANNEL are defined
# elsewhere in the service

# Generate correlation ID
correlation_id = str(uuid.uuid4())

# Create response channel with correlation ID
response_channel = f"{ENV_PREFIX}-splade_search:response:{correlation_id}" if ENV_PREFIX else f"splade_search:response:{correlation_id}"

# Subscribe to response channel BEFORE sending request
pubsub = r.pubsub()
pubsub.subscribe(response_channel)

# Send request with correlation ID
request = {
    "correlation_id": correlation_id,
    "query": query,
    "limit": limit,
    # ... other parameters
}
r.publish(REQUEST_CHANNEL, json.dumps(request))

The correlation ID yields a dedicated response channel that only this specific request listens on.
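For completeness, here is roughly what the other side of the exchange looks like: the semantic-matcher picks up the request, does its work, and replies on the channel derived from the correlation ID. The request channel name and the response payload are assumptions; only the response-channel pattern comes from the code above:

import json

import redis

r = redis.Redis(host="central-redis", port=6379, decode_responses=True)
pubsub = r.pubsub()
pubsub.subscribe("splade_search:request")  # assumed name for REQUEST_CHANNEL

for message in pubsub.listen():
    if message["type"] != "message":
        continue
    request = json.loads(message["data"])
    correlation_id = request["correlation_id"]

    # ... encode the query and run it against Qdrant here ...
    results = {"hits": []}  # placeholder payload

    # Reply on the channel derived from the correlation ID, so only the
    # caller that issued this request receives the response.
    r.publish(f"splade_search:response:{correlation_id}", json.dumps({
        "correlation_id": correlation_id,
        "results": results,
    }))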

Future Evolution: Configuration-Driven Pipeline

Currently, my pipeline is hard-coded — each service defines on its own what it listens to and what it publishes, which isn’t ideal (the flow has already changed twice).

Working with Claude Opus, we’ve imagined an additional abstraction layer where the pipeline is defined via a configuration file:

{
  "pipeline": {
    "name": "AFK Data Processing Pipeline",
    "version": "1.0.0",
    "description": "Event-driven pipeline configuration for AFK news processing system"
  },
  "services": {
    "ofelia-scheduler": {
      "name": "Ofelia Scheduler",
      "description": "Cron-based task scheduler",
      "publishes": [
        {
          "channel": "ofelia",
          "events": ["trigger-xml-headlines-extractor"]
        }
      ]
    },
    "xml-headlines-extractor": {
      "name": "XML Headlines Extractor",
      "description": "Parses RSS/XML feeds and extracts headlines",
      "subscribes": [
        {
          "channel": "ofelia",
          "events": ["trigger-xml-headlines-extractor"]
        }
      ],
      "publishes": [
        {
          "channel": "xml-headlines-extractor",
          "events": ["xml_processing_complete"]
        }
      ],
      "required_fields": ["batch_id", "status", "timestamp"]
    }
    // ... other services
  }
}
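This is still exploratory, but consuming such a file at startup would be straightforward: each service reads its own entry, derives its subscriptions, and can validate outgoing messages against required_fields. A quick sketch (file name and helper names are hypothetical):

import json
from pathlib import Path

def load_service_config(config_path: str, service_id: str) -> dict:
    """Return the pipeline entry for one service, e.g. 'xml-headlines-extractor'."""
    config = json.loads(Path(config_path).read_text())
    return config["services"][service_id]

def subscribed_channels(service_cfg: dict) -> list[str]:
    return [sub["channel"] for sub in service_cfg.get("subscribes", [])]

def validate_message(service_cfg: dict, message: dict) -> None:
    missing = [f for f in service_cfg.get("required_fields", []) if f not in message]
    if missing:
        raise ValueError(f"Message is missing required fields: {missing}")

# Usage sketch:
# cfg = load_service_config("pipeline.json", "xml-headlines-extractor")
# pubsub.subscribe(*subscribed_channels(cfg))
# validate_message(cfg, outgoing_message)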

Key Benefits of This Architecture

1. GPU Allocation Simplicity

The linear pipeline means I don’t have to worry about GPU allocation — services process sequentially, avoiding resource conflicts.

2. Easy Maintenance Without Complex Monitoring

This system is easy to maintain without extensive monitoring infrastructure (often difficult to implement at the start of a project). Event logs, Portainer, and container logs are sufficient for troubleshooting.

3. Technology Flexibility

I chose to implement my internal API with FastAPI, primarily to share Pydantic models throughout my codebase, but I could just as easily have implemented it with NestJS, which I’m more comfortable with.

4. True Service Isolation

I don’t see how, with a monolith, I could have run my Astro build with its Node.js server islands while simultaneously building my BERTopic clusters or running my SetFit fine-tuning pipeline.

5. No Kubernetes Complexity

I’m very happy not working with Kubernetes, and shared databases considerably simplify microservice design.

Conclusion

This architecture represents a middle ground between monolithic applications and full Kubernetes deployments. By using Redis as a central message broker with Docker Compose orchestration, I’ve achieved:

  • Service isolation without Kubernetes complexity
  • Event-driven communication with clear message contracts
  • Easy debugging through centralized logging and event monitoring
  • Flexible scaling where individual services can be optimized independently
  • Rapid development with familiar tools and minimal infrastructure overhead

For solo developers or small teams working on data-intensive applications, this approach offers the benefits of microservices without the operational burden of enterprise-grade orchestration platforms. Sometimes, the best architecture is the one that lets you focus on building your product rather than managing infrastructure.

The key insight? Not every distributed system needs Kubernetes. Sometimes, a well-designed event bus with Redis and Docker Compose is exactly what you need to build a robust, maintainable data pipeline.
