Class: Pigeon::Processor

Inherits:
Object
  • Object
show all
Includes:
BackgroundProcessor
Defined in:
lib/pigeon/processor.rb,
lib/pigeon/processor/background_processor.rb

Overview

Processor class for handling outbox messages

Defined Under Namespace

Modules: BackgroundProcessor

Instance Method Summary collapse

Methods included from BackgroundProcessor

#process_startup_messages, #processing?, #start_background_processing, #stop_background_processing

Constructor Details

#initialize(auto_start: false) ⇒ Processor

Initialize a new processor

Parameters:

  • auto_start (Boolean) (defaults to: false)

    Whether to automatically start processing pending messages on initialization



15
16
17
18
19
20
21
22
23
24
# File 'lib/pigeon/processor.rb', line 15

def initialize(auto_start: false)
  @mutex = Concurrent::ReentrantReadWriteLock.new
  @processing = Concurrent::AtomicBoolean.new(false)
  @thread_pool = nil
  @metrics = Pigeon.config.metrics_collector || Pigeon::Metrics::Collector.new
  @start_time = Time.now

  # Start processing if auto_start is true
  start_processing if auto_start
end

Instance Method Details

#cleanup_processed(older_than: 7) ⇒ Integer

Clean up old processed messages

Parameters:

  • older_than (ActiveSupport::Duration, Integer) (defaults to: 7)

    Age threshold for cleanup

Returns:

  • (Integer)

    Number of records cleaned up



99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/pigeon/processor.rb', line 99

def cleanup_processed(older_than: 7)
  # This is a placeholder implementation
  # The actual implementation will be added in task 7.2
  Pigeon.config.logger.info("Cleaning up processed messages older than #{older_than} days")

  # In the actual implementation, we would:
  # 1. Delete processed messages older than the specified threshold
  # 2. Return the number of deleted records

  # Return mock count
  0
end

#process_message(message_id) ⇒ Boolean

Process a specific outbox message

Parameters:

  • message_id (String, Integer)

    ID of the message to process

Returns:

  • (Boolean)

    Success status



73
74
75
76
77
78
79
80
81
82
83
# File 'lib/pigeon/processor.rb', line 73

def process_message(message_id)
  Pigeon.config.logger.info("Processing message ID: #{message_id}")

  # Find the message
  message = Pigeon.find_outbox_message(message_id)
  return false unless message

  # Process the message
  result = process_single_message(message)
  result[:success]
end

#process_pending(batch_size: 100) ⇒ Hash

Process pending outbox messages

Parameters:

  • batch_size (Integer) (defaults to: 100)

    Number of messages to process in one batch

Returns:

  • (Hash)

    Processing statistics



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/pigeon/processor.rb', line 49

def process_pending(batch_size: 100)
  Pigeon.last_processing_run = Time.now
  batch_id = SecureRandom.uuid
  logger = create_batch_logger(batch_id, batch_size)
  log_batch_start(logger, batch_id, batch_size)
  start_time = Time.now

  stats = { processed: 0, succeeded: 0, failed: 0, retried: 0, dead_lettered: 0 }

  Pigeon::Tracing.trace_batch_process(batch_size) do |span|
    @mutex.with_read_lock do
      process_pending_and_retry_messages(stats, batch_size, logger, span)
    end

    finalize_batch_processing(stats, start_time, logger, span)
  end

  Pigeon.last_successful_processing_run = Time.now if stats[:succeeded].positive?
  stats
end

#send_to_dead_letter_queue(message_id, dlq_topic: nil) ⇒ Boolean

Send a failed message to a dead letter queue

Parameters:

  • message_id (String, Integer)

    ID of the message to send to DLQ

  • dlq_topic (String, nil) (defaults to: nil)

    Optional dead letter queue topic (defaults to original topic with .dlq suffix)

Returns:

  • (Boolean)

    Success status



89
90
91
92
93
94
# File 'lib/pigeon/processor.rb', line 89

def send_to_dead_letter_queue(message_id, dlq_topic: nil)
  message = Pigeon.find_outbox_message(message_id)
  return false unless message

  send_message_to_dlq(message, dlq_topic)
end

#start_processing(batch_size: 100, interval: 5, thread_count: 2) ⇒ Boolean

Start processing pending messages in the background

Parameters:

  • batch_size (Integer) (defaults to: 100)

    Number of messages to process in one batch

  • interval (Integer) (defaults to: 5)

    Interval in seconds between processing batches

  • thread_count (Integer) (defaults to: 2)

    Number of threads to use for processing

Returns:

  • (Boolean)

    Whether processing was started



31
32
33
34
35
36
37
38
# File 'lib/pigeon/processor.rb', line 31

def start_processing(batch_size: 100, interval: 5, thread_count: 2)
  result = start_background_processing(batch_size: batch_size, interval: interval, thread_count: thread_count)

  # Record processor start time
  Pigeon.processor_start_time = Time.now if result

  result
end

#stop_processingBoolean

Stop processing pending messages

Returns:

  • (Boolean)

    Whether processing was stopped



42
43
44
# File 'lib/pigeon/processor.rb', line 42

def stop_processing
  stop_background_processing
end