Class: Fairway::Sidekiq::FairwayFetch

Inherits:
Sidekiq::BasicFetch
  • Object
show all
Defined in:
lib/fairway/sidekiq/fairway_fetch.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queues, &block) ⇒ FairwayFetch

Returns a new instance of FairwayFetch.



6
7
8
9
# File 'lib/fairway/sidekiq/fairway_fetch.rb', line 6

def initialize(queues, &block)
  @queues = queues
  @message_to_job = block if block_given?
end

Instance Attribute Details

#queuesObject (readonly)

Returns the value of attribute queues.



4
5
6
# File 'lib/fairway/sidekiq/fairway_fetch.rb', line 4

def queues
  @queues
end

Instance Method Details

#==(other) ⇒ Object



40
41
42
# File 'lib/fairway/sidekiq/fairway_fetch.rb', line 40

def ==(other)
  other.respond_to?(:queues) && queues == other.queues
end

#retrieve_work(options = {}) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/fairway/sidekiq/fairway_fetch.rb', line 11

def retrieve_work(options = {})
  options = { blocking: true }.merge(options)

  ::Sidekiq.logger.debug "#{self.class.name}#retrieve_work"
  unit_of_work = nil

  fairway_queue, work = @queues.pull

  if work
    decoded_work = JSON.parse(work)

    if @message_to_job
      decoded_work = @message_to_job.call(fairway_queue, decoded_work)
      work         = decoded_work.to_json
    end

    unit_of_work = UnitOfWork.new(decoded_work["queue"], work)
  end

  if unit_of_work
    ::Sidekiq.logger.debug "#{self.class.name}#retrieve_work got work"
  else
    ::Sidekiq.logger.debug "#{self.class.name}#retrieve_work got nil"
    sleep 1 if options[:blocking]
  end

  unit_of_work
end