Class: Pigeon::Models::Adapters::RomAdapter
- Inherits:
-
OutboxMessage
- Object
- OutboxMessage
- Pigeon::Models::Adapters::RomAdapter
- Defined in:
- lib/pigeon/models/adapters/rom_adapter.rb
Overview
ROM adapter for OutboxMessage (for Hanami applications)
Constant Summary
Constants inherited from OutboxMessage
OutboxMessage::ATTRIBUTES, OutboxMessage::DEFAULTS, OutboxMessage::STATUSES
Class Method Summary collapse
-
.count_by_status(status) ⇒ Integer
Count messages by status.
-
.create(attributes = {}) ⇒ OutboxMessage
Create a new outbox message.
-
.define_relation ⇒ Object
Define the ROM relation.
-
.find(id) ⇒ OutboxMessage?
Find a message by ID.
-
.find_by_status(status, limit = 100) ⇒ Array<OutboxMessage>
Find messages by status.
-
.find_oldest_by_status(status) ⇒ OutboxMessage?
Find the oldest message by status.
-
.find_ready_for_retry(limit = 100) ⇒ Array<OutboxMessage>
Find messages ready for retry.
-
.new_from_record(record) ⇒ OutboxMessage
Create a new OutboxMessage instance from a ROM record.
-
.prepare_attributes(attributes) ⇒ Hash
Prepare attributes for ROM.
-
.relation ⇒ ROM::Relation
Get the ROM relation.
-
.repository ⇒ ROM::Repository
Get the ROM repository.
Instance Method Summary collapse
-
#save ⇒ Boolean
Save the message.
Methods inherited from OutboxMessage
#[], #[]=, #attributes, #calculate_next_retry_time, #increment_retry_count, #initialize, #mark_as_failed, #mark_as_published, #max_retries_exceeded?, #update
Constructor Details
This class inherits a constructor from Pigeon::Models::OutboxMessage
Class Method Details
.count_by_status(status) ⇒ Integer
Count messages by status
89 90 91 |
# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 89 def self.count_by_status(status) relation.by_status(status).count end |
.create(attributes = {}) ⇒ OutboxMessage
Create a new outbox message
53 54 55 56 57 |
# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 53 def self.create(attributes = {}) attributes = prepare_attributes(attributes) record = repository.create(attributes) new_from_record(record) end |
.define_relation ⇒ Object
Define the ROM relation
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 9 def self.define_relation return @relation if @relation # Get the ROM container container = Hanami.app["persistence.rom"] # Define the relation if it doesn't exist unless container.relations.key?(:outbox_messages) container.register_relation(Class.new(ROM::Relation[:sql]) do schema(:outbox_messages, infer: true) # Query methods def by_status(status) where(status: status).order(:created_at) end def ready_for_retry now = Time.now where(status: "pending") .where { next_retry_at.nil? | (next_retry_at <= now) } .order(:created_at) end end) end # Get the relation @relation = container.relations[:outbox_messages] end |
.find(id) ⇒ OutboxMessage?
Find a message by ID
62 63 64 65 66 67 |
# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 62 def self.find(id) record = repository.find(id) record ? new_from_record(record) : nil rescue ROM::TupleCountMismatchError nil end |
.find_by_status(status, limit = 100) ⇒ Array<OutboxMessage>
Find messages by status
73 74 75 76 |
# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 73 def self.find_by_status(status, limit = 100) records = relation.by_status(status).limit(limit).to_a records.map { |record| new_from_record(record) } end |
.find_oldest_by_status(status) ⇒ OutboxMessage?
Find the oldest message by status
96 97 98 99 |
# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 96 def self.find_oldest_by_status(status) record = relation.by_status(status).order(:created_at).limit(1).one record ? new_from_record(record) : nil end |
.find_ready_for_retry(limit = 100) ⇒ Array<OutboxMessage>
Find messages ready for retry
81 82 83 84 |
# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 81 def self.find_ready_for_retry(limit = 100) records = relation.ready_for_retry.limit(limit).to_a records.map { |record| new_from_record(record) } end |
.new_from_record(record) ⇒ OutboxMessage
Create a new OutboxMessage instance from a ROM record
104 105 106 107 108 109 110 |
# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 104 def self.new_from_record(record) attributes = {} ATTRIBUTES.each do |attr| attributes[attr] = record.send(attr) if record.respond_to?(attr) end new(attributes).tap { |msg| msg.instance_variable_set(:@record, record) } end |
.prepare_attributes(attributes) ⇒ Hash
Prepare attributes for ROM
115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 115 def self.prepare_attributes(attributes) attributes = attributes.dup # Convert Time objects to the format expected by ROM %i[created_at updated_at published_at next_retry_at].each do |attr| attributes[attr] = attributes[attr].to_time if attributes[attr].is_a?(Time) end # Ensure headers is a hash attributes[:headers] ||= {} attributes end |
.relation ⇒ ROM::Relation
Get the ROM relation
40 41 42 |
# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 40 def self.relation define_relation end |
.repository ⇒ ROM::Repository
Get the ROM repository
46 47 48 |
# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 46 def self.repository @repository ||= Hanami.app["repositories.outbox_messages"] end |
Instance Method Details
#save ⇒ Boolean
Save the message
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 131 def save if @record # Update existing record id = @record.id attributes = self.class.prepare_attributes(@attributes) self.class.repository.update(id, attributes) else # Create new record attributes = self.class.prepare_attributes(@attributes) @record = self.class.repository.create(attributes) end true rescue StandardError => e Pigeon.config.logger.error("Failed to save outbox message: #{e.}") false end |