Class: Pigeon::Processor
- Inherits:
- Object
- Defined in:
- lib/pigeon/processor.rb
Overview
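Processor class for handling outbox messages: it processes pending messages in batches, processes a single message by ID, and cleans up old processed messages. All three methods are currently placeholder implementations.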
Instance Method Summary
- #cleanup_processed(older_than: 7) ⇒ Integer
  Clean up old processed messages.
- #process_message(message_id) ⇒ Boolean
  Process a specific outbox message.
- #process_pending(batch_size: 100) ⇒ Hash
  Process pending outbox messages.
Instance Method Details
#cleanup_processed(older_than: 7) ⇒ Integer
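Deletes processed messages older than older_than days and returns the number of deleted records. This is currently a placeholder that only logs the request and returns 0.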
# File 'lib/pigeon/processor.rb', line 62

def cleanup_processed(older_than: 7)
  # This is a placeholder implementation
  # The actual implementation will be added in task 7.2
  Pigeon.config.logger.info("Cleaning up processed messages older than #{older_than} days")

  # In the actual implementation, we would:
  # 1. Delete processed messages older than the specified threshold
  # 2. Return the number of deleted records

  # Return mock count
  0
end
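The two steps listed in the placeholder comments could look roughly like the sketch below. It is only an illustration under stated assumptions: `SketchRecord`, `sketch_cleanup_processed`, the `:processed` status symbol, and the `processed_at` timestamp are hypothetical names, and a plain in-memory array stands in for whatever storage Pigeon actually uses.

```ruby
# Hypothetical in-memory stand-in for an outbox row; not part of Pigeon.
SketchRecord = Struct.new(:id, :status, :processed_at)

SKETCH_SECONDS_PER_DAY = 24 * 60 * 60

# Removes processed records older than `older_than` days from `records`
# (mutating the array) and returns how many were removed.
# `now` is injectable so the cutoff is deterministic in tests.
def sketch_cleanup_processed(records, older_than: 7, now: Time.now)
  cutoff = now - (older_than * SKETCH_SECONDS_PER_DAY)
  count_before = records.size

  # 1. Delete processed messages older than the specified threshold
  records.reject! { |r| r.status == :processed && r.processed_at < cutoff }

  # 2. Return the number of deleted records
  count_before - records.size
end
```

Records that are not yet processed are never touched, so a missing `processed_at` on a pending record is not compared against the cutoff.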
#process_message(message_id) ⇒ Boolean
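Processes the outbox message with the given ID. Returns true on success and false if an error is raised while sending. This is currently a placeholder that publishes a fixed test payload to "test-topic" rather than the stored message.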
# File 'lib/pigeon/processor.rb', line 33

def process_message(message_id)
  # This is a placeholder implementation
  # The actual implementation will be added in task 4.1
  Pigeon.config.logger.info("Processing message ID: #{message_id}")

  # In the actual implementation, we would:
  # 1. Fetch the message from the database
  # 2. Send it to Kafka using Karafka
  # 3. Update its status in the database

  # Mock implementation using Karafka
  begin
    # Simulate sending a message with Karafka
    Pigeon.karafka_producer.produce_async(
      { test: "data" }.to_json,
      topic: "test-topic"
    )

    # Return success
    true
  rescue StandardError => e
    Pigeon.config.logger.error("Failed to process message #{message_id}: #{e.message}")
    false
  end
end
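The fetch, send, and update steps named in the comments might be sketched as follows. This is not Pigeon's implementation: `SketchProducer`, `sketch_process_message`, the record layout, and the status symbols are all hypothetical, a Hash stands in for the database, and the producer only records what it was asked to send instead of talking to Kafka.

```ruby
require "json"

# Hypothetical producer that records messages instead of sending them to Kafka.
class SketchProducer
  attr_reader :sent

  def initialize
    @sent = []
  end

  def publish(topic, body)
    @sent << [topic, body]
  end
end

# `store` is a Hash of id => { topic:, payload:, status: } (assumed layout).
# Returns true when the message was published, false on any error.
def sketch_process_message(message_id, store:, producer:)
  # 1. Fetch the message (raises KeyError for an unknown ID)
  record = store.fetch(message_id)

  # 2. Send it, serialized as JSON
  producer.publish(record[:topic], record[:payload].to_json)

  # 3. Update its status
  record[:status] = :processed
  true
rescue StandardError
  # `record` is nil here when the fetch itself failed
  record[:status] = :failed if record
  false
end
```

Keeping the same Boolean contract as the placeholder means a caller such as a batch loop can count successes and failures without handling exceptions itself.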
#process_pending(batch_size: 100) ⇒ Hash
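Processes pending outbox messages in batches of up to batch_size. Returns a Hash of counts with the keys :processed, :succeeded, :failed, and :retried. This is currently a placeholder that only logs the request and returns zeros.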
# File 'lib/pigeon/processor.rb', line 11

def process_pending(batch_size: 100)
  # This is a placeholder implementation
  # The actual implementation will be added in task 4.1
  Pigeon.config.logger.info("Processing pending messages, batch size: #{batch_size}")

  # In the actual implementation, we would:
  # 1. Fetch pending messages from the database
  # 2. Process them in batches using Karafka
  # 3. Update their status in the database

  # Return mock statistics
  {
    processed: 0,
    succeeded: 0,
    failed: 0,
    retried: 0
  }
end
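One way the statistics Hash could be filled in is sketched below. Everything beyond the four keys is an assumption: `SketchMessage`, `sketch_process_pending`, the `max_retries` parameter, and the interpretation of `:retried` (a failed message left pending for a later run) are hypothetical, and `publish` is any callable that raises on failure, standing in for a Kafka producer.

```ruby
# Hypothetical in-memory stand-in for an outbox record; not part of Pigeon.
SketchMessage = Struct.new(:id, :payload, :status, :retry_count)

# Processes up to `batch_size` pending messages and returns the counts.
def sketch_process_pending(messages, publish:, batch_size: 100, max_retries: 3)
  stats = { processed: 0, succeeded: 0, failed: 0, retried: 0 }

  # 1. Fetch pending messages, limited to one batch
  batch = messages.select { |m| m.status == :pending }.first(batch_size)

  batch.each do |message|
    stats[:processed] += 1
    begin
      # 2. Publish the message
      publish.call(message.payload)

      # 3. Update its status
      message.status = :processed
      stats[:succeeded] += 1
    rescue StandardError
      message.retry_count += 1
      if message.retry_count < max_retries
        # Left as :pending so a later run picks it up again
        stats[:retried] += 1
      else
        message.status = :failed
        stats[:failed] += 1
      end
    end
  end

  stats
end
```

With this accounting, `:processed` always equals the sum of `:succeeded`, `:failed`, and `:retried` for a single run.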