Class: Roundhouse::RoundRobinFetch

Inherits:
Object
  • Object
show all
Defined in:
lib/roundhouse/fetch.rb

Defined Under Namespace

Classes: UnitOfWork

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = nil) ⇒ RoundRobinFetch

Returns a new instance of RoundRobinFetch.



97
98
99
# File 'lib/roundhouse/fetch.rb', line 97

# Builds a fetcher. The +options+ argument is accepted but never read.
# NOTE(review): presumably kept so this class can be constructed the same
# way as other fetch strategies — confirm against the callers.
#
# @param options [Object, nil] ignored
def initialize(options = nil)
  # ignore options
end

Class Method Details

.bulk_requeue(inprogress, options) ⇒ Object

Leaving this as a class method keeps it pluggable and lets the Manager actor call it directly. Making it an instance method would make it asynchronous to the Fetcher actor.



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/roundhouse/fetch.rb', line 108

# Hands the messages of unfinished units of work back to
# Roundhouse::Monitor, grouped by the queue id each unit reports.
#
# Does nothing (and returns nil) when +inprogress+ is empty. Any
# StandardError raised along the way is rescued and reported through the
# logger as a warning instead of propagating to the caller.
#
# @param inprogress [#empty?, #each, #size] units of work, each responding
#   to +queue_id+ and +message+
# @param options [Object] not used by this method
# @return [Object, nil] whatever the final logger call returns, or nil for
#   empty input
def self.bulk_requeue(inprogress, options)
  return if inprogress.empty?

  Roundhouse.logger.debug { "Re-queueing terminated jobs" }

  # Bucket the raw messages per queue, keeping first-seen queue order.
  messages_by_queue = inprogress.each_with_object({}) do |unit, grouped|
    (grouped[unit.queue_id] ||= []) << unit.message
  end

  Roundhouse.redis do |conn|
    conn.pipelined do
      messages_by_queue.each_pair do |queue_id, messages|
        Roundhouse::Monitor.requeue(conn, queue_id, messages)
      end
    end

    # REFACTOR NOTE: the push step stays outside the pipelined block
    # because it needs to read. It can move back inside once the Monitor
    # operations are converted to EVAL scripts.
    messages_by_queue.each_key do |queue_id|
      Roundhouse::Monitor.push(conn, queue_id)
    end
  end

  Roundhouse.logger.info("Pushed #{inprogress.size} messages back to Redis")
rescue => error
  Roundhouse.logger.warn("Failed to requeue #{inprogress.size} jobs: #{error.message}")
end

Instance Method Details

#retrieve_work ⇒ Object



101
102
103
104
# File 'lib/roundhouse/fetch.rb', line 101

# Asks Roundhouse::Monitor for the next job over a Redis connection and
# wraps the result in a UnitOfWork.
#
# @return [UnitOfWork, nil] a unit built by splatting the value returned
#   from +maybe_next_job+, or nil when that value is nil/false
def retrieve_work
  job = Roundhouse.redis do |conn|
    Roundhouse::Monitor.maybe_next_job(conn)
  end
  return unless job

  UnitOfWork.new(*job)
end