Class: Pigeon::Models::Adapters::RomAdapter

Inherits:
OutboxMessage
Defined in:
lib/pigeon/models/adapters/rom_adapter.rb

Overview

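ROM adapter for OutboxMessage, intended for Hanami applications. It persists outbox messages through a ROM relation and a ROM repository, both resolved from the Hanami application container.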

Constant Summary

Constants inherited from OutboxMessage

OutboxMessage::ATTRIBUTES, OutboxMessage::DEFAULTS, OutboxMessage::STATUSES

Class Method Summary

Instance Method Summary

Methods inherited from OutboxMessage

#[], #[]=, #attributes, #calculate_next_retry_time, #increment_retry_count, #initialize, #mark_as_failed, #mark_as_published, #max_retries_exceeded?, #update

Constructor Details

This class inherits a constructor from Pigeon::Models::OutboxMessage

Class Method Details

.create(attributes = {}) ⇒ OutboxMessage

Create a new outbox message and persist it through the ROM repository

Parameters:

  • attributes (Hash) (defaults to: {})

    Message attributes

Returns:

  • (OutboxMessage)

# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 53

def self.create(attributes = {})
  attributes = prepare_attributes(attributes)
  record = repository.create(attributes)
  new_from_record(record)
end

.define_relation ⇒ Object

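Define the ROM relation for the outbox_messages table, registering it with the Hanami ROM container if it is not already present. The result is memoized.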



# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 9

def self.define_relation
  return @relation if @relation

  # Get the ROM container
  container = Hanami.app["persistence.rom"]

  # Define the relation if it doesn't exist
  unless container.relations.key?(:outbox_messages)
    container.register_relation(Class.new(ROM::Relation[:sql]) do
      schema(:outbox_messages, infer: true)

      # Query methods
      def by_status(status)
        where(status: status).order(:created_at)
      end

      def ready_for_retry
        now = Time.now
        where(status: "pending")
          .where { next_retry_at.nil? | (next_retry_at <= now) }
          .order(:created_at)
      end
    end)
  end

  # Get the relation
  @relation = container.relations[:outbox_messages]
end
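
The ready_for_retry query above can be read as a plain filter. The following is a minimal sketch of the same selection logic over an array of hashes, not the ROM query itself. The helper name ready_for_retry_rows, the sample rows, and the "published" status value are assumptions made for illustration.

```ruby
# Plain-Ruby sketch of the ready_for_retry selection (hypothetical helper;
# rows are hashes here, not ROM tuples).
def ready_for_retry_rows(rows, now: Time.now)
  rows
    .select { |row| row[:status] == "pending" }
    # A row qualifies when no retry time is set or the retry time has passed.
    .select { |row| row[:next_retry_at].nil? || row[:next_retry_at] <= now }
    .sort_by { |row| row[:created_at] }
end

now = Time.at(1_000)
rows = [
  { id: 1, status: "pending",   next_retry_at: nil,      created_at: Time.at(30) },
  { id: 2, status: "pending",   next_retry_at: now + 60, created_at: Time.at(10) },
  { id: 3, status: "published", next_retry_at: nil,      created_at: Time.at(5) },
  { id: 4, status: "pending",   next_retry_at: now - 60, created_at: Time.at(20) }
]

ready_ids = ready_for_retry_rows(rows, now: now).map { |row| row[:id] }
```

Row 2 is skipped because its retry time is still in the future, and row 3 because it is not pending. The remaining rows come back oldest first.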

.find(id) ⇒ OutboxMessage?

Find a message by ID. Returns nil when no matching record exists.

Parameters:

  • id (String, Integer)

    Message ID

Returns:

  • (OutboxMessage, nil)

# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 62

def self.find(id)
  record = repository.find(id)
  record ? new_from_record(record) : nil
rescue ROM::TupleCountMismatchError
  nil
end
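
The nil-on-miss behaviour of .find can be sketched without ROM. In the sketch below, SketchTupleCountMismatchError and SketchLookupRepository are hypothetical stand-ins for ROM::TupleCountMismatchError and the real repository, and find_or_nil is an illustrative helper rather than part of the adapter.

```ruby
# Hypothetical stand-in for ROM::TupleCountMismatchError.
class SketchTupleCountMismatchError < StandardError; end

# Hypothetical repository whose lookup raises when no row matches.
class SketchLookupRepository
  def initialize(rows)
    @rows = rows
  end

  def find(id)
    @rows.fetch(id) { raise SketchTupleCountMismatchError, "no row with id #{id}" }
  end
end

# Translate the "not found" error into nil, as .find does.
def find_or_nil(repository, id)
  repository.find(id)
rescue SketchTupleCountMismatchError
  nil
end

lookup_repo = SketchLookupRepository.new(1 => { id: 1, status: "pending" })
found = find_or_nil(lookup_repo, 1)
missing = find_or_nil(lookup_repo, 99)
```

Callers therefore only need a nil check and never see the lookup error.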

.find_by_status(status, limit = 100) ⇒ Array<OutboxMessage>

Find messages with the given status, oldest first

Parameters:

  • status (String)

    Message status

  • limit (Integer) (defaults to: 100)

    Maximum number of messages to return

Returns:

  • (Array<OutboxMessage>)

# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 73

def self.find_by_status(status, limit = 100)
  records = relation.by_status(status).limit(limit).to_a
  records.map { |record| new_from_record(record) }
end

.find_ready_for_retry(limit = 100) ⇒ Array<OutboxMessage>

Find pending messages that are ready for retry (next_retry_at is unset or not in the future), oldest first

Parameters:

  • limit (Integer) (defaults to: 100)

    Maximum number of messages to return

Returns:

  • (Array<OutboxMessage>)

# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 81

def self.find_ready_for_retry(limit = 100)
  records = relation.ready_for_retry.limit(limit).to_a
  records.map { |record| new_from_record(record) }
end

.new_from_record(record) ⇒ OutboxMessage

Create a new OutboxMessage instance from a ROM record, copying each attribute listed in ATTRIBUTES that the record responds to

Parameters:

  • record (ROM::Struct)

    ROM record

Returns:

  • (OutboxMessage)

# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 89

def self.new_from_record(record)
  attributes = {}
  ATTRIBUTES.each do |attr|
    attributes[attr] = record.send(attr) if record.respond_to?(attr)
  end
  new(attributes).tap { |msg| msg.instance_variable_set(:@record, record) }
end
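
The attribute-copying step in .new_from_record can be illustrated with a plain Struct. This is a sketch under assumptions: SKETCH_ATTRIBUTES is a made-up list standing in for OutboxMessage::ATTRIBUTES (whose contents are not shown here), and SketchRecord stands in for a ROM struct.

```ruby
# Hypothetical attribute list; the real one is OutboxMessage::ATTRIBUTES.
SKETCH_ATTRIBUTES = %i[id topic payload status].freeze

# Hypothetical record type standing in for a ROM::Struct.
SketchRecord = Struct.new(:id, :topic, :status, :internal_flag)

# Copy only the listed attributes that the record actually exposes.
def attributes_from(record, names)
  names.each_with_object({}) do |name, attrs|
    attrs[name] = record.public_send(name) if record.respond_to?(name)
  end
end

record = SketchRecord.new(7, "orders.created", "pending", true)
copied = attributes_from(record, SKETCH_ATTRIBUTES)
```

Here :payload is skipped because the record has no such reader, and :internal_flag is skipped because it is not in the attribute list, so the adapter tolerates records whose columns differ from the list.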

.prepare_attributes(attributes) ⇒ Hash

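Prepare attributes for ROM. Works on a shallow copy of the given hash, normalizes the timestamp fields, and defaults headers to an empty hash.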

Parameters:

  • attributes (Hash)

    Raw attributes

Returns:

  • (Hash)

    Prepared attributes



# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 100

def self.prepare_attributes(attributes)
  attributes = attributes.dup

  # Convert Time objects to the format expected by ROM
  %i[created_at updated_at published_at next_retry_at].each do |attr|
    attributes[attr] = attributes[attr].to_time if attributes[attr].is_a?(Time)
  end

  # Ensure headers is a hash
  attributes[:headers] ||= {}

  attributes
end
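
Two properties of .prepare_attributes are easy to miss: the caller's hash is left untouched at the top level, and the copy is shallow. The sketch below re-implements only the copy and headers-default steps (the timestamp step is omitted). The name sketch_prepare_attributes and the sample keys topic and trace_id are assumptions for illustration.

```ruby
# Hypothetical re-implementation of the copy and default steps only.
def sketch_prepare_attributes(attributes)
  prepared = attributes.dup   # shallow copy: only top-level keys are duplicated
  prepared[:headers] ||= {}   # nil or missing headers become an empty hash
  prepared
end

caller_hash = { topic: "orders.created", headers: nil }
prepared = sketch_prepare_attributes(caller_hash)

# Nested values are shared between the original and the copy.
shared = { headers: { "trace_id" => "abc" } }
prepared_shared = sketch_prepare_attributes(shared)
prepared_shared[:headers]["trace_id"] = "changed"
```

Because the copy is shallow, mutating a nested headers hash on the prepared copy also changes the caller's hash; callers that need isolation would have to copy nested values themselves.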

.relation ⇒ ROM::Relation

Get the ROM relation, defining it on first use

Returns:

  • (ROM::Relation)

    ROM relation



# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 40

def self.relation
  define_relation
end

.repository ⇒ ROM::Repository

Get the ROM repository, resolved from the Hanami container under the key repositories.outbox_messages and memoized

Returns:

  • (ROM::Repository)

    ROM repository



# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 46

def self.repository
  @repository ||= Hanami.app["repositories.outbox_messages"]
end

Instance Method Details

#save ⇒ Boolean

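Save the message. Updates the existing record when the message was loaded from one, and creates a new record otherwise. Errors are logged rather than raised.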

Returns:

  • (Boolean)

    Whether the save was successful



# File 'lib/pigeon/models/adapters/rom_adapter.rb', line 116

def save
  if @record
    # Update existing record
    id = @record.id
    attributes = self.class.prepare_attributes(@attributes)
    self.class.repository.update(id, attributes)
  else
    # Create new record
    attributes = self.class.prepare_attributes(@attributes)
    @record = self.class.repository.create(attributes)
  end
  true
rescue StandardError => e
  Pigeon.config.logger.error("Failed to save outbox message: #{e.message}")
  false
end
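
The create-or-update flow of #save can be sketched with an in-memory repository. Everything below is a hypothetical stand-in: SketchMemoryRepository replaces the ROM repository, SketchMessage replaces the adapter, records are plain hashes rather than ROM structs, and warn replaces Pigeon.config.logger.

```ruby
# Hypothetical in-memory repository with the create/update calls used by #save.
class SketchMemoryRepository
  attr_reader :rows

  def initialize
    @rows = {}
    @next_id = 1
  end

  def create(attributes)
    raise ArgumentError, "topic is required" unless attributes[:topic]

    row = attributes.merge(id: @next_id)
    @rows[@next_id] = row
    @next_id += 1
    row
  end

  def update(id, attributes)
    @rows[id] = @rows.fetch(id).merge(attributes)
  end
end

# Hypothetical message class mirroring the branch structure of #save.
class SketchMessage
  attr_reader :attributes

  def initialize(repository, attributes)
    @repository = repository
    @attributes = attributes
    @record = nil
  end

  def save
    if @record
      @repository.update(@record[:id], @attributes)   # existing record: update
    else
      @record = @repository.create(@attributes)       # no record yet: create
    end
    true
  rescue StandardError => e
    warn("Failed to save outbox message: #{e.message}")
    false
  end
end

repo = SketchMemoryRepository.new
message = SketchMessage.new(repo, { topic: "orders.created", status: "pending" })
first_save = message.save
message.attributes[:status] = "published"
second_save = message.save
failed_save = SketchMessage.new(repo, {}).save
```

The first save creates a row, the second updates that same row, and the invalid message returns false instead of raising. Callers that ignore the return value will not notice a failed save, so checking it is advisable.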