Class: Resque::Plugins::Fifo::Queue::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/resque/plugins/fifo/queue/manager.rb

Constant Summary collapse

DLM_TTL =
30000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_prefix = 'fifo') ⇒ Manager

Returns a new instance of Manager.



11
12
13
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 11

# Builds a manager bound to one queue prefix.
#
# The prefix is interpolated into the lookup-table key
# ("fifo-queue-lookup-<prefix>") and the pending queue name
# ("<prefix>-pending") by other methods of this class.
#
# @param queue_prefix [String] prefix for this manager's keys and queues;
#   defaults to 'fifo'
def initialize(queue_prefix = 'fifo')
  @queue_prefix = queue_prefix
end

Instance Attribute Details

#queue_prefix ⇒ Object

Returns the value of attribute queue_prefix.



9
10
11
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 9

# Reader for the prefix supplied to the constructor.
#
# @return [Object] the stored @queue_prefix value ('fifo' unless overridden)
def queue_prefix
  @queue_prefix
end

Class Method Details

.enqueue_to(key, klass, *args) ⇒ Object



112
113
114
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 112

# Enqueues a job on the default 'fifo' topic.
#
# Convenience wrapper: forwards everything to enqueue_topic with the topic
# fixed to 'fifo'.
#
# @param key [Object] FIFO key, forwarded unchanged
# @param klass [Class] job class, forwarded unchanged
# @param args [Array] job arguments, forwarded unchanged
# @return [Object] whatever enqueue_topic returns
def self.enqueue_to(key, klass, *args)
  default_topic = 'fifo'
  enqueue_topic(default_topic, key, klass, *args)
end

.enqueue_topic(topic, key, klass, *args) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 116

# Enqueues a job on the given topic, running the job class's enqueue hooks.
#
# Every before_enqueue hook is invoked (none is skipped); if any of them
# returns a value equal to `false` the job is not enqueued. After a successful
# enqueue each after_enqueue hook is invoked with the same arguments.
#
# @param topic [String] queue prefix handed to the Manager constructor
# @param key [Object] FIFO key passed to Manager#enqueue
# @param klass [Class] job class; also the receiver of the hook methods
# @param args [Array] job arguments, passed to the hooks and to the job
# @return [true, nil] true when the job was enqueued, nil when a hook vetoed it
def self.enqueue_topic(topic, key, klass, *args)
  hook_results = Plugin.before_enqueue_hooks(klass).map { |hook| klass.send(hook, *args) }
  return nil if hook_results.include?(false)

  Resque::Plugins::Fifo::Queue::Manager.new(topic).enqueue(key, klass, *args)

  Plugin.after_enqueue_hooks(klass).each { |hook| klass.send(hook, *args) }

  true
end

Instance Method Details

#all_stats ⇒ Object



103
104
105
106
107
108
109
110
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 103

# Collects the manager's statistics into one hash.
#
# @return [Hash{Symbol => Object}] keys :dht_times_rehashed, :avg_delay,
#   :avg_dht_recalc and :max_delay, each holding the value returned by the
#   corresponding reader method
def all_stats
  rehash_count = dht_times_rehashed
  mean_delay = get_stats_avg_delay
  mean_recalc = get_stats_avg_dht_recalc
  worst_delay = get_stats_max_delay

  {
    dht_times_rehashed: rehash_count,
    avg_delay: mean_delay,
    avg_dht_recalc: mean_recalc,
    max_delay: worst_delay
  }
end

#clear_stats ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 64

# Deletes every statistics key this manager maintains in Redis.
#
# First removes the six global "fifo-stats-*" counters, then one
# "queue-stats-<queue>" counter per entry of the lookup table. Table entries
# are strings of the form "<slice>#<queue>".
#
# @return [Array] the results of the per-queue DEL calls, in table order
def clear_stats
  %w[
    fifo-stats-max-delay
    fifo-stats-accumulated-delay
    fifo-stats-accumulated-count
    fifo-stats-dht-rehash
    fifo-stats-accumulated-recalc-time
    fifo-stats-accumulated-recalc-count
  ].each { |stat_key| redis_client.del stat_key }

  redis_client.lrange(fifo_hash_table_name, 0, -1).map do |entry|
    queue = entry.split('#')[1]
    redis_client.del "queue-stats-#{queue}"
  end
end

#compute_queue_name(key) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 27

# Picks the queue that should receive jobs for the given key.
#
# The lookup table is a Redis list of "<slice>#<queue>" strings. Scanning from
# the last entry backwards, the first entry whose slice is strictly below the
# key's index wins. When no entry qualifies, the last entry's queue is used.
# With an empty table the pending queue is returned.
#
# @param key [Object] FIFO key; turned into a numeric index by compute_index
#   (defined outside this view)
# @return [String, nil] target queue name
def compute_queue_name(key)
  hash_index = compute_index(key)
  entries = redis_client.lrange(fifo_hash_table_name, 0, -1)
  return pending_queue_name if entries.empty?

  owner = entries.reverse_each.find do |entry|
    hash_index > entry.split('#').first.to_i
  end

  # No slice below the index: fall back to the last table entry.
  (owner || entries.last).split('#')[1]
end

#dht_times_rehashed ⇒ Object



99
100
101
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 99

# Reads the "fifo-stats-dht-rehash" counter.
#
# @return [String, Integer] the stored value exactly as Redis returns it, or
#   the Integer 0 when the key is absent
def dht_times_rehashed
  stored = redis_client.get("fifo-stats-dht-rehash")
  stored || 0
end

#dump_dht ⇒ Object



134
135
136
137
138
139
140
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 134

# Returns the lookup table as parsed pairs.
#
# Each stored entry "<slice>#<queue>" becomes `[slice_as_integer, queue]`.
#
# @return [Array<Array(Integer, String)>] pairs in table order
def dump_dht
  redis_client.lrange(fifo_hash_table_name, 0, -1).map do |entry|
    parts = entry.split('#')
    [parts[0].to_i, parts[1]]
  end
end

#dump_queue_names ⇒ Object



158
159
160
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 158

# Lists the queue names present in the lookup table.
#
# @return [Array<String>] the second element of every dump_dht pair, in
#   table order
def dump_queue_names
  dump_dht.map { |_slice, queue_name| queue_name }
end

#dump_queues ⇒ Object



170
171
172
173
174
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 170

# Maps every available queue to the result of peeking at it.
#
# @return [Hash{String => Object}] queue name => `Resque.peek(queue, 0, 0)`
#   for each queue yielded by query_available_queues
def dump_queues
  query_available_queues.each_with_object({}) do |queue, contents|
    contents[queue] = Resque.peek(queue, 0, 0)
  end
end

#dump_queues_sorted ⇒ Object



215
216
217
218
219
220
221
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 215

# Returns queue contents ordered like the lookup table.
#
# @return [Array] for each dump_dht entry, the dump_queues value for that
#   entry's queue name (nil when dump_queues has no such key)
def dump_queues_sorted
  contents_by_queue = dump_queues
  dump_dht.map { |_slice, queue_name| contents_by_queue[queue_name] }
end

#dump_queues_with_slices ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 187

# Builds one status row per lookup-table entry.
#
# Each row is
# `[slice, queue, hostname, pid, status, started, heartbeat, count, size]`.
# The worker columns come from the worker returned by worker_for_queue; when
# there is none they are all '?'. `status` is 'paused' for a paused worker,
# otherwise the worker's state as a string. `count` comes from
# get_processed_count.
#
# The queue length is read with Resque.size, so job payloads are not fetched
# and decoded just to be counted.
#
# @return [Array<Array>] rows in table order
def dump_queues_with_slices
  redis_client.lrange(fifo_hash_table_name, 0, -1).map do |slot|
    slice, queue = slot.split('#')
    worker = worker_for_queue(queue)

    worker_columns =
      if worker
        status = worker.paused? ? 'paused' : worker.state.to_s
        [worker.hostname, worker.pid, status, worker.started, worker.heartbeat]
      else
        # Separate placeholder strings so rows never share a mutable object.
        Array.new(5) { '?' }
      end

    [slice, queue, *worker_columns, get_processed_count(queue), Resque.size(queue)]
  end
end

#enqueue(key, klass, *args) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 45

# Routes a job to the queue chosen for its FIFO key and enqueues it.
#
# The per-queue "queue-stats-<queue>" counter is incremented before the job
# class is validated. In inline mode the job runs immediately through a
# Resque::Job; otherwise it is pushed with the FIFO key and the enqueue time
# (epoch seconds) in its payload.
#
# @param key [Object] FIFO key used to select the queue
# @param klass [Class] job class
# @param args [Array] job arguments
# @return [Object] result of Resque.push, or of Job#perform in inline mode
def enqueue(key, klass, *args)
  target_queue = compute_queue_name(key)

  redis_client.incr "queue-stats-#{target_queue}"
  Resque.validate(klass, target_queue)

  unless Resque.inline? && inline?
    return Resque.push(target_queue, class: klass.to_s, args: args, fifo_key: key, enqueue_ts: Time.now.to_i)
  end

  # Run through a real Resque::Job so callbacks fire. The arguments take an
  # encode/decode round trip so they are normalized the same way as for a
  # queued job.
  inline_payload = {
    'class' => klass,
    'args' => Resque.decode(Resque.encode(args)),
    'fifo_key' => key,
    'enqueue_ts' => 0
  }
  Resque::Job.new(:inline, inline_payload).perform
end

#fifo_hash_table_name ⇒ Object



15
16
17
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 15

# Name of the Redis list holding this manager's lookup table.
#
# @return [String] "fifo-queue-lookup-" followed by the queue prefix
def fifo_hash_table_name
  format('fifo-queue-lookup-%s', @queue_prefix)
end

#get_processed_count(queue) ⇒ Object



211
212
213
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 211

# Reads the per-queue counter stored under "queue-stats-<queue>".
#
# @param queue [String] queue name
# @return [String, Integer] the stored value exactly as Redis returns it, or
#   the Integer 0 when the key is absent
def get_processed_count(queue)
  stored = redis_client.get("queue-stats-#{queue}")
  stored || 0
end

#get_stats_avg_delay ⇒ Object



91
92
93
94
95
96
97
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 91

# Average delay: accumulated delay divided by the number of items counted.
#
# Redis returns counters as Strings, so the count is converted to an Integer
# before the zero check. Comparing the raw String with 0 never matches, and a
# stored "0" would then produce NaN from 0.0 / 0.0.
#
# @return [Float, Integer] the average, or the Integer 0 when nothing has
#   been counted (key absent or zero)
def get_stats_avg_delay
  accumulated_delay = redis_client.get("fifo-stats-accumulated-delay").to_f
  total_items = redis_client.get("fifo-stats-accumulated-count").to_i
  return 0 if total_items.zero?

  accumulated_delay / total_items
end

#get_stats_avg_dht_recalc ⇒ Object



83
84
85
86
87
88
89
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 83

# Average recalculation time: accumulated recalc time divided by the number
# of recalculations counted.
#
# Redis returns counters as Strings, so the count is converted to an Integer
# before the zero check. Comparing the raw String with 0 never matches, and a
# stored "0" would then produce NaN from 0.0 / 0.0.
#
# @return [Float, Integer] the average, or the Integer 0 when nothing has
#   been counted (key absent or zero)
def get_stats_avg_dht_recalc
  accumulated_time = redis_client.get("fifo-stats-accumulated-recalc-time").to_f
  total_runs = redis_client.get("fifo-stats-accumulated-recalc-count").to_i
  return 0 if total_runs.zero?

  accumulated_time / total_runs
end

#get_stats_max_delay ⇒ Object



79
80
81
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 79

# Reads the "fifo-stats-max-delay" value.
#
# @return [String, Integer] the stored value exactly as Redis returns it, or
#   the Integer 0 when the key is absent
def get_stats_max_delay
  stored = redis_client.get("fifo-stats-max-delay")
  stored || 0
end

#inline? ⇒ Boolean

method for stubbing in tests

Returns:

  • (Boolean)


60
61
62
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 60

# Reports whether Resque is in inline mode.
#
# Kept as its own instance method so tests can stub it independently of
# Resque.inline?.
#
# @return [Boolean] the value of Resque.inline?
def inline?
  Resque.inline?
end

#orphaned_queues ⇒ Object



264
265
266
267
268
269
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 264

# Lists queues that carry this manager's prefix but are no longer referenced
# by the lookup table.
#
# Ownership is decided on "<prefix>-" (with the dash), the same delimiter
# worker_for_queue uses. Matching on the bare prefix would also claim
# unrelated queues such as "fifo_refresh", or another topic's queues whose
# prefix merely starts with this one (e.g. "fifo2-..." for prefix "fifo").
#
# @return [Array<String>] names from Resque.all_queues that start with
#   "<prefix>-" and are absent from dump_queue_names
def orphaned_queues
  owned_prefix = "#{queue_prefix}-"
  current_queues = dump_queue_names

  Resque.all_queues.select do |queue|
    queue.start_with?(owned_prefix) && !current_queues.include?(queue)
  end
end

#peek_pending ⇒ Object



150
151
152
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 150

# Peeks at the pending queue.
#
# @return [Object] the result of `Resque.peek(pending_queue_name, 0, 0)`
def peek_pending
  pending = pending_queue_name
  Resque.peek(pending, 0, 0)
end

#pending_queue_name ⇒ Object



23
24
25
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 23

# Name of the queue used when no lookup-table entry is available.
#
# @return [String] the queue prefix followed by "-pending"
def pending_queue_name
  format('%s-pending', queue_prefix)
end

#pending_total ⇒ Object



154
155
156
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 154

# Length of the pending queue's Redis list.
#
# @return [Object] result of LLEN on "queue:<pending queue name>"
def pending_total
  list_key = "queue:#{pending_queue_name}"
  redis_client.llen list_key
end

#pretty_dump ⇒ Object



142
143
144
145
146
147
148
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 142

# Prints the lookup table to standard output, one line per entry.
#
# @return [Array<nil>] one nil (the result of puts) per table entry
def pretty_dump
  redis_client.lrange(fifo_hash_table_name, 0, -1).map do |entry|
    slice, queue = entry.split('#')
    puts "Slice  ##{slice} -> #{queue}"
  end
end

#pretty_dump_queues ⇒ Object



176
177
178
179
180
181
182
183
184
185
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 176

# Prints the contents of every queue in the lookup table.
#
# For each entry a "#Slice <slice>" header is printed, followed by the string
# form of the queue's peeked contents (with a line break after every "},"),
# and then a blank line.
#
# @return [Array<nil>] one nil (the result of the last puts) per table entry
def pretty_dump_queues
  redis_client.lrange(fifo_hash_table_name, 0, -1).map do |entry|
    slice, queue = entry.split('#')
    puts "#Slice #{slice}"

    jobs_text = Resque.peek(queue, 0, 0).to_s.gsub('},', "},\n")
    puts "#{jobs_text},"
    puts "\n"
  end
end

#request_refresh ⇒ Object



252
253
254
255
256
257
258
259
260
261
262
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 252

# Asks for the lookup table to be refreshed by a DrainWorker job.
#
# In inline mode the DrainWorker runs immediately. Otherwise the
# "fifo_update_timestamp-<prefix>" key is set to the current time and a
# DrainWorker job is pushed onto the :fifo_refresh queue.
#
# @return [Object] result of Job#perform in inline mode, otherwise of
#   Resque.push
def request_refresh
  drain_worker = Resque::Plugins::Fifo::Queue::DrainWorker

  # Run through a real Resque::Job so callbacks fire.
  return Resque::Job.new(:inline, { 'class' => drain_worker, 'args' => [] }).perform if Resque.inline?

  redis_client.set "fifo_update_timestamp-#{queue_prefix}", Time.now.to_s
  Resque.push(:fifo_refresh, class: drain_worker.to_s, args: [])
end

#update_workers ⇒ Object



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 223

# Rebuilds the lookup table under a distributed lock and records how long
# the attempt took.
#
# While holding "fifo_queue_lock-<prefix>" (TTL DLM_TTL) it runs process_dht
# and cleanup_queues, then reinserts the items of the pending queue. If the
# "fifo_update_timestamp-<prefix>" key changed while that work was running,
# another refresh is requested. When the lock cannot be taken, only a message
# is logged.
#
# The elapsed whole seconds are ADDED to "fifo-stats-accumulated-recalc-time"
# with INCRBY. Overwriting the key on each run would leave the average
# computed from it as "last duration / run count".
#
# @return [Object] result of incrementing "fifo-stats-accumulated-recalc-count"
def update_workers
  start_time = Time.now.to_i

  redlock.lock("fifo_queue_lock-#{queue_prefix}", DLM_TTL) do |locked|
    if locked
      start_timestamp = redis_client.get "fifo_update_timestamp-#{queue_prefix}"

      process_dht
      cleanup_queues
      log("reinserting items from pending")
      reinsert_pending_items(pending_queue_name)

      # If a refresh was requested while we were working, queue another pass.
      current_timestamp = redis_client.get "fifo_update_timestamp-#{queue_prefix}"
      request_refresh if start_timestamp != current_timestamp
    else
      log("unable to lock DHT.")
    end
  end

  end_time = Time.now.to_i

  redis_client.incrby("fifo-stats-accumulated-recalc-time", end_time - start_time)
  redis_client.incr "fifo-stats-accumulated-recalc-count"
end

#worker_for_queue(queue_name) ⇒ Object



162
163
164
165
166
167
168
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 162

# Finds the worker serving the given queue.
#
# For each worker only its FIRST queue starting with "<prefix>-" is
# considered. The first worker for which that name equals `queue_name` is
# returned.
#
# @param queue_name [String] queue to look up
# @return [Object, nil] the matching worker, or nil when none matches
def worker_for_queue(queue_name)
  Resque.workers.find do |worker|
    worker.queues.find { |name| name.start_with?("#{queue_prefix}-") } == queue_name
  end
end