Class: DelayQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/delay_queue.rb,
lib/delay_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(clock = Time) ⇒ DelayQueue

Returns a new instance of DelayQueue.



10
11
12
13
14
# File 'lib/delay_queue.rb', line 10

def initialize(clock=Time)
  @clock = clock
  @timestamp_to_elements = TreeMap.new
  @element_to_timestamp = {}
end

Instance Method Details

#include?(element) ⇒ Boolean

Returns:

  • (Boolean)


78
79
80
# File 'lib/delay_queue.rb', line 78

def include?(element)
  @element_to_timestamp.key?(element)
end

#pop(n = 1) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/delay_queue.rb', line 37

def pop(n=1)
  popped_elements = []
  loop do
    entry = @timestamp_to_elements.first_entry
    break unless entry
    break if entry.key > @clock.now.to_i
    elements = entry.value
    iterator = elements.iterator
    while iterator.has_next && popped_elements.size < n
      element = iterator.next
      @element_to_timestamp.delete(element)
      iterator.remove
      popped_elements << element
    end
    break unless elements.empty?
    @timestamp_to_elements.delete(entry.key)
  end
  if n == 1
    popped_elements.first
  else
    popped_elements
  end
end

#pop_allObject



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/delay_queue.rb', line 61

def pop_all
  popped_elements = []
  cutoff = @timestamp_to_elements.floor_key(@clock.now.to_i)
  if cutoff
    loop do
      entry = @timestamp_to_elements.poll_first_entry
      elements = entry.value
      elements.each do |element|
        @element_to_timestamp.delete(element)
        popped_elements << element
      end
      break if entry.key == cutoff 
    end
  end
  popped_elements
end

#put(element, timestamp = Time.now.to_i, options = {}) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
# File 'lib/delay_queue.rb', line 16

def put(element, timestamp=Time.now.to_i, options={})
  existing_timestamp = @element_to_timestamp[element]
  if !existing_timestamp || existing_timestamp < timestamp || options[:force]
    remove(element) if existing_timestamp
    elements = @timestamp_to_elements.get(timestamp)
    elements ||= HashSet.new
    elements.add(element)
    @timestamp_to_elements.put(timestamp, elements)
    @element_to_timestamp[element] = timestamp
  end
end

#remove(element) ⇒ Object



28
29
30
31
32
33
34
35
# File 'lib/delay_queue.rb', line 28

def remove(element)
  timestamp = @element_to_timestamp.delete(element)
  if timestamp
    elements = @timestamp_to_elements.get(timestamp)
    elements.remove(element)
    @timestamp_to_elements.remove(timestamp) if elements.empty?
  end
end

#sizeObject



86
87
88
# File 'lib/delay_queue.rb', line 86

def size
  @element_to_timestamp.size
end

#to_hObject



82
83
84
# File 'lib/delay_queue.rb', line 82

def to_h
  @element_to_timestamp.dup
end