Class: SuperQueue
- Inherits:
-
Object
- Object
- SuperQueue
- Defined in:
- lib/super_queue.rb
Defined Under Namespace
Classes: S3Pointer
Instance Method Summary collapse
- #clear ⇒ Object
- #destroy ⇒ Object
- #empty? ⇒ Boolean
-
#initialize(opts) ⇒ SuperQueue
constructor
A new instance of SuperQueue.
- #length ⇒ Object (also: #size)
- #name ⇒ Object
- #num_waiting ⇒ Object
- #pop(non_block = false) ⇒ Object (also: #deq, #shift)
- #push(p) ⇒ Object (also: #enq, #<<)
- #shutdown ⇒ Object
- #sqs_requests ⇒ Object
- #url ⇒ Object
Constructor Details
#initialize(opts) ⇒ SuperQueue
Returns a new instance of SuperQueue.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/super_queue.rb', line 22 def initialize(opts) check_opts(opts) @buffer_size = opts[:buffer_size] || 100 @queue_name = generate_queue_name(opts) @bucket_name = opts[:bucket_name] || queue_name @write_count = 0 @read_count = 0 @delete_count = 0 initialize_aws(opts) @waiting = [] @waiting.taint self.taint @mutex = Mutex.new @in_buffer = [] @out_buffer = [] @deletion_buffer = [] @deletion_queue = [] @gc = Thread.new do begin collect_garbage rescue Exception => @gc_error raise @gc_error end end fill_out_buffer_from_sqs_queue end |
Instance Method Details
#clear ⇒ Object
99 100 101 102 103 104 105 |
# File 'lib/super_queue.rb', line 99 def clear begin self.pop(true) rescue ThreadError retry unless self.empty? end until self.empty? end |
#destroy ⇒ Object
115 116 117 118 119 |
# File 'lib/super_queue.rb', line 115 def destroy @gc.terminate delete_aws_resources @done = true end |
#empty? ⇒ Boolean
93 94 95 96 97 |
# File 'lib/super_queue.rb', line 93 def empty? len = 0 2.times { len += self.length; sleep(0.01) } len == 0 end |
#length ⇒ Object Also known as: size
81 82 83 84 85 86 |
# File 'lib/super_queue.rb', line 81 def length check_for_errors @mutex.synchronize { return sqs_length + @in_buffer.size + @out_buffer.size } end |
#name ⇒ Object
138 139 140 |
# File 'lib/super_queue.rb', line 138 def name queue_name end |
#num_waiting ⇒ Object
88 89 90 91 |
# File 'lib/super_queue.rb', line 88 def num_waiting check_for_errors @waiting.size end |
#pop(non_block = false) ⇒ Object Also known as: deq, shift
66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/super_queue.rb', line 66 def pop(non_block=false) check_for_errors @mutex.synchronize { loop do if @out_buffer.empty? && !(fill_out_buffer_from_sqs_queue || fill_out_buffer_from_in_buffer) raise ThreadError, "queue empty" if non_block @waiting.push Thread.current @mutex.sleep else return pop_out_buffer end end } end |
#push(p) ⇒ Object Also known as: enq, <<
52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/super_queue.rb', line 52 def push(p) check_for_errors @mutex.synchronize { @in_buffer.push p clear_in_buffer if @in_buffer.size >= @buffer_size begin t = @waiting.shift t.wakeup if t rescue ThreadError retry end } end |
#shutdown ⇒ Object
107 108 109 110 111 112 113 |
# File 'lib/super_queue.rb', line 107 def shutdown @mutex.synchronize { clear_in_buffer } @gc.terminate @mutex.synchronize { fill_deletion_queue_from_buffer } if @deletion_buffer.any? @mutex.synchronize { clear_deletion_queue } if @deletion_queue.any? @done = true end |
#sqs_requests ⇒ Object
121 122 123 124 |
# File 'lib/super_queue.rb', line 121 def sqs_requests check_for_errors @write_count + @read_count + @delete_count end |
#url ⇒ Object
134 135 136 |
# File 'lib/super_queue.rb', line 134 def url q_url end |