Pigeon

A Ruby gem that implements the outbox pattern for Kafka message publishing. Messages are persisted to a database table before delivery, and the gem adds payload encryption, OpenTelemetry tracing, metrics, health checks, and Rails and Hanami integrations.

Features

  • Outbox Pattern: Reliable message delivery with database-backed persistence
  • Encryption: AES-256-GCM encryption for sensitive payloads
  • Tracing: OpenTelemetry integration for distributed tracing
  • Monitoring: Comprehensive metrics and health checks
  • Framework Integration: Rails and Hanami support with generators
  • ActiveJob Integration: Background job processing for message handling
  • Security: Payload masking and sensitive data protection
  • Health Checks: Processor, queue, and Kafka connectivity monitoring
  • Structured Logging: Context-aware logging with correlation IDs
  • Schema Validation: Message schema registration and validation
  • Retry Logic: Configurable retry policies with exponential backoff

Installation

Add this line to your application's Gemfile:

gem 'pigeon'

And then execute:

$ bundle install

Or install it yourself as:

$ gem install pigeon

Quick Start

Rails Integration

Install Pigeon in your Rails application:

$ rake pigeon:install[rails]

This will create:

  • Configuration file at config/initializers/pigeon.rb
  • Database migration for the outbox messages table

Run the migration:

$ rails db:migrate

Hanami Integration

For Hanami applications, generate the migration:

$ rake pigeon:install[hanami]

Configuration

Configure the gem with your Kafka and application details:

Pigeon.configure do |config|
  # Basic configuration
  config.client_id = "my-application"
  config.kafka_brokers = ["kafka1:9092", "kafka2:9092"]

  # Retry configuration
  config.max_retries = 5
  config.retry_delay = 60 # seconds
  config.max_retry_delay = 3600 # 1 hour

  # Encryption (optional)
  config.encrypt_payload = false
  config.encryption_key = ENV["PIGEON_ENCRYPTION_KEY"]

  # Retention and cleanup
  config.retention_period = 7 # days

  # Karafka-specific configuration
  config.karafka_config = {
    delivery: :async,
    kafka: {
      'bootstrap.servers': 'kafka1:9092,kafka2:9092',
      'request.required.acks': 1
    }
  }
end
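
To make the three retry settings concrete, here is a minimal sketch of how an exponential backoff schedule could be derived from them. It assumes the delay doubles on each attempt and is capped at max_retry_delay; the helper name backoff_delay is hypothetical and is not part of Pigeon, whose exact formula may differ.

```ruby
# Hypothetical helper, not part of Pigeon: seconds to wait before retry
# number `attempt` (1-based), assuming the delay doubles on every attempt
# and never exceeds max_retry_delay.
def backoff_delay(attempt, retry_delay: 60, max_retry_delay: 3600)
  [retry_delay * (2**(attempt - 1)), max_retry_delay].min
end

# With max_retries = 5 this prints the wait before each of the five retries.
(1..5).each do |attempt|
  puts "retry #{attempt}: wait #{backoff_delay(attempt)}s"
end
```

Under these assumptions the cap only comes into play from the seventh attempt onward (60 * 2**6 = 3840 seconds, which exceeds 3600), so it matters mainly when max_retries is raised above the value shown.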

Environment Variables

Pigeon supports configuration via environment variables:

PIGEON_CLIENT_ID=my-app
PIGEON_KAFKA_BROKERS=kafka1:9092,kafka2:9092
PIGEON_ENCRYPTION_KEY=your-32-byte-encryption-key
PIGEON_MAX_RETRIES=5
PIGEON_RETRY_DELAY=60
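
For illustration, the variables above could be mapped onto settings roughly as follows. This is a sketch, not Pigeon's loader: the method name pigeon_settings_from and the fallback defaults are assumptions, and only the variable names come from the list above.

```ruby
# Hypothetical loader, not part of Pigeon. It accepts any hash-like object
# so it can be tried with a plain Hash instead of the real ENV.
# The fallback defaults below are assumptions made for this sketch.
def pigeon_settings_from(env)
  {
    client_id: env.fetch("PIGEON_CLIENT_ID", "pigeon"),
    # A comma-separated broker list becomes an array of "host:port" strings.
    kafka_brokers: env.fetch("PIGEON_KAFKA_BROKERS", "localhost:9092").split(",").map(&:strip),
    encryption_key: env["PIGEON_ENCRYPTION_KEY"],
    # Integer() raises on non-numeric input instead of silently returning 0.
    max_retries: Integer(env.fetch("PIGEON_MAX_RETRIES", "5")),
    retry_delay: Integer(env.fetch("PIGEON_RETRY_DELAY", "60"))
  }
end

settings = pigeon_settings_from(ENV)
puts settings[:kafka_brokers].inspect
```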

Usage

Publishing Messages

# Simple publishing
Pigeon.publisher.publish(
  topic: "user-events",
  payload: { user_id: 123, action: "signup" }
)

# With encryption and sensitive field masking
Pigeon.publisher.publish(
  topic: "user-events",
  payload: { user_id: 123, email: "user@example.com", password: "secret" },
  key: "user-123",
  headers: { "source" => "web-app" },
  encrypt: true,
  sensitive_fields: [:password, :credit_card]
)

# Synchronous publishing (immediate attempt)
Pigeon.publisher.publish(
  topic: "critical-events",
  payload: { event: "payment_processed" },
  sync: true
)
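
The sensitive_fields option above suggests that named fields are hidden before a payload is logged. A minimal sketch of such masking is shown below; the mask_fields method, the "[FILTERED]" placeholder, and the recursion into nested hashes and arrays are assumptions for illustration, not Pigeon's documented behavior.

```ruby
# Hypothetical masking helper, not part of Pigeon.
MASK = "[FILTERED]"

# Returns a copy of payload in which every key named in sensitive_fields
# is replaced by MASK, descending into nested hashes and arrays.
# Symbol and string keys are treated alike.
def mask_fields(payload, sensitive_fields)
  names = sensitive_fields.map(&:to_s)
  case payload
  when Hash
    payload.each_with_object({}) do |(key, value), masked|
      masked[key] = names.include?(key.to_s) ? MASK : mask_fields(value, sensitive_fields)
    end
  when Array
    payload.map { |item| mask_fields(item, sensitive_fields) }
  else
    payload
  end
end

payload = { user_id: 123, email: "user@example.com", password: "secret" }
puts mask_fields(payload, [:password, :credit_card]).inspect
```

The original hash is left untouched, so the unmasked payload can still be stored or encrypted while the masked copy goes to the log.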

Processing Messages

# Process pending messages
stats = Pigeon.processor.process_pending(batch_size: 100)
puts "Processed: #{stats[:processed]}, Succeeded: #{stats[:succeeded]}, Failed: #{stats[:failed]}"

# Process a specific message
success = Pigeon.processor.process_message("message-id")

# Clean up old processed messages
cleaned = Pigeon.processor.cleanup_processed(older_than: 14) # days
puts "Cleaned up #{cleaned} messages"

# Start background processing
Pigeon.start_processing(batch_size: 100, interval: 5, thread_count: 2)

# Stop background processing
Pigeon.stop_processing
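
The publish-then-process flow can be illustrated with a small in-memory model of the outbox. This is only a sketch under stated assumptions: the InMemoryOutbox class, the OutboxRow struct, and the status names are hypothetical stand-ins for the database table and Kafka producer, and the returned hash merely mirrors the stats keys used above.

```ruby
# Hypothetical in-memory stand-in for the outbox table; not Pigeon's model.
OutboxRow = Struct.new(:id, :topic, :payload, :status, :retry_count)

class InMemoryOutbox
  attr_reader :rows

  def initialize
    @rows = []
  end

  # Step 1: the application records the message instead of calling Kafka directly.
  def enqueue(topic:, payload:)
    row = OutboxRow.new(@rows.size + 1, topic, payload, "pending", 0)
    @rows << row
    row
  end

  # Step 2: a processor relays pending rows and records each outcome.
  # The block stands in for the Kafka producer and returns true on success.
  def process_pending(batch_size: 100)
    stats = { processed: 0, succeeded: 0, failed: 0 }
    @rows.select { |row| row.status == "pending" }.first(batch_size).each do |row|
      stats[:processed] += 1
      if yield(row)
        row.status = "published"
        stats[:succeeded] += 1
      else
        row.status = "failed"
        row.retry_count += 1
        stats[:failed] += 1
      end
    end
    stats
  end
end

outbox = InMemoryOutbox.new
outbox.enqueue(topic: "user-events", payload: { user_id: 123, action: "signup" })
outbox.enqueue(topic: "broken-topic", payload: { user_id: 456 })

# Simulated delivery: everything succeeds except the "broken-topic" message.
stats = outbox.process_pending(batch_size: 100) { |row| row.topic != "broken-topic" }
puts "Processed: #{stats[:processed]}, Succeeded: #{stats[:succeeded]}, Failed: #{stats[:failed]}"
```

Because the message is stored before any delivery attempt, a Kafka outage leaves the row in place to be retried later rather than losing the event.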

Encryption

# Enable encryption in configuration
Pigeon.configure do |config|
  config.encrypt_payload = true
  config.encryption_key = ENV["PIGEON_ENCRYPTION_KEY"] # keep the key out of source control
end

# Encrypt and decrypt a serialized payload (payload is any Hash)
encrypted = Pigeon.encrypt(payload.to_json)
decrypted = Pigeon.decrypt(encrypted)

# Mask sensitive fields for logging
masked = Pigeon.mask_payload(payload, [:password, :credit_card])
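
For readers unfamiliar with AES-256-GCM, the following sketch shows an encrypt/decrypt round trip using Ruby's standard OpenSSL bindings. It is not Pigeon's implementation: the method names and the "IV + tag + ciphertext, Base64-encoded" layout are assumptions chosen for the example. Note that when a key is used directly as an AES-256 key, it must be exactly 32 bytes long.

```ruby
require "openssl"
require "json"

# Hypothetical helpers, not part of Pigeon. Output layout (an assumption):
# Base64( 12-byte IV + 16-byte auth tag + ciphertext ).
def encrypt_payload(plaintext, key)
  cipher = OpenSSL::Cipher.new("aes-256-gcm")
  cipher.encrypt
  cipher.key = key
  iv = cipher.random_iv # a fresh IV for every message
  ciphertext = cipher.update(plaintext) + cipher.final
  [iv + cipher.auth_tag + ciphertext].pack("m0") # strict Base64, no newlines
end

def decrypt_payload(encoded, key)
  raw = encoded.unpack1("m0")
  iv = raw.byteslice(0, 12)
  tag = raw.byteslice(12, 16)
  ciphertext = raw.byteslice(28, raw.bytesize - 28)
  cipher = OpenSSL::Cipher.new("aes-256-gcm")
  cipher.decrypt
  cipher.key = key
  cipher.iv = iv
  cipher.auth_tag = tag
  # final raises OpenSSL::Cipher::CipherError if the data was tampered with.
  (cipher.update(ciphertext) + cipher.final).force_encoding(Encoding::UTF_8)
end

key = OpenSSL::Random.random_bytes(32) # AES-256 needs exactly 32 key bytes
payload = { user_id: 123, email: "user@example.com" }
token = encrypt_payload(payload.to_json, key)
restored = JSON.parse(decrypt_payload(token, key))
puts restored["user_id"]
```

GCM is an authenticated mode, so decryption fails loudly on a modified ciphertext or a wrong key instead of returning garbage.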

Tracing

# Initialize OpenTelemetry tracing
Pigeon.init_tracing(service_name: "my-app")

# Trace message publishing
Pigeon.with_span("publish_message", attributes: { topic: "user-events" }) do
  Pigeon.publisher.publish(topic: "user-events", payload: data)
end

# Trace message processing
Pigeon.trace_process(message) do |span|
  # Process the message
  span.set_attribute("message.id", message.id)
end

Health Checks

# Check overall health
health = Pigeon.health_status
puts "Status: #{health[:status]}"

# Check individual components
processor_health = Pigeon.processor_health
queue_health = Pigeon.queue_health
kafka_health = Pigeon.kafka_health
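
One plausible way to combine the three component checks into an overall status is to report the worst one. The sketch below assumes each check returns a hash with a :status of "healthy", "warning", or "critical"; those status names and the overall_health method are assumptions for illustration, not Pigeon's documented return values.

```ruby
# Hypothetical aggregation, not part of Pigeon. Severity order is an assumption.
SEVERITY = { "healthy" => 0, "warning" => 1, "critical" => 2 }.freeze

# components maps a component name to a hash such as { status: "healthy" }.
# The overall status is the most severe status found; unknown statuses are
# treated as critical so that they are never silently ignored.
def overall_health(components)
  worst = components.values
                    .map { |check| check[:status] }
                    .max_by { |status| SEVERITY.fetch(status, 2) }
  { status: worst || "healthy", components: components }
end

health = overall_health(
  processor: { status: "healthy" },
  queue: { status: "warning", pending: 1200 },
  kafka: { status: "healthy" }
)
puts "Status: #{health[:status]}"
```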

Monitoring

# Get metrics
metrics = Pigeon.metrics
puts "Messages published: #{metrics[:messages_published]}"
puts "Messages processed: #{metrics[:messages_processed]}"

# Get monitoring-friendly metrics
monitoring_metrics = Pigeon.metrics_for_monitoring

# Reset metrics
Pigeon.reset_metrics
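
Since background processing may run on several threads, counters like these need to be updated safely. A minimal sketch of a thread-safe counter registry follows; the CounterRegistry class is hypothetical and only illustrates the idea behind read, increment, and reset operations, not how Pigeon stores its metrics.

```ruby
# Hypothetical metrics store, not part of Pigeon.
class CounterRegistry
  def initialize
    @mutex = Mutex.new
    @counters = Hash.new(0)
  end

  # Adds `by` to the named counter; the mutex keeps concurrent updates consistent.
  def increment(name, by = 1)
    @mutex.synchronize { @counters[name] += by }
  end

  # Returns a copy so callers cannot modify the live counters.
  def snapshot
    @mutex.synchronize { @counters.dup }
  end

  def reset
    @mutex.synchronize { @counters.clear }
  end
end

registry = CounterRegistry.new
workers = 2.times.map do
  Thread.new { 100.times { registry.increment(:messages_processed) } }
end
workers.each(&:join)
puts "Messages processed: #{registry.snapshot[:messages_processed]}"
```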

Schema Validation

# Register a schema
Pigeon.register_schema("user_event", {
  type: "object",
  properties: {
    user_id: { type: "integer" },
    action: { type: "string" }
  },
  required: ["user_id", "action"]
})

# Register sensitive fields
Pigeon.register_sensitive_fields([:password, :credit_card, :ssn])
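
To show what checking a payload against the schema above involves, here is a minimal sketch that handles only the required list and three primitive types. The validation_errors method and its error messages are hypothetical; Pigeon's own validator may support more of JSON Schema and report errors differently.

```ruby
# Hypothetical validator, not part of Pigeon. Supports `required` and a few
# primitive `type` checks only.
USER_EVENT_SCHEMA = {
  type: "object",
  properties: {
    user_id: { type: "integer" },
    action: { type: "string" }
  },
  required: ["user_id", "action"]
}.freeze

TYPE_CHECKS = {
  "integer" => ->(value) { value.is_a?(Integer) },
  "string" => ->(value) { value.is_a?(String) },
  "object" => ->(value) { value.is_a?(Hash) }
}.freeze

# Returns a list of human-readable problems; an empty list means valid.
def validation_errors(payload, schema)
  return ["payload must be an object"] unless payload.is_a?(Hash)

  data = payload.transform_keys(&:to_s) # accept symbol or string keys
  errors = []
  schema.fetch(:required, []).each do |name|
    errors << "missing required field: #{name}" unless data.key?(name.to_s)
  end
  schema.fetch(:properties, {}).each do |name, rule|
    next unless data.key?(name.to_s)

    check = TYPE_CHECKS[rule[:type]]
    next if check.nil? || check.call(data[name.to_s])

    errors << "#{name} must be of type #{rule[:type]}"
  end
  errors
end

puts validation_errors({ user_id: 123, action: "signup" }, USER_EVENT_SCHEMA).inspect
puts validation_errors({ user_id: "abc" }, USER_EVENT_SCHEMA).inspect
```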

ActiveJob Integration

# Process messages in background
Pigeon::ActiveJobIntegration::ProcessorJob.perform_later(100)

# Clean up old messages in background
Pigeon::ActiveJobIntegration::CleanupJob.perform_later(7)

API Reference

Core API

# Configuration
Pigeon.configure(&block)
Pigeon.config

# Publishing
Pigeon.publisher.publish(topic:, payload:, **options)

# Processing
Pigeon.processor.process_pending(batch_size: 100)
Pigeon.processor.process_message(id)
Pigeon.processor.cleanup_processed(older_than: 7)

# Background processing
Pigeon.start_processing(batch_size: 100, interval: 5, thread_count: 2)
Pigeon.stop_processing
Pigeon.processing?

Security API

# Encryption
Pigeon.encrypt(payload, encryption_key = nil)
Pigeon.decrypt(encrypted_payload, encryption_key = nil)
Pigeon.mask_payload(payload, sensitive_fields = nil)

Monitoring API

# Metrics
Pigeon.metrics
Pigeon.metrics_for_monitoring
Pigeon.reset_metrics

# Health checks
Pigeon.health_status
Pigeon.processor_health
Pigeon.queue_health
Pigeon.kafka_health

# Logging
Pigeon.logger_with_context(context = {})
Pigeon.log_level = :info

Tracing API

# Tracing setup
Pigeon.init_tracing(service_name: "my-app", exporter: nil)
Pigeon.tracing_available?

# Spans
Pigeon.with_span(name, attributes: {}, kind: :internal, &block)
Pigeon.tracer(name = "pigeon")

Outbox API

# Message management
Pigeon.create_outbox_message(attributes = {})
Pigeon.find_outbox_message(id)
Pigeon.find_outbox_messages_by_status(status, limit = 100)
Pigeon.find_outbox_messages_ready_for_retry(limit = 100)
Pigeon.count_outbox_messages_by_status(status)
Pigeon.find_oldest_outbox_message_by_status(status)

Examples

See the examples/ directory for complete working examples:

  • encryption_example.rb - Demonstrates AES-256-GCM encryption and payload masking

Development

After checking out the repo, run bin/setup to install dependencies. Then, run rake spec to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

Contributing

Bug reports and pull requests are welcome on GitHub.

License

The gem is available as open source under the terms of the MIT License.