Class: RubyEventStore::Outbox::SidekiqProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_event_store/outbox/sidekiq_processor.rb

Constant Summary collapse

InvalidPayload =
Class.new(StandardError)

Instance Method Summary collapse

Constructor Details

#initialize(redis) ⇒ SidekiqProcessor

Returns a new instance of SidekiqProcessor.



13
14
15
16
# File 'lib/ruby_event_store/outbox/sidekiq_processor.rb', line 13

def initialize(redis)
  @redis = redis
  @recently_used_queues = Set.new
end

Instance Method Details

#after_batchObject



32
33
34
# File 'lib/ruby_event_store/outbox/sidekiq_processor.rb', line 32

def after_batch
  ensure_that_sidekiq_knows_about_all_queues
end

#message_formatObject



36
37
38
# File 'lib/ruby_event_store/outbox/sidekiq_processor.rb', line 36

def message_format
  SIDEKIQ5_FORMAT
end

#process(record, now) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/ruby_event_store/outbox/sidekiq_processor.rb', line 18

def process(record, now)
  parsed_record = JSON.parse(record.payload)

  queue = parsed_record["queue"]
  raise InvalidPayload.new("Missing queue") if queue.nil? || queue.empty?
  payload = JSON.generate(parsed_record.merge({ "enqueued_at" => record.created_at.to_f }))

  redis.call("LPUSH", "queue:#{queue}", payload)

  @recently_used_queues << queue
rescue RedisClient::TimeoutError, RedisClient::ConnectionError
  raise RetriableRedisError
end