Class: Fairway::Sidekiq::FairwayFetch
- Inherits:
-
Sidekiq::BasicFetch
- Object
- Sidekiq::BasicFetch
- Fairway::Sidekiq::FairwayFetch
- Defined in:
- lib/fairway/sidekiq/fairway_fetch.rb
Instance Attribute Summary collapse
-
#queues ⇒ Object
readonly
Returns the value of attribute queues.
Instance Method Summary collapse
- #==(other) ⇒ Object
-
#initialize(queues, &block) ⇒ FairwayFetch
constructor
A new instance of FairwayFetch.
- #retrieve_work(options = {}) ⇒ Object
Constructor Details
#initialize(queues, &block) ⇒ FairwayFetch
Returns a new instance of FairwayFetch.
6 7 8 9 |
# File 'lib/fairway/sidekiq/fairway_fetch.rb', line 6 def initialize(queues, &block) @queues = queues = block if block_given? end |
Instance Attribute Details
#queues ⇒ Object (readonly)
Returns the value of attribute queues.
4 5 6 |
# File 'lib/fairway/sidekiq/fairway_fetch.rb', line 4 def queues @queues end |
Instance Method Details
#==(other) ⇒ Object
40 41 42 |
# File 'lib/fairway/sidekiq/fairway_fetch.rb', line 40 def ==(other) other.respond_to?(:queues) && queues == other.queues end |
#retrieve_work(options = {}) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/fairway/sidekiq/fairway_fetch.rb', line 11 def retrieve_work( = {}) = { blocking: true }.merge() ::Sidekiq.logger.debug "#{self.class.name}#retrieve_work" unit_of_work = nil fairway_queue, work = @queues.pull if work decoded_work = JSON.parse(work) if decoded_work = .call(fairway_queue, decoded_work) work = decoded_work.to_json end unit_of_work = UnitOfWork.new(decoded_work["queue"], work) end if unit_of_work ::Sidekiq.logger.debug "#{self.class.name}#retrieve_work got work" else ::Sidekiq.logger.debug "#{self.class.name}#retrieve_work got nil" sleep 1 if [:blocking] end unit_of_work end |