Class: Pigeon::Models::Adapters::ActiveRecordAdapter

Inherits:
OutboxMessage
  • Object
Defined in:
lib/pigeon/models/adapters/active_record_adapter.rb

Overview

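ActiveRecord adapter for OutboxMessage. It persists messages to the outbox_messages table through a lazily defined, memoized ActiveRecord model class.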

Constant Summary

Constants inherited from OutboxMessage

OutboxMessage::ATTRIBUTES, OutboxMessage::DEFAULTS, OutboxMessage::STATUSES

Class Method Summary

Instance Method Summary

Methods inherited from OutboxMessage

#[], #[]=, #attributes, #calculate_next_retry_time, #increment_retry_count, #initialize, #mark_as_failed, #mark_as_published, #max_retries_exceeded?, #update

Constructor Details

This class inherits a constructor from Pigeon::Models::OutboxMessage

Class Method Details

.count_by_status(status) ⇒ Integer

Count messages by status

Parameters:

  • status (String)

    Message status

Returns:

  • (Integer)

    Count of messages with the given status



# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 76

def self.count_by_status(status)
  model.where(status: status).count
end

.create(attributes = {}) ⇒ OutboxMessage

Create a new outbox message

Parameters:

  • attributes (Hash) (defaults to: {})

    Message attributes

Returns:

  • (OutboxMessage)

    The newly created message



# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 39

def self.create(attributes = {})
  record = model.create!(prepare_attributes(attributes))
  new_from_record(record)
end

.define_model ⇒ Object

Define the ActiveRecord model



# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 9

def self.define_model
  return @model if @model

  # Define the ActiveRecord model class
  @model = Class.new(ActiveRecord::Base) do
    self.table_name = "outbox_messages"

    # Validations
    validates :topic, presence: true
    validates :payload, presence: true
    validates :status, presence: true, inclusion: { in: Pigeon::Models::OutboxMessage::STATUSES }
    validates :retry_count, presence: true, numericality: { only_integer: true, greater_than_or_equal_to: 0 }

    # Serialize headers as JSON
    serialize :headers, JSON if respond_to?(:serialize)
  end

  # Return the model class
  @model
end
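
The memoization used by .define_model can be illustrated without ActiveRecord. The following is a minimal sketch, assuming a hypothetical FakeBase class in place of ActiveRecord::Base and a hypothetical SketchAdapter module; neither name exists in Pigeon. It only shows that the anonymous class is built once and that later calls return the same class object.

```ruby
# Hypothetical stand-in for ActiveRecord::Base (assumption, for illustration only).
class FakeBase
  class << self
    attr_accessor :table_name
  end
end

# Hypothetical adapter with the same memoization shape as .define_model.
module SketchAdapter
  def self.define_model
    return @model if @model

    # Class.new builds an anonymous subclass; the block runs in class scope.
    @model = Class.new(FakeBase) do
      self.table_name = "outbox_messages"
    end
  end

  def self.model
    define_model
  end
end

first = SketchAdapter.model
second = SketchAdapter.model
puts first.equal?(second)  # whether both calls returned the same class object
puts first.table_name
puts first.name.inspect    # anonymous classes have no name until assigned to a constant
```

Because the class is never assigned to a constant, it stays anonymous; code that relies on a class name would need to account for that.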

.find(id) ⇒ OutboxMessage?

Find a message by ID

Parameters:

  • id (String, Integer)

    Message ID

Returns:

  • (OutboxMessage, nil)

    The message, or nil if no record has the given ID



# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 47

def self.find(id)
  record = model.find_by(id: id)
  record ? new_from_record(record) : nil
end

.find_by_status(status, limit = 100) ⇒ Array<OutboxMessage>

Find messages by status

Parameters:

  • status (String)

    Message status

  • limit (Integer) (defaults to: 100)

    Maximum number of messages to return

Returns:

  • (Array<OutboxMessage>)

    Messages with the given status, oldest first



# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 56

def self.find_by_status(status, limit = 100)
  records = model.where(status: status).order(created_at: :asc).limit(limit)
  records.map { |record| new_from_record(record) }
end

.find_oldest_by_status(status) ⇒ OutboxMessage?

Find the oldest message by status

Parameters:

  • status (String)

    Message status

Returns:

  • (OutboxMessage, nil)

    The oldest message with the given status, or nil if none exists



# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 83

def self.find_oldest_by_status(status)
  record = model.where(status: status).order(created_at: :asc).first
  record ? new_from_record(record) : nil
end

.find_ready_for_retry(limit = 100) ⇒ Array<OutboxMessage>

Find messages ready for retry

Parameters:

  • limit (Integer) (defaults to: 100)

    Maximum number of messages to return

Returns:

  • (Array<OutboxMessage>)

    Pending messages whose next_retry_at is unset or has passed, oldest first



# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 64

def self.find_ready_for_retry(limit = 100)
  now = Time.now
  records = model.where(status: "pending")
                 .where("next_retry_at IS NULL OR next_retry_at <= ?", now)
                 .order(created_at: :asc)
                 .limit(limit)
  records.map { |record| new_from_record(record) }
end
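
The selection rule in .find_ready_for_retry can be sketched against an in-memory list. This is a minimal illustration, assuming a hypothetical Row struct and ready_for_retry helper that are not part of Pigeon; the real method runs the equivalent filter as a SQL query. The "published" status value is also an assumption used only to show a non-pending row.

```ruby
# Hypothetical in-memory row type; the real adapter queries the outbox_messages table.
Row = Struct.new(:id, :status, :next_retry_at, :created_at, keyword_init: true)

# Mirrors the query: status is "pending", next_retry_at is nil or not in the future,
# ordered by created_at ascending, capped at limit.
def ready_for_retry(rows, now: Time.now, limit: 100)
  rows
    .select { |r| r.status == "pending" }
    .select { |r| r.next_retry_at.nil? || r.next_retry_at <= now }
    .sort_by(&:created_at)
    .first(limit)
end

now = Time.at(1_000)
rows = [
  Row.new(id: 1, status: "pending",   next_retry_at: nil,      created_at: now - 30),
  Row.new(id: 2, status: "pending",   next_retry_at: now + 60, created_at: now - 50),
  Row.new(id: 3, status: "pending",   next_retry_at: now - 5,  created_at: now - 40),
  Row.new(id: 4, status: "published", next_retry_at: nil,      created_at: now - 60)
]

# Row 2 is still waiting for its retry time and row 4 is not pending.
puts ready_for_retry(rows, now: now).map(&:id).inspect
```

Ordering by created_at rather than next_retry_at means the oldest eligible message is returned first, regardless of when its retry became due.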

.model ⇒ Class

Get the ActiveRecord model class

Returns:

  • (Class)

    ActiveRecord model class



# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 32

def self.model
  define_model
end

.new_from_record(record) ⇒ OutboxMessage

Create a new OutboxMessage instance from an ActiveRecord record

Parameters:

  • record (ActiveRecord::Base)

    ActiveRecord record

Returns:

  • (OutboxMessage)

    Message built from the record's attributes



# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 91

def self.new_from_record(record)
  attributes = {}
  ATTRIBUTES.each do |attr|
    attributes[attr] = record.send(attr) if record.respond_to?(attr)
  end
  new(attributes).tap { |msg| msg.instance_variable_set(:@record, record) }
end
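
The attribute-copying step in .new_from_record skips any attribute the record does not respond to. A minimal sketch of that behaviour follows, assuming a hypothetical SKETCH_ATTRIBUTES list (the real list is OutboxMessage::ATTRIBUTES, whose contents are not shown here) and a hypothetical PartialRecord struct.

```ruby
# Hypothetical attribute list; the real one is OutboxMessage::ATTRIBUTES.
SKETCH_ATTRIBUTES = %i[id topic payload status retry_count].freeze

# Hypothetical record that exposes only some of the attributes.
PartialRecord = Struct.new(:id, :topic, :status)

# Copy each known attribute the record responds to; skip the rest.
# The original uses send; public_send is used here to stay within public readers.
def attributes_from(record)
  SKETCH_ATTRIBUTES.each_with_object({}) do |attr, attrs|
    attrs[attr] = record.public_send(attr) if record.respond_to?(attr)
  end
end

record = PartialRecord.new(7, "orders", "pending")
attrs = attributes_from(record)

puts attrs.keys.inspect        # only the attributes the record exposes
puts attrs.key?(:payload)      # missing columns are left out, not set to nil
```

Leaving absent attributes out of the hash, instead of setting them to nil, lets the OutboxMessage constructor apply its own defaults for them if it has any.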

.prepare_attributes(attributes) ⇒ Hash

Prepare attributes for ActiveRecord

Parameters:

  • attributes (Hash)

    Raw attributes

Returns:

  • (Hash)

    Prepared attributes



# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 102

def self.prepare_attributes(attributes)
  attributes = attributes.dup

  # Convert Time objects to the format expected by ActiveRecord
  %i[created_at updated_at published_at next_retry_at].each do |attr|
    attributes[attr] = attributes[attr].to_time if attributes[attr].is_a?(Time)
  end

  # Ensure headers is a hash
  attributes[:headers] ||= {}

  attributes
end
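
Two details of .prepare_attributes are easy to miss: the caller's hash is not modified, and the copy is shallow. The following is a minimal sketch, assuming a hypothetical standalone helper named prepare_attributes_sketch that reproduces only the dup and headers steps.

```ruby
# Standalone copy of the dup and headers steps (hypothetical helper, for illustration).
def prepare_attributes_sketch(attributes)
  attributes = attributes.dup   # shallow copy; the caller's hash is not modified
  attributes[:headers] ||= {}   # missing or nil headers become an empty hash
  attributes
end

input = { topic: "orders", payload: "{}" }
prepared = prepare_attributes_sketch(input)

puts input.key?(:headers)       # whether the caller's hash gained a :headers key
puts prepared[:headers].inspect

# Nested values are shared between the original and the copy.
headers = { "trace_id" => "abc" }
shared = prepare_attributes_sketch({ topic: "orders", headers: headers })
puts shared[:headers].equal?(headers)
```

Since dup is shallow, mutating a nested headers hash after calling the method would also change the prepared attributes.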

Instance Method Details

#save ⇒ Boolean

Save the message

Returns:

  • (Boolean)

    Whether the save was successful



# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 118

def save
  if @record
    @record.attributes = self.class.prepare_attributes(@attributes)
    @record.save
  else
    @record = self.class.model.create!(self.class.prepare_attributes(@attributes))
    true
  end
rescue ActiveRecord::RecordInvalid => e
  Pigeon.config.logger.error("Failed to save outbox message: #{e.message}")
  false
end
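
The two branches of #save fail in different ways: updating an existing record returns false from save without raising, while creating a new record raises and is rescued. The following is a minimal sketch of that control flow, assuming hypothetical SketchInvalid, SketchRecord, and SketchMessage classes that stand in for ActiveRecord::RecordInvalid, the model, and the adapter. It does not use ActiveRecord.

```ruby
# Hypothetical error and record types standing in for ActiveRecord (assumptions).
class SketchInvalid < StandardError; end

class SketchRecord
  attr_accessor :attributes

  def initialize(attributes)
    @attributes = attributes
  end

  def valid?
    !attributes[:topic].to_s.empty?
  end

  # Like a non-bang save: reports failure by returning false.
  def save
    valid?
  end

  # Like a bang create: reports failure by raising.
  def self.create!(attributes)
    record = new(attributes)
    raise SketchInvalid, "Topic can't be blank" unless record.valid?

    record
  end
end

class SketchMessage
  attr_reader :attributes

  def initialize(attributes)
    @attributes = attributes
    @record = nil
  end

  def save
    if @record
      @record.attributes = @attributes
      @record.save                              # update path: false on failure
    else
      @record = SketchRecord.create!(@attributes)
      true                                      # create path: raises on failure
    end
  rescue SketchInvalid => e
    warn "Failed to save outbox message: #{e.message}"
    false
  end
end

message = SketchMessage.new({ topic: "orders" })
puts message.save                 # create path succeeds

message.attributes[:topic] = ""
puts message.save                 # update path fails without raising or logging

puts SketchMessage.new({ topic: "" }).save   # create path fails, is rescued and logged
```

In this sketch only the create path writes a log line, because the update path never raises; the original method has the same shape.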