Class: Pigeon::Processor
- Inherits: Object (hierarchy: Object, Pigeon::Processor)
- Includes: BackgroundProcessor
- Defined in: lib/pigeon/processor.rb, lib/pigeon/processor/background_processor.rb
Overview
Processor class for handling outbox messages
Defined Under Namespace
Modules: BackgroundProcessor
Instance Method Summary
- #cleanup_processed(older_than: 7) ⇒ Integer
  Clean up old processed messages.
- #initialize(auto_start: false) ⇒ Processor (constructor)
  Initialize a new processor.
- #process_message(message_id) ⇒ Boolean
  Process a specific outbox message.
- #process_pending(batch_size: 100) ⇒ Hash
  Process pending outbox messages.
- #send_to_dead_letter_queue(message_id, dlq_topic: nil) ⇒ Boolean
  Send a failed message to a dead letter queue.
- #start_processing(batch_size: 100, interval: 5, thread_count: 2) ⇒ Boolean
  Start processing pending messages in the background.
- #stop_processing ⇒ Boolean
  Stop processing pending messages.
Methods included from BackgroundProcessor
#process_startup_messages, #processing?, #start_background_processing, #stop_background_processing
Constructor Details
#initialize(auto_start: false) ⇒ Processor
Initialize a new processor
    # File 'lib/pigeon/processor.rb', line 15
    def initialize(auto_start: false)
      @mutex = Concurrent::ReentrantReadWriteLock.new
      @processing = Concurrent::AtomicBoolean.new(false)
      @thread_pool = nil
      @metrics = Pigeon.config.metrics_collector || Pigeon::Metrics::Collector.new
      @start_time = Time.now

      # Start processing if auto_start is true
      start_processing if auto_start
    end
Instance Method Details
#cleanup_processed(older_than: 7) ⇒ Integer
Clean up processed messages older than older_than days. The current implementation is a placeholder: it logs the request and returns 0.
    # File 'lib/pigeon/processor.rb', line 99
    def cleanup_processed(older_than: 7)
      # This is a placeholder implementation
      # The actual implementation will be added in task 7.2
      Pigeon.config.logger.info("Cleaning up processed messages older than #{older_than} days")

      # In the actual implementation, we would:
      # 1. Delete processed messages older than the specified threshold
      # 2. Return the number of deleted records

      # Return mock count
      0
    end
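The two steps listed in the placeholder comments (delete processed messages past the threshold, return the count) could be sketched as follows. This is a minimal illustration, not Pigeon's implementation: `cleanup_processed_sketch`, the `records` array of hashes, and the `:status` / `:processed_at` keys are all hypothetical stand-ins for whatever storage the gem actually uses.

```ruby
# Sketch only: `records` is a hypothetical in-memory array of hashes standing in
# for the outbox table. Pigeon's real storage layer is not shown on this page.
def cleanup_processed_sketch(records, older_than: 7, now: Time.now)
  cutoff = now - (older_than * 24 * 60 * 60) # threshold is expressed in days
  before = records.size

  # 1. Delete processed messages older than the threshold
  records.reject! do |record|
    record[:status] == :processed && record[:processed_at] < cutoff
  end

  # 2. Return the number of deleted records
  before - records.size
end

now = Time.now
day = 24 * 60 * 60
records = [
  { id: 1, status: :processed, processed_at: now - (10 * day) }, # old, removed
  { id: 2, status: :processed, processed_at: now - day },        # recent, kept
  { id: 3, status: :pending, processed_at: nil }                 # unprocessed, kept
]
deleted = cleanup_processed_sketch(records, older_than: 7, now: now)
```

Passing `now` explicitly keeps the cutoff deterministic, which makes a routine like this easier to test than one that reads the clock internally.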
#process_message(message_id) ⇒ Boolean
Process a specific outbox message
    # File 'lib/pigeon/processor.rb', line 73
    def process_message(message_id)
      Pigeon.config.logger.info("Processing message ID: #{message_id}")

      # Find the message
      = Pigeon.()
      return false unless

      # Process the message
      result = ()
      result[:success]
    end
#process_pending(batch_size: 100) ⇒ Hash
Process pending outbox messages
    # File 'lib/pigeon/processor.rb', line 49
    def process_pending(batch_size: 100)
      Pigeon.last_processing_run = Time.now
      batch_id = SecureRandom.uuid
      logger = create_batch_logger(batch_id, batch_size)
      log_batch_start(logger, batch_id, batch_size)

      start_time = Time.now
      stats = { processed: 0, succeeded: 0, failed: 0, retried: 0, dead_lettered: 0 }

      Pigeon::Tracing.trace_batch_process(batch_size) do |span|
        @mutex.with_read_lock do
          (stats, batch_size, logger, span)
        end
        finalize_batch_processing(stats, start_time, logger, span)
      end

      Pigeon.last_successful_processing_run = Time.now if stats[:succeeded].positive?

      stats
    end
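Since process_pending returns a stats Hash with the keys :processed, :succeeded, :failed, :retried and :dead_lettered, a caller might condense it into a health check. The sketch below is illustrative only: `summarize_batch`, the `:healthy` flag, and the failure-rate calculation are assumptions and not part of Pigeon, and it assumes :failed never exceeds :processed.

```ruby
# Summarizes a stats hash shaped like the one process_pending returns.
# The summary keys and the failure-rate formula are illustrative assumptions.
def summarize_batch(stats)
  processed = stats.fetch(:processed, 0)
  failed = stats.fetch(:failed, 0)

  # Avoid dividing by zero when the batch was empty
  failure_rate = processed.zero? ? 0.0 : failed.to_f / processed

  {
    healthy: failed.zero? && stats.fetch(:dead_lettered, 0).zero?,
    failure_rate: failure_rate.round(3)
  }
end

# Example input with the same keys as the hash built in process_pending
stats = { processed: 8, succeeded: 6, failed: 2, retried: 1, dead_lettered: 0 }
summary = summarize_batch(stats)
```

With a real processor, the input would presumably come from `Pigeon::Processor.new.process_pending(batch_size: 100)` rather than a literal.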
#send_to_dead_letter_queue(message_id, dlq_topic: nil) ⇒ Boolean
Send a failed message to a dead letter queue
    # File 'lib/pigeon/processor.rb', line 89
    def send_to_dead_letter_queue(message_id, dlq_topic: nil)
      = Pigeon.()
      return false unless

      (, dlq_topic)
    end
#start_processing(batch_size: 100, interval: 5, thread_count: 2) ⇒ Boolean
Start processing pending messages in the background
    # File 'lib/pigeon/processor.rb', line 31
    def start_processing(batch_size: 100, interval: 5, thread_count: 2)
      result = start_background_processing(batch_size: batch_size, interval: interval, thread_count: thread_count)

      # Record processor start time
      Pigeon.processor_start_time = Time.now if result

      result
    end
#stop_processing ⇒ Boolean
Stop processing pending messages
    # File 'lib/pigeon/processor.rb', line 42
    def stop_processing
      stop_background_processing
    end
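The start/stop pair delegates to BackgroundProcessor, whose implementation is not shown on this page. As a rough mental model only, the sketch below runs a block every `interval` seconds on a single thread using the Ruby standard library. `PollingLoopSketch` is a hypothetical stand-in: it ignores `batch_size` and `thread_count`, does not use concurrent-ruby, and its Boolean return values (true when the state changed, false otherwise) are an assumption about what the real methods report.

```ruby
# Hypothetical stand-in, not Pigeon's BackgroundProcessor: one worker thread
# that runs a block every `interval` seconds until it is stopped.
class PollingLoopSketch
  def initialize(&work)
    @work = work
    @lock = Mutex.new
    @wakeup = ConditionVariable.new
    @running = false
    @thread = nil
  end

  def processing?
    @lock.synchronize { @running }
  end

  # Returns true if the loop was started, false if it was already running.
  def start(interval: 5)
    @lock.synchronize do
      return false if @running

      @running = true
      @thread = Thread.new { run_loop(interval) }
    end
    true
  end

  # Returns true if a running loop was stopped, false if nothing was running.
  def stop
    thread = nil
    @lock.synchronize do
      return false unless @running

      @running = false
      @wakeup.signal # interrupt the wait so shutdown does not take a full interval
      thread = @thread
    end
    thread.join
    true
  end

  private

  def run_loop(interval)
    keep_going = true
    while keep_going
      @work.call
      keep_going = @lock.synchronize do
        # Wait up to `interval` seconds unless a stop was already requested
        @wakeup.wait(@lock, interval) if @running
        @running
      end
    end
  end
end

ticks = Queue.new
runner = PollingLoopSketch.new { ticks << :tick }
started = runner.start(interval: 0.01)
ticks.pop # blocks until the worker has run at least once
stopped = runner.stop
```

Waiting on a condition variable instead of calling `sleep` lets `stop` return promptly even when the polling interval is long.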