Class: SuperQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/super_queue.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ SuperQueue

Returns a new instance of SuperQueue.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/super_queue.rb', line 17

def initialize(opts)
  check_opts(opts)
  opts[:localize_queue] = true unless opts.has_key? :localized_queue
  opts[:buffer_size] = 100 unless opts.has_key? :buffer_size

  @localize_queue = opts[:localize_queue]
  @queue_name = generate_queue_name(opts)
  initialize_sqs(opts)

  @waiting = []
  @waiting.taint
  self.taint
  @mutex = Mutex.new
  @in_buffer = SizedQueue.new(opts[:buffer_size])
  @out_buffer = SizedQueue.new(opts[:buffer_size])
  @deletion_queue = Queue.new
  @mock_length = 0 if SuperQueue.mocking?

  @sqs_head_tracker = Thread.new { poll_sqs_head }
  @sqs_tail_tracker = Thread.new { poll_sqs_tail }
  @garbage_collector = Thread.new { collect_garbage }
end

Class Method Details

.mock!Object



8
9
10
11
# File 'lib/super_queue.rb', line 8

def self.mock!
  @@mock = true
  Fog.mock!
end

.mocking?Boolean

Returns:

  • (Boolean)


13
14
15
# File 'lib/super_queue.rb', line 13

def self.mocking?
  defined?(@@mock) && @@mock
end

Instance Method Details

#clearObject



84
85
86
87
88
89
90
# File 'lib/super_queue.rb', line 84

def clear
  begin
    self.pop(true)
  rescue ThreadError
    retry unless self.empty?
  end until self.empty?
end

#destroyObject



104
105
106
107
108
109
# File 'lib/super_queue.rb', line 104

def destroy
  @sqs_head_tracker.terminate
  @sqs_tail_tracker.terminate
  @garbage_collector.terminate
  delete_queue
end

#empty?Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/super_queue.rb', line 76

def empty?
  self.length == 0
end

#lengthObject Also known as: size



70
71
72
73
74
# File 'lib/super_queue.rb', line 70

def length
  @mutex.synchronize {
    return sqs_length + @in_buffer.size + @out_buffer.size
  }
end

#localized?Boolean

Returns:

  • (Boolean)


127
128
129
# File 'lib/super_queue.rb', line 127

def localized?
  !!@localize_queue
end

#nameObject



123
124
125
# File 'lib/super_queue.rb', line 123

def name
  queue_name
end

#num_waitingObject



80
81
82
# File 'lib/super_queue.rb', line 80

def num_waiting
  @waiting.size
end

#pop(non_block = false) ⇒ Object Also known as: deq, shift



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/super_queue.rb', line 52

def pop(non_block=false)
  @mutex.synchronize {
    while true
      if @out_buffer.empty?
        if fill_out_buffer_from_sqs_queue || fill_out_buffer_from_in_buffer
          return pop_out_buffer(non_block)
        else
          raise ThreadError, "queue empty" if non_block
          @waiting.push Thread.current
          @mutex.sleep
        end
      else
        return pop_out_buffer(non_block)
      end
    end
  }
end

#push(p) ⇒ Object Also known as: enq, <<



40
41
42
43
44
45
46
47
48
49
50
# File 'lib/super_queue.rb', line 40

def push(p)
  @mutex.synchronize {
    @in_buffer.push p
    begin
      t = @waiting.shift
      t.wakeup if t
    rescue ThreadError
      retry
    end
  }
end

#shutdownObject



92
93
94
95
96
97
98
99
100
101
102
# File 'lib/super_queue.rb', line 92

def shutdown
  @sqs_head_tracker.terminate
  while !@in_buffer.empty? do
    @sqs.delete_message(q_url, @in_buffer.pop)
  end  
  @sqs_tail_tracker.terminate
  @garbage_collector.terminate
  while !@deletion_queue.empty?
    @sqs.delete_message(q_url, @deletion_queue.pop)
  end
end

#urlObject



119
120
121
# File 'lib/super_queue.rb', line 119

def url
  q_url
end