Class: RubyEventStore::Outbox::Repository
- Inherits:
-
Object
- Object
- RubyEventStore::Outbox::Repository
- Defined in:
- lib/ruby_event_store/outbox/repository.rb
Defined Under Namespace
Constant Summary collapse
- RECENTLY_LOCKED_DURATION =
10.minutes
Instance Method Summary collapse
- #delete_enqueued_older_than(fetch_specification, duration, limit) ⇒ Object
-
#initialize(database_url, logger, metrics) ⇒ Repository
constructor
A new instance of Repository.
- #mark_as_enqueued(record, now) ⇒ Object
- #with_next_batch(fetch_specification, batch_size, consumer_uuid, locking, clock, &block) ⇒ Object
Constructor Details
#initialize(database_url, logger, metrics) ⇒ Repository
Returns a new instance of Repository.
116 117 118 119 120 121 122 123 |
# File 'lib/ruby_event_store/outbox/repository.rb', line 116 def initialize(database_url, logger, metrics) @logger = logger @metrics = metrics ::ActiveRecord::Base.establish_connection(database_url) unless ::ActiveRecord::Base.connected? if ::ActiveRecord::Base.connection.adapter_name == "Mysql2" ::ActiveRecord::Base.connection.execute("SET SESSION innodb_lock_wait_timeout = 1;") end end |
Instance Method Details
#delete_enqueued_older_than(fetch_specification, duration, limit) ⇒ Object
137 138 139 140 141 142 143 144 145 146 |
# File 'lib/ruby_event_store/outbox/repository.rb', line 137 def delete_enqueued_older_than(fetch_specification, duration, limit) scope = Record.for_fetch_specification(fetch_specification).where("enqueued_at < ?", duration.ago) scope = scope.limit(limit).order(:id) unless limit == :all scope.delete_all :ok rescue ::ActiveRecord::Deadlocked :deadlocked rescue ::ActiveRecord::LockWaitTimeout :lock_timeout end |
#mark_as_enqueued(record, now) ⇒ Object
133 134 135 |
# File 'lib/ruby_event_store/outbox/repository.rb', line 133 def mark_as_enqueued(record, now) record.update_column(:enqueued_at, now) end |
#with_next_batch(fetch_specification, batch_size, consumer_uuid, locking, clock, &block) ⇒ Object
125 126 127 128 129 130 131 |
# File 'lib/ruby_event_store/outbox/repository.rb', line 125 def with_next_batch(fetch_specification, batch_size, consumer_uuid, locking, clock, &block) if locking with_next_locking_batch(fetch_specification, batch_size, consumer_uuid, clock, &block) else with_next_non_locking_batch(fetch_specification, batch_size, &block) end end |