Class: RubyEventStore::Outbox::Repository

Inherits:
Object
Defined in:
lib/ruby_event_store/outbox/repository.rb

Defined Under Namespace

Classes: Lock, Record

Constant Summary

RECENTLY_LOCKED_DURATION = 10.minutes

Instance Method Summary

Constructor Details

#initialize(database_url, logger, metrics) ⇒ Repository

Returns a new instance of Repository.



116
117
118
119
120
121
122
123
# File 'lib/ruby_event_store/outbox/repository.rb', line 116

# Builds a repository, keeping the logger and metrics collaborators and
# making sure ActiveRecord has a database connection to work with.
#
# @param database_url [Object] handed to +ActiveRecord::Base.establish_connection+
#   only when +ActiveRecord::Base.connected?+ is false
# @param logger [Object] stored in +@logger+
# @param metrics [Object] stored in +@metrics+
def initialize(database_url, logger, metrics)
  @logger = logger
  @metrics = metrics

  base = ::ActiveRecord::Base
  base.establish_connection(database_url) unless base.connected?

  connection = base.connection
  return unless connection.adapter_name == "Mysql2"

  # MySQL only: lower the InnoDB lock wait timeout for this session.
  # NOTE(review): presumably so that contended statements fail fast with
  # ActiveRecord::LockWaitTimeout instead of blocking - confirm with callers.
  connection.execute("SET SESSION innodb_lock_wait_timeout = 1;")
end

Instance Method Details

#delete_enqueued_older_than(fetch_specification, duration, limit) ⇒ Object



137
138
139
140
141
142
143
144
145
146
# File 'lib/ruby_event_store/outbox/repository.rb', line 137

# Deletes records matching +fetch_specification+ whose +enqueued_at+ is
# earlier than +duration.ago+.
#
# @param fetch_specification [Object] forwarded to +Record.for_fetch_specification+
# @param duration [#ago] object whose +ago+ result is used as the cutoff
# @param limit [Object, :all] +:all+ deletes every match; any other value is
#   passed to +limit+ and the rows are ordered by +:id+ before deleting
# @return [Symbol] +:ok+ on success, +:deadlocked+ when
#   +ActiveRecord::Deadlocked+ is raised, +:lock_timeout+ when
#   +ActiveRecord::LockWaitTimeout+ is raised
def delete_enqueued_older_than(fetch_specification, duration, limit)
  candidates = Record.for_fetch_specification(fetch_specification)
  cutoff = duration.ago
  expired = candidates.where("enqueued_at < ?", cutoff)
  doomed = limit == :all ? expired : expired.limit(limit).order(:id)
  doomed.delete_all
  :ok
rescue ::ActiveRecord::Deadlocked
  :deadlocked
rescue ::ActiveRecord::LockWaitTimeout
  :lock_timeout
end

#mark_as_enqueued(record, now) ⇒ Object



133
134
135
# File 'lib/ruby_event_store/outbox/repository.rb', line 133

# Stamps +record+ as enqueued by writing +now+ into its +enqueued_at+ column.
#
# @param record [#update_column] the record to stamp
#   (presumably a Record row - verify against callers)
# @param now [Object] value stored in +enqueued_at+
# @return [Object] whatever +record.update_column+ returns
def mark_as_enqueued(record, now)
  column = :enqueued_at
  record.update_column(column, now)
end

#with_next_batch(fetch_specification, batch_size, consumer_uuid, locking, clock, &block) ⇒ Object



125
126
127
128
129
130
131
# File 'lib/ruby_event_store/outbox/repository.rb', line 125

# Routes to the locking or non-locking batch helper depending on +locking+,
# forwarding the given block to whichever helper is chosen.
#
# @param fetch_specification [Object] forwarded to the chosen helper
# @param batch_size [Object] forwarded to the chosen helper
# @param consumer_uuid [Object] used only by the locking helper
# @param locking [Object] truthy selects +with_next_locking_batch+,
#   falsy selects +with_next_non_locking_batch+
# @param clock [Object] used only by the locking helper
# @return [Object] the chosen helper's return value
def with_next_batch(fetch_specification, batch_size, consumer_uuid, locking, clock, &block)
  return with_next_non_locking_batch(fetch_specification, batch_size, &block) unless locking

  with_next_locking_batch(fetch_specification, batch_size, consumer_uuid, clock, &block)
end