Class: Roundhouse::RoundRobinFetch

Inherits:
Object
  • Object
show all
Defined in:
lib/roundhouse/fetch.rb

Defined Under Namespace

Classes: UnitOfWork

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = nil) ⇒ RoundRobinFetch

Returns a new instance of RoundRobinFetch.



96
97
# File 'lib/roundhouse/fetch.rb', line 96

# Builds a fetcher. The +options+ argument is accepted but never read: the
# body is empty, so no instance state is set up here.
# NOTE(review): presumably the parameter exists to match the constructor
# signature expected of fetch strategies — confirm against the caller.
def initialize(options = nil)
end

Class Method Details

.bulk_requeue(inprogress, options) ⇒ Object

Leaving this as a class method keeps it pluggable and usable by the Manager actor. Making it an instance method would make it asynchronous to the Fetcher actor.



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/roundhouse/fetch.rb', line 106

# Hands the messages of unfinished units of work back to Redis, grouped by
# the queue they came from.
#
# Kept as a class method so that it stays pluggable and can be called from
# the Manager actor; as an instance method it would be asynchronous to the
# Fetcher actor.
#
# @param inprogress [#empty?, #each, #size] units of work, each responding
#   to +queue_id+ and +message+
# @param options [Object] not read by this method
# @return [nil, Object] nil when +inprogress+ is empty, otherwise whatever
#   the logger call returns. Any StandardError raised along the way is
#   logged as a warning and not re-raised.
def self.bulk_requeue(inprogress, options)
  return if inprogress.empty?

  Roundhouse.logger.debug { "Re-queueing terminated jobs" }

  # queue_id => [message, ...], in order of first appearance.
  messages_by_queue = {}
  inprogress.each do |uow|
    (messages_by_queue[uow.queue_id] ||= []) << uow.message
  end

  Roundhouse.redis do |conn|
    conn.pipelined do
      messages_by_queue.each_pair do |queue_id, messages|
        Roundhouse::Monitor.requeue(conn, queue_id, messages)
      end
    end

    # REFACTOR NOTE: the push step needs to read from Redis, so it cannot
    # run inside the pipeline above. It can move back in once the Monitor
    # operations are converted to EVAL scripts.
    messages_by_queue.each_key do |queue_id|
      Roundhouse::Monitor.push(conn, queue_id)
    end
  end

  Roundhouse.logger.info("Pushed #{inprogress.size} messages back to Redis")
rescue => error
  Roundhouse.logger.warn("Failed to requeue #{inprogress.size} jobs: #{error.message}")
end

Instance Method Details

#retrieve_work ⇒ Object



99
100
101
102
# File 'lib/roundhouse/fetch.rb', line 99

# Asks Roundhouse::Monitor for the next job over a Redis connection and
# wraps the result in a UnitOfWork.
#
# @return [UnitOfWork, nil] a unit of work built by splatting the value the
#   monitor returned, or nil when the monitor returned nothing
def retrieve_work
  job = Roundhouse.redis do |conn|
    Roundhouse::Monitor.await_next_job(conn)
  end
  return unless job

  UnitOfWork.new(*job)
end