Pigeon
Pigeon is a Ruby gem that implements the outbox pattern for Kafka message publishing. Messages are persisted to the database before delivery, and the gem adds encryption, tracing, monitoring, and framework integrations on top.
Features
- Outbox Pattern: Reliable message delivery with database-backed persistence
- Encryption: AES-256-GCM encryption for sensitive payloads
- Tracing: OpenTelemetry integration for distributed tracing
- Monitoring: Comprehensive metrics and health checks
- Framework Integration: Rails and Hanami support with generators
- ActiveJob Integration: Background job processing for message handling
- Security: Payload masking and sensitive data protection
- Health Checks: Processor, queue, and Kafka connectivity monitoring
- Structured Logging: Context-aware logging with correlation IDs
- Schema Validation: Message schema registration and validation
- Retry Logic: Configurable retry policies with exponential backoff
Installation
Add this line to your application's Gemfile:
gem 'pigeon'
And then execute:
$ bundle install
Or install it yourself as:
$ gem install pigeon
Quick Start
Rails Integration
Install Pigeon in your Rails application:
$ rake pigeon:install[rails]
This will create:
- Configuration file at config/initializers/pigeon.rb
- Database migration for the outbox messages table
Run the migration:
$ rails db:migrate
Hanami Integration
For Hanami applications, generate the migration:
$ rake pigeon:install[hanami]
Configuration
Configure the gem with your Kafka and application details:
Pigeon.configure do |config|
# Basic configuration
config.client_id = "my-application"
config.kafka_brokers = ["kafka1:9092", "kafka2:9092"]
# Retry configuration
config.max_retries = 5
config.retry_delay = 60 # seconds
config.max_retry_delay = 3600 # 1 hour
# Encryption (optional)
config.encrypt_payload = false
config.encryption_key = ENV["PIGEON_ENCRYPTION_KEY"]
# Retention and cleanup
config.retention_period = 7 # days
# Karafka-specific configuration
config.karafka_config = {
delivery: :async,
kafka: {
'bootstrap.servers': 'kafka1:9092,kafka2:9092',
'request.required.acks': 1
}
}
end
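The retry settings above suggest a delay that grows with each attempt and is capped at `max_retry_delay`. The exact formula Pigeon uses is not stated here, so the following is only a sketch of one common scheme (doubling per attempt); `backoff_delay` is a hypothetical helper, not part of the gem.

```ruby
# Hypothetical helper, not part of Pigeon's documented API.
# Assumes the delay doubles on every attempt and is capped at max_delay.
def backoff_delay(attempt, base_delay: 60, max_delay: 3600)
  raise ArgumentError, "attempt must be >= 1" if attempt < 1

  [base_delay * (2**(attempt - 1)), max_delay].min
end

# Delays (in seconds) for the five attempts allowed by max_retries = 5.
delays = (1..5).map { |attempt| backoff_delay(attempt) }
puts delays.inspect
```

Under these assumptions the cap only matters from the seventh attempt onward, so with `max_retries = 5` it would never be reached.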
Environment Variables
Pigeon supports configuration via environment variables:
PIGEON_CLIENT_ID=my-app
PIGEON_KAFKA_BROKERS=kafka1:9092,kafka2:9092
PIGEON_ENCRYPTION_KEY=your-32-byte-encryption-key
PIGEON_MAX_RETRIES=5
PIGEON_RETRY_DELAY=60
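As an illustration of how these variables could map onto configuration values, here is a minimal sketch. The variable names come from the list above; the defaults, the comma-splitting of brokers, and the `settings_from_env` helper itself are assumptions, not Pigeon's actual loader.

```ruby
# Hypothetical parser. Accepts ENV or any Hash-like object that responds to
# fetch and [] so it can be exercised without touching the real environment.
def settings_from_env(env)
  {
    client_id: env.fetch("PIGEON_CLIENT_ID", "pigeon"),
    # Brokers arrive as one comma-separated string; split and trim them.
    kafka_brokers: env.fetch("PIGEON_KAFKA_BROKERS", "localhost:9092")
                      .split(",").map(&:strip).reject(&:empty?),
    encryption_key: env["PIGEON_ENCRYPTION_KEY"],
    # Integer() raises on malformed input instead of silently returning 0.
    max_retries: Integer(env.fetch("PIGEON_MAX_RETRIES", "5"), 10),
    retry_delay: Integer(env.fetch("PIGEON_RETRY_DELAY", "60"), 10)
  }
end

settings = settings_from_env({
  "PIGEON_CLIENT_ID" => "my-app",
  "PIGEON_KAFKA_BROKERS" => "kafka1:9092, kafka2:9092",
  "PIGEON_MAX_RETRIES" => "5"
})
puts settings.inspect
```

In an application, passing `ENV` instead of the literal hash would read the real environment.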
Usage
Publishing Messages
# Simple publishing
Pigeon.publisher.publish(
topic: "user-events",
payload: { user_id: 123, action: "signup" }
)
# With encryption and sensitive field masking
Pigeon.publisher.publish(
topic: "user-events",
payload: { user_id: 123, email: "user@example.com", password: "secret" },
key: "user-123",
headers: { "source" => "web-app" },
encrypt: true,
sensitive_fields: [:password, :credit_card]
)
# Synchronous publishing (immediate attempt)
Pigeon.publisher.publish(
topic: "critical-events",
payload: { event: "payment_processed" },
sync: true
)
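The `sensitive_fields` option in the second example implies that named keys are hidden before a payload is logged. How Pigeon masks values internally is not documented here; the sketch below shows one plausible approach, with a hypothetical `mask_fields` helper and an assumed `"[FILTERED]"` placeholder.

```ruby
# Hypothetical helper, not Pigeon's implementation.
# Returns a copy of the payload with sensitive keys replaced at any depth.
def mask_fields(payload, sensitive_fields, mask: "[FILTERED]")
  names = sensitive_fields.map(&:to_s)

  case payload
  when Hash
    payload.each_with_object({}) do |(key, value), masked|
      masked[key] = if names.include?(key.to_s)
                      mask
                    else
                      mask_fields(value, sensitive_fields, mask: mask)
                    end
    end
  when Array
    payload.map { |item| mask_fields(item, sensitive_fields, mask: mask) }
  else
    payload
  end
end

payload = { user_id: 123, password: "secret", billing: { credit_card: "4111" } }
masked = mask_fields(payload, [:password, :credit_card])
puts masked.inspect
```

Because the helper builds a new hash, the original payload stays intact and can still be published or encrypted.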
Processing Messages
# Process pending messages
stats = Pigeon.processor.process_pending(batch_size: 100)
puts "Processed: #{stats[:processed]}, Succeeded: #{stats[:succeeded]}, Failed: #{stats[:failed]}"
# Process a specific message
success = Pigeon.processor.("message-id")
# Clean up old processed messages
cleaned = Pigeon.processor.cleanup_processed(older_than: 14) # days
puts "Cleaned up #{cleaned} messages"
# Start background processing
Pigeon.start_processing(batch_size: 100, interval: 5, thread_count: 2)
# Stop background processing
Pigeon.stop_processing
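To make the outbox flow behind these calls more concrete, the following sketch models it with an in-memory array in place of the database table. The status names, the `enqueue` and `process_pending` helpers, and the shape of each record are assumptions for illustration; only the `processed`/`succeeded`/`failed` stats keys mirror the example above.

```ruby
require "securerandom"

# In-memory stand-in for the outbox table. In a real outbox, this row is
# written in the same database transaction as the business change.
def enqueue(outbox, topic:, payload:)
  message = { id: SecureRandom.uuid, topic: topic, payload: payload,
              status: :pending, attempts: 0 }
  outbox << message
  message
end

# Attempts delivery of up to batch_size pending messages.
# `deliver` is any callable that raises when the broker rejects a message.
def process_pending(outbox, deliver, batch_size: 100, max_retries: 5)
  stats = { processed: 0, succeeded: 0, failed: 0 }

  outbox.select { |m| m[:status] == :pending }.first(batch_size).each do |message|
    stats[:processed] += 1
    message[:attempts] += 1
    begin
      deliver.call(message)
      message[:status] = :published
      stats[:succeeded] += 1
    rescue StandardError => e
      message[:last_error] = e.message
      # Stay pending for a later retry until the attempt budget is used up.
      message[:status] = :failed if message[:attempts] >= max_retries
      stats[:failed] += 1
    end
  end

  stats
end

outbox = []
enqueue(outbox, topic: "user-events", payload: { user_id: 123 })
enqueue(outbox, topic: "bad-topic", payload: {})

deliver = lambda do |message|
  raise "broker rejected #{message[:topic]}" if message[:topic] == "bad-topic"
end

stats = process_pending(outbox, deliver)
puts stats.inspect
```

The failed message remains pending here, so a later pass would pick it up again, which is the property that makes the pattern reliable when the broker is temporarily unavailable.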
Encryption
# Enable encryption in configuration
Pigeon.configure do |config|
config.encrypt_payload = true
config.encryption_key = ENV["PIGEON_ENCRYPTION_KEY"] # load the key from the environment rather than hard-coding it
end
# Encrypt a payload
encrypted = Pigeon.encrypt(payload.to_json)
decrypted = Pigeon.decrypt(encrypted)
# Mask sensitive fields for logging
masked = Pigeon.mask_payload(payload, [:password, :credit_card])
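For readers unfamiliar with AES-256-GCM, the sketch below shows the primitive using only Ruby's standard `openssl` library. It is not Pigeon's implementation: the helper names, the `iv + tag + ciphertext` layout, and the Base64 encoding are assumptions chosen for illustration.

```ruby
require "openssl"

# Encrypts a non-empty string with AES-256-GCM and returns a Base64 token
# laid out as: 12-byte IV, 16-byte auth tag, ciphertext (assumed layout).
def encrypt_payload(plaintext, key)
  cipher = OpenSSL::Cipher.new("aes-256-gcm")
  cipher.encrypt
  cipher.key = key
  iv = cipher.random_iv # fresh nonce for every message
  cipher.auth_data = ""
  ciphertext = cipher.update(plaintext) + cipher.final
  [iv + cipher.auth_tag + ciphertext].pack("m0")
end

# Reverses encrypt_payload; raises OpenSSL::Cipher::CipherError if the
# token was modified or the key is wrong.
def decrypt_payload(token, key)
  raw = token.unpack1("m0")
  iv, tag, ciphertext = raw[0, 12], raw[12, 16], raw[28..]
  cipher = OpenSSL::Cipher.new("aes-256-gcm")
  cipher.decrypt
  cipher.key = key
  cipher.iv = iv
  cipher.auth_tag = tag
  cipher.auth_data = ""
  (cipher.update(ciphertext) + cipher.final).force_encoding(Encoding::UTF_8)
end

key = OpenSSL::Random.random_bytes(32) # AES-256 requires exactly 32 bytes
token = encrypt_payload('{"user_id":123,"action":"signup"}', key)
puts decrypt_payload(token, key)
```

With the standard library, a key of any other length raises an error when assigned, so a placeholder such as the one in the configuration example would need to be replaced by a real 32-byte key. Whether Pigeon derives or pads keys internally is not stated here.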
Tracing
# Initialize OpenTelemetry tracing
Pigeon.init_tracing(service_name: "my-app")
# Trace message publishing
Pigeon.with_span("publish_message", attributes: { topic: "user-events" }) do
Pigeon.publisher.publish(topic: "user-events", payload: data)
end
# Trace message processing
Pigeon.trace_process() do |span|
# Process the message
span.set_attribute("message.id", .id)
end
Health Checks
# Check overall health
health = Pigeon.health_status
puts "Status: #{health[:status]}"
# Check individual components
processor_health = Pigeon.processor_health
queue_health = Pigeon.queue_health
kafka_health = Pigeon.kafka_health
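An overall status is typically derived from the individual component checks. The sketch below shows one way this could work, taking the worst component status as the overall result. The status names (`:healthy`, `:warning`, `:critical`) and the hash shape are assumptions; the values Pigeon actually returns are not listed in this README.

```ruby
# Hypothetical aggregation, not Pigeon's implementation.
# The overall status is the most severe status among the components.
def overall_status(components)
  statuses = components.values.map { |component| component[:status] }
  return :critical if statuses.include?(:critical)
  return :warning if statuses.include?(:warning)

  :healthy
end

components = {
  processor: { status: :healthy },
  queue: { status: :warning, pending: 1200 },
  kafka: { status: :healthy }
}
puts overall_status(components)
```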
Monitoring
# Get metrics
metrics = Pigeon.metrics
puts "Messages published: #{metrics[:messages_published]}"
puts "Messages processed: #{metrics[:messages_processed]}"
# Get monitoring-friendly metrics
monitoring_metrics = Pigeon.metrics_for_monitoring
# Reset metrics
Pigeon.reset_metrics
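Because background processing can run on several threads, counters like these need to be updated safely. As a rough sketch of the idea (not the gem's internals), the following keeps counters in a hash guarded by a `Mutex`; `build_metrics`, `increment`, and `snapshot` are hypothetical names.

```ruby
# Hypothetical thread-safe counters, not Pigeon's implementation.
def build_metrics
  { counters: Hash.new(0), lock: Mutex.new }
end

# Adds `by` to the named counter while holding the lock.
def increment(metrics, name, by = 1)
  metrics[:lock].synchronize { metrics[:counters][name] += by }
end

# Returns a copy so callers cannot mutate the live counters.
def snapshot(metrics)
  metrics[:lock].synchronize { metrics[:counters].dup }
end

metrics = build_metrics
threads = 4.times.map do
  Thread.new { 1000.times { increment(metrics, :messages_published) } }
end
threads.each(&:join)

puts snapshot(metrics)[:messages_published]
```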
Schema Validation
# Register a schema
Pigeon.register_schema("user_event", {
type: "object",
properties: {
user_id: { type: "integer" },
action: { type: "string" }
},
required: ["user_id", "action"]
})
# Register sensitive fields
Pigeon.register_sensitive_fields([:password, :credit_card, :ssn])
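To show what validating a payload against a schema of this shape involves, here is a deliberately small sketch that checks only `required` and `type`. It is not a full JSON Schema validator and not Pigeon's validator; `type_matches?` and `validation_errors` are hypothetical helpers.

```ruby
# Hypothetical helpers covering only a small subset of JSON Schema.
def type_matches?(value, type)
  case type
  when "integer" then value.is_a?(Integer)
  when "number"  then value.is_a?(Numeric)
  when "string"  then value.is_a?(String)
  when "boolean" then value == true || value == false
  when "object"  then value.is_a?(Hash)
  when "array"   then value.is_a?(Array)
  else true # unknown or missing type: do not reject
  end
end

# Returns a list of human-readable problems; an empty list means valid.
def validation_errors(payload, schema)
  data = payload.transform_keys(&:to_s)
  errors = []

  schema.fetch(:required, []).each do |field|
    errors << "missing required field: #{field}" unless data.key?(field.to_s)
  end

  schema.fetch(:properties, {}).each do |name, rules|
    key = name.to_s
    next unless data.key?(key)
    next if type_matches?(data[key], rules[:type])

    errors << "#{key} must be of type #{rules[:type]}"
  end

  errors
end

schema = {
  type: "object",
  properties: { user_id: { type: "integer" }, action: { type: "string" } },
  required: ["user_id", "action"]
}

puts validation_errors({ user_id: 123, action: "signup" }, schema).inspect
puts validation_errors({ user_id: "123" }, schema).inspect
```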
ActiveJob Integration
# Process messages in background
Pigeon::ActiveJobIntegration::ProcessorJob.perform_later(100)
# Clean up old messages in background
Pigeon::ActiveJobIntegration::CleanupJob.perform_later(7)
API Reference
Core API
# Configuration
Pigeon.configure(&block)
Pigeon.config
# Publishing
Pigeon.publisher.publish(topic:, payload:, **)
# Processing
Pigeon.processor.process_pending(batch_size: 100)
Pigeon.processor.(id)
Pigeon.processor.cleanup_processed(older_than: 7)
# Background processing
Pigeon.start_processing(batch_size: 100, interval: 5, thread_count: 2)
Pigeon.stop_processing
Pigeon.processing?
Security API
# Encryption
Pigeon.encrypt(payload, encryption_key = nil)
Pigeon.decrypt(encrypted_payload, encryption_key = nil)
Pigeon.mask_payload(payload, sensitive_fields = nil)
Monitoring API
# Metrics
Pigeon.metrics
Pigeon.metrics_for_monitoring
Pigeon.reset_metrics
# Health checks
Pigeon.health_status
Pigeon.processor_health
Pigeon.queue_health
Pigeon.kafka_health
# Logging
Pigeon.logger_with_context(context = {})
Pigeon.log_level = :info
Tracing API
# Tracing setup
Pigeon.init_tracing(service_name: "my-app", exporter: nil)
Pigeon.tracing_available?
# Spans
Pigeon.with_span(name, attributes: {}, kind: :internal, &block)
Pigeon.tracer(name = "pigeon")
Outbox API
# Message management
Pigeon.(attributes = {})
Pigeon.(id)
Pigeon.(status, limit = 100)
Pigeon.(limit = 100)
Pigeon.(status)
Pigeon.(status)
Examples
See the examples/ directory for complete working examples:
- encryption_example.rb: Demonstrates AES-256-GCM encryption and payload masking
Development
After checking out the repo, run bin/setup to install dependencies. Then, run rake spec to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.
Contributing
Bug reports and pull requests are welcome on GitHub.
License
The gem is available as open source under the terms of the MIT License.