Skip to content

Message Queue Module (RabbitMQ)

A complete guide to deploying and managing the RabbitMQ message broker with high availability, clustering, and monitoring capabilities.

The RabbitMQ module provides a robust and scalable message broker that implements the Advanced Message Queuing Protocol (AMQP). It enables reliable message delivery, supports multiple messaging patterns, and provides comprehensive monitoring and management tools.

RabbitMQ Deployment:
├── RabbitMQ Server Pods (Cluster)
├── Persistent Storage
├── Management UI
├── AMQP Protocol Support
├── Health Monitoring
└── Performance Metrics
  • AMQP Protocol: Full support for Advanced Message Queuing Protocol
  • Multiple Exchange Types: Direct, topic, fanout, and headers exchanges
  • Queue Management: Durable, auto-delete, and priority queues
  • Message Routing: Flexible routing with bindings and routing keys
  • Clustering: Multi-node cluster for fault tolerance
  • Mirrored Queues: Queue replication across cluster nodes
  • Automatic Failover: Seamless failover in case of node failure
  • Load Balancing: Distributed message processing
  • Dead Letter Queues: Handle failed message processing
  • Message TTL: Time-to-live for messages
  • Priority Queues: Message prioritization
  • Delayed Messages: Scheduled message delivery
  • Publisher Confirms: Reliable message publishing

The RabbitMQ deployment uses Helmfile for environment management:

iac/modules/rabbitmq/helmfile.yaml
# Helmfile for the RabbitMQ module: a single Bitnami chart release.
repositories:
  - name: bitnami
    url: https://charts.bitnami.com/bitnami

releases:
  - name: rabbitmq
    # Target namespace; createNamespace asks Helmfile to create it if missing.
    namespace: queue
    createNamespace: true
    chart: bitnami/rabbitmq
    # Chart values live in values.yaml in the same module directory.
    values:
      - values.yaml
iac/modules/rabbitmq/values.yaml
# Broker credentials. CHANGEME is a placeholder — replace it before deploying.
auth:
  password: CHANGEME

# Basic configuration: one replica backed by an 8Gi persistent volume.
replicaCount: 1

persistence:
  enabled: true
  size: 8Gi
  storageClass: "local-path"

resources:
  requests:
    memory: "256Mi"
    cpu: "250m"
  limits:
    memory: "512Mi"
    cpu: "500m"

# ClusterIP service and no ingress: the broker is not exposed outside the cluster.
service:
  type: ClusterIP

ingress:
  enabled: false
# High-availability configuration: three broker replicas instead of one.
replicaCount: 3

# Clustering configuration
clustering:
  enabled: true
  rebalance: true

# Performance tuning: higher requests/limits than the basic single-replica setup.
resources:
  requests:
    memory: "1Gi"
    cpu: "500m"
  limits:
    memory: "2Gi"
    cpu: "1000m"
# Security configuration
# NOTE(review): the password and Erlang cookie are example literals. Do not
# commit real values; supply them from a Kubernetes Secret instead.
auth:
  enabled: true
  password: "secure-password"
  erlangCookie: "SWQOKODSQALRPCLNMEQG"
# Monitoring: enable metrics and a ServiceMonitor for them.
metrics:
  enabled: true
  serviceMonitor:
    enabled: true

# Management UI
management:
  enabled: true

# NOTE(review): nesting reconstructed from a flattened listing; ingress is
# placed at the top level as in the basic configuration — confirm.
ingress:
  enabled: true
  hostname: rabbitmq.theratap.de
  tls: true
# Cluster settings
# NOTE(review): nesting reconstructed from a flattened listing; erlangCookie
# and nodeName are assumed to be top-level keys — confirm against the chart.
clustering:
  enabled: true
  rebalance: true
  partitionHandlingStrategy: "autoheal"

# Erlang cookie for cluster communication (example literal; keep real
# cookies out of version control).
erlangCookie: "SWQOKODSQALRPCLNMEQG"

# Node naming
nodeName: "rabbit@$(HOSTNAME).$(NAMESPACE).pod.cluster.local"
# Default queue settings
# NOTE(review): nesting reconstructed from a flattened listing; the
# dead-letter keys are assumed to belong to defaultQueueSettings — confirm.
defaultQueueSettings:
  durable: true
  autoDelete: false
  arguments:
    x-message-ttl: 86400000  # 24 hours
    x-max-length: 10000
    x-overflow: drop-head
  # Dead letter configuration
  deadLetterExchange: "dlx"
  deadLetterRoutingKey: "dlq"
# Memory and disk thresholds
vmMemoryHighWatermark:
  relative: 0.6  # 60% of available memory
diskFreeLimit:
  # A relative disk limit is a multiple of the node's total RAM, not an
  # absolute size: 2.0 means free disk must stay above twice the memory.
  relative: 2.0

# Connection limits
maxConnections: 1000
maxChannels: 2000

# Message size limits
maxMessageSize: 134217728  # 128 MiB (128 * 1024 * 1024 bytes)
Terminal window
# Check RabbitMQ service status (pods selected by the chart's name label)
kubectl --namespace queue get pods --selector app.kubernetes.io/name=rabbitmq
# Check service endpoints
kubectl --namespace queue get endpoints --selector app.kubernetes.io/name=rabbitmq
# Test RabbitMQ connectivity by asking the first broker pod for its status
kubectl --namespace queue exec --stdin --tty rabbitmq-0 -- rabbitmqctl status
Terminal window
# Check resource usage (CPU/memory per broker pod)
kubectl --namespace queue top pods --selector app.kubernetes.io/name=rabbitmq
# Monitor queue statistics: name, message count, consumer count
kubectl --namespace queue exec --stdin --tty rabbitmq-0 -- rabbitmqctl list_queues name messages consumers
# Check connection status
kubectl --namespace queue exec --stdin --tty rabbitmq-0 -- rabbitmqctl list_connections
# Monitor channel usage
kubectl --namespace queue exec --stdin --tty rabbitmq-0 -- rabbitmqctl list_channels
Terminal window
# Queue depth
kubectl exec -it rabbitmq-0 -n queue -- rabbitmqctl list_queues name messages
# Message rates
# Rate statistics come from the management plugin and are not a
# `rabbitmqctl list_queues` column, so read them from the HTTP API
# (requires: kubectl port-forward -n queue svc/rabbitmq 15672:15672).
curl -s -u admin:password "http://localhost:15672/api/queues?columns=name,message_stats.publish_details.rate"
# Consumer count
kubectl exec -it rabbitmq-0 -n queue -- rabbitmqctl list_queues name consumers
# Memory usage
kubectl exec -it rabbitmq-0 -n queue -- rabbitmqctl status | grep memory
Terminal window
# Port forward management UI (local port 15672 -> service port 15672)
kubectl --namespace queue port-forward service/rabbitmq 15672:15672
# Access management UI
open http://localhost:15672
# Default credentials
# Username: admin
# Password: (from auth.password in values.yaml)
# NOTE(review): the values.yaml shown in this guide does not set a username;
# confirm that "admin" is the user configured for this release.
Terminal window
# View RabbitMQ logs
kubectl logs -n queue -l app.kubernetes.io/name=rabbitmq
# Follow logs in real-time
# The broker runs as a StatefulSet (pods rabbitmq-0, rabbitmq-1, ...), so
# target statefulset/rabbitmq; there is no Deployment with this name.
kubectl logs -f -n queue statefulset/rabbitmq
# Check for errors
# Match case-insensitively: RabbitMQ writes the level as lowercase "[error]".
kubectl logs -n queue -l app.kubernetes.io/name=rabbitmq | grep -i error
Terminal window
# Navigate to module directory (contains helmfile.yaml and values.yaml)
cd iac/modules/rabbitmq
# Deploy using Helmfile
helmfile apply
# Verify deployment: list the pods and services in the queue namespace
kubectl --namespace queue get pods
kubectl --namespace queue get services
Terminal window
# Check pod status
kubectl --namespace queue get pods --selector app.kubernetes.io/name=rabbitmq
# Test RabbitMQ connection
kubectl --namespace queue exec --stdin --tty rabbitmq-0 -- rabbitmqctl status
# Check cluster status (members as seen from rabbitmq-0)
kubectl --namespace queue exec --stdin --tty rabbitmq-0 -- rabbitmqctl cluster_status
Terminal window
# Port forward for local access
kubectl port-forward -n queue svc/rabbitmq 5672:5672
# Test with amqp-tools
amqp-declare-queue --url=amqp://admin:password@localhost:5672 -q test-queue
# Publish a test message
# The body is passed with -b; -p is the "persistent" flag and takes no value.
amqp-publish --url=amqp://admin:password@localhost:5672 -r test-queue -b "Hello RabbitMQ"
# Consume messages
# amqp-consume runs the given command once per message with the body on
# stdin; `cat` simply prints each message.
amqp-consume --url=amqp://admin:password@localhost:5672 -q test-queue cat
Terminal window
# Create a queue
# rabbitmqctl has no command for declaring queues, so use the management
# HTTP API (requires: kubectl port-forward -n queue svc/rabbitmq 15672:15672).
# %2F is the URL-encoded default vhost "/".
curl -s -u admin:password -X PUT -H "content-type: application/json" \
  -d '{"durable": true}' "http://localhost:15672/api/queues/%2F/my-queue"
# List all queues
kubectl exec -it rabbitmq-0 -n queue -- rabbitmqctl list_queues
# Purge a queue
kubectl exec -it rabbitmq-0 -n queue -- rabbitmqctl purge_queue my-queue
# Delete a queue
kubectl exec -it rabbitmq-0 -n queue -- rabbitmqctl delete_queue my-queue
Terminal window
# Exchanges are declared and deleted through the management HTTP API;
# rabbitmqctl can only list them
# (requires: kubectl port-forward -n queue svc/rabbitmq 15672:15672).
# %2F is the URL-encoded default vhost "/".
# Create an exchange
curl -s -u admin:password -X PUT -H "content-type: application/json" \
  -d '{"type": "direct", "durable": true}' "http://localhost:15672/api/exchanges/%2F/my-exchange"
# List all exchanges
kubectl exec -it rabbitmq-0 -n queue -- rabbitmqctl list_exchanges
# Delete an exchange
curl -s -u admin:password -X DELETE "http://localhost:15672/api/exchanges/%2F/my-exchange"
Terminal window
# Create a user (arguments: username, then password)
kubectl --namespace queue exec --stdin --tty rabbitmq-0 -- rabbitmqctl add_user appuser apppassword
# Set user permissions: configure / write / read patterns on vhost "/"
kubectl --namespace queue exec --stdin --tty rabbitmq-0 -- rabbitmqctl set_permissions -p / appuser ".*" ".*" ".*"
# List users
kubectl --namespace queue exec --stdin --tty rabbitmq-0 -- rabbitmqctl list_users
# Delete a user
kubectl --namespace queue exec --stdin --tty rabbitmq-0 -- rabbitmqctl delete_user appuser
Terminal window
# Scale RabbitMQ replicas
kubectl scale statefulset rabbitmq -n queue --replicas=3
# Verify cluster formation
kubectl exec -it rabbitmq-0 -n queue -- rabbitmqctl cluster_status
# Rebalance cluster
# Queue leader rebalancing is provided by the rabbitmq-queues tool, not
# rabbitmqctl; "all" covers every queue type.
kubectl exec -it rabbitmq-0 -n queue -- rabbitmq-queues rebalance all
Terminal window
# Update RabbitMQ version (re-applies the Helmfile release)
helmfile apply
# Monitor update progress of the StatefulSet rollout
kubectl --namespace queue rollout status statefulset/rabbitmq
# Rollback if needed
kubectl --namespace queue rollout undo statefulset/rabbitmq
Terminal window
# Check service connectivity
kubectl --namespace queue get services
# Test network connectivity to the AMQP port (5672) of the rabbitmq service
kubectl --namespace queue exec --stdin --tty rabbitmq-0 -- nc -zv rabbitmq 5672
# Verify DNS resolution of the service name from inside the pod
kubectl --namespace queue exec --stdin --tty rabbitmq-0 -- nslookup rabbitmq
Terminal window
# Check cluster status
kubectl --namespace queue exec --stdin --tty rabbitmq-0 -- rabbitmqctl cluster_status
# Check node health
kubectl --namespace queue exec --stdin --tty rabbitmq-0 -- rabbitmqctl status
# Verify erlang cookie (prints the cookie file stored in the pod)
kubectl --namespace queue exec --stdin --tty rabbitmq-0 -- cat /var/lib/rabbitmq/.erlang.cookie
Terminal window
# Check memory usage
kubectl exec -it rabbitmq-0 -n queue -- rabbitmqctl status | grep memory
# Check memory alarms
# Resource alarms (memory and disk) are reported by rabbitmq-diagnostics.
kubectl exec -it rabbitmq-0 -n queue -- rabbitmq-diagnostics alarms
# Monitor disk usage
kubectl exec -it rabbitmq-0 -n queue -- df -h
Terminal window
# Check queue status
kubectl exec -it rabbitmq-0 -n queue -- rabbitmqctl list_queues name messages consumers
# Check for unacknowledged messages
kubectl exec -it rabbitmq-0 -n queue -- rabbitmqctl list_queues name messages_unacknowledged
# Monitor message rates
# message_stats is collected by the management plugin and is not a
# `rabbitmqctl list_queues` column, so read it from the HTTP API
# (requires: kubectl port-forward -n queue svc/rabbitmq 15672:15672).
curl -s -u admin:password "http://localhost:15672/api/queues?columns=name,message_stats"
Terminal window
# Force delete stuck pods
kubectl delete pod rabbitmq-0 -n queue --grace-period=0 --force
# Reset cluster state
kubectl exec -it rabbitmq-0 -n queue -- rabbitmqctl stop_app
kubectl exec -it rabbitmq-0 -n queue -- rabbitmqctl reset
kubectl exec -it rabbitmq-0 -n queue -- rabbitmqctl start_app
# Rejoin cluster
# join_cluster only works while the RabbitMQ app is stopped on the joining
# node, so wrap it in stop_app / start_app.
# NOTE(review): the target node name was garbled in the source text
# ("[email protected]"); the name below assumes the chart's headless service
# naming — confirm the real node name with `rabbitmqctl cluster_status`.
kubectl exec -it rabbitmq-1 -n queue -- rabbitmqctl stop_app
kubectl exec -it rabbitmq-1 -n queue -- rabbitmqctl join_cluster rabbit@rabbitmq-0.rabbitmq-headless.queue.svc.cluster.local
kubectl exec -it rabbitmq-1 -n queue -- rabbitmqctl start_app
# Enable authentication
# NOTE(review): the password and Erlang cookie are example literals. Do not
# commit real values; supply them from a Kubernetes Secret instead.
auth:
  enabled: true
  password: "secure-password"
  erlangCookie: "SWQOKODSQALRPCLNMEQG"
# Network security: external access is disabled and the ingress rule selects
# "backend" pods in namespaces labelled name=production.
networkPolicy:
  enabled: true
  allowExternal: false
  ingressRules:
    primaryAccessOnlyFrom:
      enabled: true
      namespaceSelector:
        matchLabels:
          name: production
      podSelector:
        matchLabels:
          app.kubernetes.io/name: backend
# TLS configuration
# NOTE(review): nesting reconstructed from a flattened listing;
# certificatesSecret is assumed to be a top-level key — confirm.
tls:
  enabled: true
  secretName: rabbitmq-tls

# Certificate configuration
certificatesSecret: "rabbitmq-certs"

# User permissions: configure / write / read patterns per user and vhost.
permissions:
  - user: appuser
    vhost: /
    configure: ".*"
    write: ".*"
    read: ".*"
# High-performance values: 2Gi / 1 CPU requested, capped at 4Gi / 2 CPUs.
resources:
  requests:
    memory: 2Gi
    cpu: 1000m
  limits:
    memory: 4Gi
    cpu: 2000m
# Performance tuning
# These lines are appended to the rabbitmq.conf generated by the chart.
# Setting `configuration` instead would replace the chart's whole default
# file (default user, cluster formation, ...) with just these five lines.
extraConfiguration: |-
  vm_memory_high_watermark.relative = 0.6
  disk_free_limit.relative = 2.0
  connection_max = 1000
  channel_max = 2000
  max_message_size = 134217728
# High-availability setup: three replicas with automatic partition healing.
replicaCount: 3

clustering:
  enabled: true
  rebalance: true
  partitionHandlingStrategy: "autoheal"

# Mirrored queues
# NOTE(review): x-ha-policy is a legacy queue argument; current RabbitMQ
# releases configure mirroring through policies — confirm this has any effect.
defaultQueueSettings:
  arguments:
    x-ha-policy: "all"
# Enhanced monitoring: metrics with a ServiceMonitor interval of 30s.
metrics:
  enabled: true
  serviceMonitor:
    enabled: true
    interval: 30s

# Management plugin with a preloaded durable queue definition.
management:
  enabled: true
  loadDefinitions: true
  definitions:
    queues:
      - name: "monitoring-queue"
        durable: true
        arguments:
          x-message-ttl: 3600000
  • Monitor queue depths
  • Check connection counts
  • Review error logs
  • Verify cluster health
  • Analyze performance metrics
  • Review queue statistics
  • Update user permissions
  • Check for updates
  • Capacity planning review
  • Performance optimization
  • Security audit
  • Disaster recovery testing
  • Storage classes configured
  • Persistent volumes available
  • Network policies defined
  • Authentication configured
  • Cluster strategy defined
  • RabbitMQ service accessible
  • Management UI functional
  • Cluster formation verified
  • Monitoring configured
  • Application connectivity tested
  • Queue performance optimized
  • Cluster health verified
  • Security updates applied
  • Capacity planning updated
  • Disaster recovery tested

The RabbitMQ message queue module provides reliable message brokering with clustering, monitoring, and comprehensive management capabilities.