Module: BackgroundQueue::ServerLib::QueueRegistry

Included in:
BalancedQueue, Owner
Defined in:
lib/background_queue/server_lib/queue_registry.rb

Instance Method Summary collapse

Instance Method Details

#add_item(item) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
# File 'lib/background_queue/server_lib/queue_registry.rb', line 4

def add_item(item)
  in_queue, queue = get_queue(get_queue_id_from_item(item), true)
  priority_increased, original_priority = track_priority_when_adding_to_queue(queue, item)
  if !queue.stalled? && (!in_queue || priority_increased)
    if in_queue #remove from existing priority queue
      remove(queue, original_priority)
    end
    push(queue)
  elsif queue.stalled? && !(queue.synchronous? && queue.has_running_items?) #it stalled because it was empty...
    resume_queue(queue)
  end
end

#finish_item(item) ⇒ Object



52
53
54
55
56
57
58
59
# File 'lib/background_queue/server_lib/queue_registry.rb', line 52

def finish_item(item)
  #puts "#{self.class.name}:finish item: #{item.inspect}"
  in_queue, queue = get_queue(get_queue_id_from_item(item), false)
  raise "Queue #{get_queue_id_from_item(item)} unavailble when finishing item" if queue.nil?
  queue.finish_item(item)
  @running_items -= 1
  resume_queue(queue) unless queue.synchronous? && queue.has_running_items?
end

#next_itemObject



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/background_queue/server_lib/queue_registry.rb', line 17

def next_item
  item = nil
  queue = pop
  unless queue.nil?
    priority_decreased, original_priority, item = remove_item_from_queue(queue, :next)
    
    # some items must run synchronously, so we dont want to add it back until the task is finished.
    # if the queue it empty we still want to keep it there until the task is finished, incase the running task queues more tasks against the job.
    if queue.empty? || queue.synchronous?
      @items.delete(queue.id) 
      stall_queue(queue)
    else
      push(queue)
    end
    @running_items += 1
    item.running = true unless item.nil?
  end
  #server.logger.debug("next item #{item.nil? ? 'nil' : item.id}")
  item
end

#remove_item(item) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/background_queue/server_lib/queue_registry.rb', line 38

def remove_item(item)
  item.running = false
  in_queue, queue = get_queue(get_queue_id_from_item(item), false)
  raise "Unable to remove task #{item.id} at priority #{item.priority} (no queue at that priority)" if queue.nil?
  priority_decreased, original_priority, item = remove_item_from_queue(queue, item)
  if queue.empty?
    remove(queue, original_priority)
    @items.delete(queue.id)
  elsif priority_decreased
    remove(queue, original_priority)
    push(queue)
  end
end