Class: Pigeon::Models::OutboxMessage

Inherits:
Object
Defined in:
lib/pigeon/models/outbox_message.rb

Overview

Base class for the outbox message model. This is a framework-agnostic representation of the outbox message.

Constant Summary

ATTRIBUTES =

Attributes that should be present in all framework implementations

%i[
  id
  topic
  key
  headers
  partition
  payload
  status
  retry_count
  max_retries
  error_message
  correlation_id
  created_at
  updated_at
  published_at
  next_retry_at
].freeze
STATUSES =

Valid status values

%w[pending processing published failed].freeze
DEFAULTS =

Default values for attributes

{
  status: "pending",
  retry_count: 0,
  headers: {},
  created_at: -> { Time.now },
  updated_at: -> { Time.now }
}.freeze


Constructor Details

#initialize(attributes = {}) ⇒ OutboxMessage

Initialize a new outbox message

Parameters:

  • attributes (Hash) (defaults to: {})

    Message attributes



# File 'lib/pigeon/models/outbox_message.rb', line 84

def initialize(attributes = {})
  @attributes = DEFAULTS.dup
  attributes.each do |key, value|
    send("#{key}=", value) if respond_to?("#{key}=")
  end
end
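
The constructor assigns only those keys that have a matching public writer method, so unknown keys are skipped without raising. The following is a minimal sketch of that filtering, using a hypothetical stand-in class (`DemoMessage` and its two accessors are assumptions for illustration, not part of Pigeon):

```ruby
# Hypothetical stand-in that mirrors the constructor's respond_to? filter.
class DemoMessage
  attr_accessor :topic, :payload

  def initialize(attributes = {})
    attributes.each do |key, value|
      # Only keys with a public writer (for example topic=) are assigned.
      send("#{key}=", value) if respond_to?("#{key}=")
    end
  end
end

demo = DemoMessage.new(topic: "orders", payload: "{}", unknown: 1)
puts demo.topic                 # prints "orders"
puts demo.respond_to?(:unknown) # prints "false"; the :unknown key was ignored
```

One consequence of this design is that a misspelled attribute name goes unnoticed, so callers may want to validate keys against `ATTRIBUTES` before constructing a message.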

Class Method Details

.count_by_status(status) ⇒ Integer

Count messages by status

Parameters:

  • status (String)

    Message status

Returns:

  • (Integer)

    Count of messages with the given status

Raises:

  • (NotImplementedError)


# File 'lib/pigeon/models/outbox_message.rb', line 71

def self.count_by_status(status)
  raise NotImplementedError, "#{name}.count_by_status must be implemented by a framework adapter"
end

.create(attributes = {}) ⇒ OutboxMessage

Create a new outbox message with the given attributes

Parameters:

  • attributes (Hash) (defaults to: {})

    Message attributes

Returns:

  • (OutboxMessage)


# File 'lib/pigeon/models/outbox_message.rb', line 42

def self.create(attributes = {})
  new(attributes)
end

.find(id) ⇒ OutboxMessage?

Find a message by ID

Parameters:

  • id (String, Integer)

    Message ID

Returns:

  • (OutboxMessage, nil)

Raises:

  • (NotImplementedError)


# File 'lib/pigeon/models/outbox_message.rb', line 49

def self.find(id)
  raise NotImplementedError, "#{name}.find must be implemented by a framework adapter"
end

.find_by_status(status, limit = 100) ⇒ Array<OutboxMessage>

Find messages by status

Parameters:

  • status (String)

    Message status

  • limit (Integer) (defaults to: 100)

    Maximum number of messages to return

Returns:

  • (Array<OutboxMessage>)

Raises:

  • (NotImplementedError)


# File 'lib/pigeon/models/outbox_message.rb', line 57

def self.find_by_status(status, limit = 100)
  raise NotImplementedError, "#{name}.find_by_status must be implemented by a framework adapter"
end

.find_oldest_by_status(status) ⇒ OutboxMessage?

Find the oldest message by status

Parameters:

  • status (String)

    Message status

Returns:

  • (OutboxMessage, nil)

Raises:

  • (NotImplementedError)


# File 'lib/pigeon/models/outbox_message.rb', line 78

def self.find_oldest_by_status(status)
  raise NotImplementedError, "#{name}.find_oldest_by_status must be implemented by a framework adapter"
end

.find_ready_for_retry(limit = 100) ⇒ Array<OutboxMessage>

Find messages ready for retry

Parameters:

  • limit (Integer) (defaults to: 100)

    Maximum number of messages to return

Returns:

  • (Array<OutboxMessage>)

Raises:

  • (NotImplementedError)


# File 'lib/pigeon/models/outbox_message.rb', line 64

def self.find_ready_for_retry(limit = 100)
  raise NotImplementedError, "#{name}.find_ready_for_retry must be implemented by a framework adapter"
end

Instance Method Details

#[](name) ⇒ Object

Get an attribute value

Parameters:

  • name (Symbol)

    Attribute name

Returns:

  • (Object)

    Attribute value



# File 'lib/pigeon/models/outbox_message.rb', line 94

def [](name)
  @attributes[name.to_sym]
end

#[]=(name, value) ⇒ Object

Set an attribute value

Parameters:

  • name (Symbol)

    Attribute name

  • value (Object)

    Attribute value



# File 'lib/pigeon/models/outbox_message.rb', line 101

def []=(name, value)
  @attributes[name.to_sym] = value
end

#attributes ⇒ Hash

Get all attributes

Returns:

  • (Hash)

    All attributes



# File 'lib/pigeon/models/outbox_message.rb', line 107

def attributes
  @attributes.dup
end
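
Because the method returns `@attributes.dup`, callers receive a shallow copy: replacing a top-level value in the returned hash leaves the message untouched, but nested objects such as the `headers` hash are still shared. A small sketch of that behavior, using a hypothetical `AttributeBag` class in place of the real model:

```ruby
# Hypothetical stand-in exposing attributes the same way (via Hash#dup).
class AttributeBag
  def initialize(attributes)
    @attributes = attributes
  end

  def attributes
    @attributes.dup
  end
end

bag = AttributeBag.new(status: "pending", headers: { "source" => "billing" })
copy = bag.attributes

copy[:status] = "failed"              # top-level change stays in the copy
copy[:headers]["source"] = "shipping" # nested hash is shared with the original

puts bag.attributes[:status]            # prints "pending"
puts bag.attributes[:headers]["source"] # prints "shipping"
```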

#calculate_next_retry_time ⇒ Time

Calculate the next retry time based on exponential backoff

Returns:

  • (Time)

    Next retry time



# File 'lib/pigeon/models/outbox_message.rb', line 157

def calculate_next_retry_time
  base_delay = Pigeon.config.retry_delay || 30 # 30 seconds default
  max_delay = Pigeon.config.max_retry_delay || 86_400 # 24 hours default

  # Exponential backoff: delay = base_delay * (2 ^ retry_count)
  delay = base_delay * (2**retry_count)
  delay = [delay, max_delay].min # Cap at max delay

  Time.now + delay
end
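
To make the schedule concrete, here is a standalone sketch of the same formula. The helper name `backoff_delay` is hypothetical, and its keyword defaults simply mirror the fallback values in the method above (30 seconds base, 24 hours cap) rather than any real `Pigeon.config` values:

```ruby
# Standalone version of the backoff formula: base_delay * 2**retry_count,
# capped at max_delay. Defaults mirror the fallbacks shown above.
def backoff_delay(retry_count, base_delay: 30, max_delay: 86_400)
  [base_delay * (2**retry_count), max_delay].min
end

# Print the delay in seconds for the first thirteen retry counts.
(0..12).each do |n|
  puts "retry_count=#{n} delay=#{backoff_delay(n)}s"
end
```

With these defaults the delay doubles from 30 seconds up to 61,440 seconds at `retry_count` 11, and is capped at 86,400 seconds from `retry_count` 12 onward. Note that `#increment_retry_count` increments `retry_count` before calling this method, so the first scheduled retry would use a count of 1 (60 seconds with these defaults).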

#increment_retry_count ⇒ Boolean

Increment the retry count and set the next retry time

Returns:

  • (Boolean)

    Whether the update was successful



# File 'lib/pigeon/models/outbox_message.rb', line 148

def increment_retry_count
  self.retry_count += 1
  self.next_retry_at = calculate_next_retry_time
  self.updated_at = Time.now
  save
end

#mark_as_failed(error = nil) ⇒ Boolean

Mark the message as failed

Parameters:

  • error (Exception, String) (defaults to: nil)

    Error that caused the failure

Returns:

  • (Boolean)

    Whether the update was successful



# File 'lib/pigeon/models/outbox_message.rb', line 139

def mark_as_failed(error = nil)
  self.status = "failed"
  self.error_message = error.is_a?(Exception) ? "#{error.class}: #{error.message}" : error.to_s
  self.updated_at = Time.now
  save
end
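
The stored `error_message` depends on the argument type: exceptions are rendered as `Class: message`, and anything else is converted with `to_s`. A short sketch of just that formatting rule, extracted into a hypothetical helper named `format_error`:

```ruby
# Hypothetical helper reproducing the error_message expression above.
def format_error(error)
  error.is_a?(Exception) ? "#{error.class}: #{error.message}" : error.to_s
end

puts format_error(ArgumentError.new("bad payload")) # prints "ArgumentError: bad payload"
puts format_error("broker timeout")                 # prints "broker timeout"
puts format_error(nil).inspect                      # prints "" (nil becomes an empty string)
```

Calling `mark_as_failed` without an argument therefore stores an empty string rather than `nil`.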

#mark_as_published ⇒ Boolean

Mark the message as published

Returns:

  • (Boolean)

    Whether the update was successful



# File 'lib/pigeon/models/outbox_message.rb', line 129

def mark_as_published
  self.status = "published"
  self.published_at = Time.now
  self.updated_at = Time.now
  save
end

#max_retries_exceeded? ⇒ Boolean

Check if the message has exceeded the maximum retry count

Returns:

  • (Boolean)

    Whether the message has exceeded the maximum retry count



# File 'lib/pigeon/models/outbox_message.rb', line 170

def max_retries_exceeded?
  max_retries = self[:max_retries] || Pigeon.config.max_retries || 10
  retry_count >= max_retries
end
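
The limit is resolved in order: the message's own `max_retries`, then the configured value, then 10. The comparison uses `>=`, so a message counts as exceeded once `retry_count` reaches the limit. A minimal sketch of that lookup as a standalone function (the function and its keyword names are hypothetical):

```ruby
# Hypothetical standalone version of the limit lookup and comparison.
def retries_exceeded?(retry_count, message_limit: nil, config_limit: nil)
  limit = message_limit || config_limit || 10
  retry_count >= limit
end

puts retries_exceeded?(9)                                     # prints "false" (falls back to 10)
puts retries_exceeded?(10)                                    # prints "true"
puts retries_exceeded?(3, config_limit: 5)                    # prints "false"
puts retries_exceeded?(3, message_limit: 3, config_limit: 5)  # prints "true" (message limit wins)
```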

#save ⇒ Boolean

Save the message

Returns:

  • (Boolean)

    Whether the save was successful

Raises:

  • (NotImplementedError)


# File 'lib/pigeon/models/outbox_message.rb', line 113

def save
  raise NotImplementedError, "#{self.class.name}#save must be implemented by a framework adapter"
end
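
Since `#save` and the finder class methods raise `NotImplementedError`, a framework adapter is expected to override them in a subclass. The sketch below shows one way such an adapter might look, using an in-memory hash as storage. Both classes are hypothetical: `BaseMessage` is a reduced stand-in for the base class so the example runs on its own, and `InMemoryMessage` is not an adapter shipped with Pigeon.

```ruby
# Hypothetical stand-in for the base class, reduced to what the sketch needs.
class BaseMessage
  attr_accessor :id, :status

  def initialize(attributes = {})
    attributes.each { |key, value| send("#{key}=", value) if respond_to?("#{key}=") }
  end

  def save
    raise NotImplementedError, "#{self.class.name}#save must be implemented by a framework adapter"
  end
end

# Hypothetical in-memory adapter: keeps records in a class-level hash.
class InMemoryMessage < BaseMessage
  @store = {}

  class << self
    attr_reader :store

    def find(id)
      store[id]
    end

    def find_by_status(status, limit = 100)
      store.values.select { |m| m.status == status }.first(limit)
    end

    def count_by_status(status)
      store.values.count { |m| m.status == status }
    end
  end

  # Assigns a sequential id on first save and stores the record.
  def save
    self.id ||= self.class.store.size + 1
    self.class.store[id] = self
    true
  end
end

record = InMemoryMessage.new(status: "pending")
record.save
puts InMemoryMessage.count_by_status("pending") # prints 1
```

A real adapter would delegate these calls to the framework's persistence layer; the in-memory hash is used here only to keep the example self-contained.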

#update(attributes = {}) ⇒ Boolean

Update the message attributes

Parameters:

  • attributes (Hash) (defaults to: {})

    New attribute values

Returns:

  • (Boolean)

    Whether the update was successful



# File 'lib/pigeon/models/outbox_message.rb', line 120

def update(attributes = {})
  attributes.each do |key, value|
    send("#{key}=", value) if respond_to?("#{key}=")
  end
  save
end