Class: Pigeon::Models::Adapters::ActiveRecordAdapter
- Inherits:
-
OutboxMessage
- Object
- OutboxMessage
- Pigeon::Models::Adapters::ActiveRecordAdapter
- Defined in:
- lib/pigeon/models/adapters/active_record_adapter.rb
Overview
ActiveRecord adapter for OutboxMessage
Constant Summary
Constants inherited from OutboxMessage
OutboxMessage::ATTRIBUTES, OutboxMessage::DEFAULTS, OutboxMessage::STATUSES
Class Method Summary collapse
-
.count_by_status(status) ⇒ Integer
Count messages by status.
-
.create(attributes = {}) ⇒ OutboxMessage
Create a new outbox message.
-
.define_model ⇒ Object
Define the ActiveRecord model.
-
.find(id) ⇒ OutboxMessage?
Find a message by ID.
-
.find_by_status(status, limit = 100) ⇒ Array<OutboxMessage>
Find messages by status.
-
.find_oldest_by_status(status) ⇒ OutboxMessage?
Find the oldest message by status.
-
.find_ready_for_retry(limit = 100) ⇒ Array<OutboxMessage>
Find messages ready for retry.
-
.model ⇒ Class
Get the ActiveRecord model class.
-
.new_from_record(record) ⇒ OutboxMessage
Create a new OutboxMessage instance from an ActiveRecord record.
-
.prepare_attributes(attributes) ⇒ Hash
Prepare attributes for ActiveRecord.
Instance Method Summary collapse
-
#save ⇒ Boolean
Save the message.
Methods inherited from OutboxMessage
#[], #[]=, #attributes, #calculate_next_retry_time, #increment_retry_count, #initialize, #mark_as_failed, #mark_as_published, #max_retries_exceeded?, #update
Constructor Details
This class inherits a constructor from Pigeon::Models::OutboxMessage
Class Method Details
.count_by_status(status) ⇒ Integer
Count messages by status
76 77 78 |
# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 76
#
# Count messages by status.
#
# @param status status value matched against the `status` column
# @return [Integer] number of rows whose status equals +status+
def self.count_by_status(status)
  matching = model.where(status: status)
  matching.count
end
.create(attributes = {}) ⇒ OutboxMessage
Create a new outbox message
39 40 41 42 |
# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 39
#
# Create a new outbox message.
#
# The attributes are passed through +prepare_attributes+ and persisted with
# the model's +create!+; the resulting record is wrapped via +new_from_record+.
#
# @param attributes [Hash] message attributes (defaults to an empty hash)
# @return [OutboxMessage] wrapper around the newly created record
def self.create(attributes = {})
  new_from_record(model.create!(prepare_attributes(attributes)))
end
.define_model ⇒ Object
Define the ActiveRecord model
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 9
#
# Define the ActiveRecord model.
#
# Builds an anonymous ActiveRecord::Base subclass bound to the
# "outbox_messages" table the first time it is called and memoizes it in
# @model; later calls return the same class.
#
# @return [Class] the memoized ActiveRecord model class
def self.define_model
  @model ||= Class.new(ActiveRecord::Base) do
    self.table_name = "outbox_messages"

    # Validations
    validates :topic, presence: true
    validates :payload, presence: true
    validates :status, presence: true, inclusion: { in: Pigeon::Models::OutboxMessage::STATUSES }
    validates :retry_count, presence: true, numericality: { only_integer: true, greater_than_or_equal_to: 0 }

    # Serialize headers as JSON.
    # NOTE(review): newer Rails releases appear to expect `coder: JSON` rather
    # than a positional coder -- confirm against the supported Rails versions.
    serialize :headers, JSON if respond_to?(:serialize)
  end
end
.find(id) ⇒ OutboxMessage?
Find a message by ID
47 48 49 50 |
# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 47
#
# Find a message by ID.
#
# @param id primary key looked up with +find_by(id: id)+
# @return [OutboxMessage, nil] the wrapped record, or nil when no row matches
def self.find(id)
  found = model.find_by(id: id)
  return nil unless found

  new_from_record(found)
end
.find_by_status(status, limit = 100) ⇒ Array<OutboxMessage>
Find messages by status
56 57 58 59 |
# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 56
#
# Find messages by status, ordered by ascending created_at.
#
# @param status status value matched against the `status` column
# @param limit [Integer] maximum number of rows to load (default 100)
# @return [Array<OutboxMessage>] wrapped records
def self.find_by_status(status, limit = 100)
  model
    .where(status: status)
    .order(created_at: :asc)
    .limit(limit)
    .map { |row| new_from_record(row) }
end
.find_oldest_by_status(status) ⇒ OutboxMessage?
Find the oldest message by status
83 84 85 86 |
# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 83
#
# Find the oldest message by status.
#
# "Oldest" means the first row when ordered by ascending created_at.
#
# @param status status value matched against the `status` column
# @return [OutboxMessage, nil] the wrapped record, or nil when none match
def self.find_oldest_by_status(status)
  oldest = model.where(status: status).order(created_at: :asc).first
  if oldest
    new_from_record(oldest)
  else
    nil
  end
end
.find_ready_for_retry(limit = 100) ⇒ Array<OutboxMessage>
Find messages ready for retry
64 65 66 67 68 69 70 71 |
# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 64
#
# Find messages ready for retry.
#
# Selects rows with status "pending" whose next_retry_at is either NULL or
# not later than the current time, ordered by ascending created_at.
#
# @param limit [Integer] maximum number of rows to load (default 100)
# @return [Array<OutboxMessage>] wrapped records
def self.find_ready_for_retry(limit = 100)
  cutoff = Time.now
  due = model.where(status: "pending")
             .where("next_retry_at IS NULL OR next_retry_at <= ?", cutoff)
             .order(created_at: :asc)
             .limit(limit)
  due.map { |row| new_from_record(row) }
end
.model ⇒ Class
Get the ActiveRecord model class
32 33 34 |
# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 32 def self.model define_model end |
.new_from_record(record) ⇒ OutboxMessage
Create a new OutboxMessage instance from an ActiveRecord record
91 92 93 94 95 96 97 |
# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 91
#
# Create a new OutboxMessage instance from an ActiveRecord record.
#
# Copies every attribute named in ATTRIBUTES that the record responds to,
# builds a new instance from them, and stores the record in the instance's
# @record variable.
#
# @param record the record to wrap
# @return [OutboxMessage] instance built from the record's attributes
def self.new_from_record(record)
  attributes = ATTRIBUTES.each_with_object({}) do |attr, collected|
    collected[attr] = record.send(attr) if record.respond_to?(attr)
  end

  message = new(attributes)
  message.instance_variable_set(:@record, record)
  message
end
.prepare_attributes(attributes) ⇒ Hash
Prepare attributes for ActiveRecord
102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 102
#
# Prepare attributes for ActiveRecord.
#
# Works on a shallow copy, so the caller's hash is not modified.
#
# @param attributes [Hash] message attributes keyed by symbol
# @return [Hash] copy with timestamps passed through +to_time+ and :headers
#   defaulted to an empty hash when it is missing or nil
def self.prepare_attributes(attributes)
  prepared = attributes.dup

  # Only values that are already Time-like are passed through #to_time;
  # missing keys and other value types are left untouched.
  %i[created_at updated_at published_at next_retry_at].each do |key|
    value = prepared[key]
    prepared[key] = value.to_time if value.is_a?(Time)
  end

  # Ensure headers is a hash
  prepared[:headers] ||= {}

  prepared
end
Instance Method Details
#save ⇒ Boolean
Save the message
118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/pigeon/models/adapters/active_record_adapter.rb', line 118
#
# Save the message.
#
# When the message already wraps a record, the prepared attributes are
# assigned to it and the result of the record's +save+ is returned. Otherwise
# a new record is created with +create!+ and kept in @record.
#
# @return [Boolean] true when a new record was created, the result of the
#   record's +save+ on the update path, or false when
#   ActiveRecord::RecordInvalid was raised (the error is logged)
def save
  if @record
    @record.attributes = self.class.prepare_attributes(@attributes)
    @record.save
  else
    @record = self.class.model.create!(self.class.prepare_attributes(@attributes))
    true
  end
rescue ActiveRecord::RecordInvalid => e
  # Log the validation failure text so a rejected message can be diagnosed.
  Pigeon.config.logger.error("Failed to save outbox message: #{e.message}")
  false
end