Class: SuperQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/super_queue.rb

Defined Under Namespace

Classes: S3Pointer

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ SuperQueue

Returns a new instance of SuperQueue.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/super_queue.rb', line 22

# Sets up the queue's local state, starts the background thread that runs
# +collect_garbage+, and primes the out buffer from the SQS queue.
#
# @param opts [Hash] configuration. This method reads +:buffer_size+
#   (defaults to 100) and +:bucket_name+ (defaults to +queue_name+); the
#   whole hash is also handed to +check_opts+, +generate_queue_name+ and
#   +initialize_aws+, which are defined elsewhere in the class.
# @return [SuperQueue] a new instance of SuperQueue
def initialize(opts)
  check_opts(opts)
  @buffer_size = opts[:buffer_size] || 100
  @queue_name = generate_queue_name(opts)
  @bucket_name = opts[:bucket_name] || queue_name
  @write_count = 0
  @read_count = 0
  @delete_count = 0
  initialize_aws(opts)

  # NOTE: earlier revisions called `taint` on @waiting and on self.
  # Object#taint was deprecated in Ruby 2.7 (where it became a no-op) and
  # removed in Ruby 3.2, where calling it raises NoMethodError, so the calls
  # are intentionally gone.
  @waiting = []
  @mutex = Mutex.new
  @in_buffer = []
  @out_buffer = []
  @deletion_buffer = []
  @deletion_queue = []

  @gc = Thread.new do
    begin
      collect_garbage
    rescue Exception => @gc_error
      # The failure is stored on the instance before being re-raised in the
      # background thread; presumably check_for_errors inspects @gc_error
      # (not visible here) -- confirm.
      raise @gc_error
    end
  end

  fill_out_buffer_from_sqs_queue
end

Instance Method Details

#clear ⇒ Object



99
100
101
102
103
104
105
# File 'lib/super_queue.rb', line 99

# Drains the queue by popping in non-blocking mode until +empty?+ reports
# true. A ThreadError from +pop+ is retried while the queue still reports
# items.
def clear
  loop do
    begin
      pop(true)
    rescue ThreadError
      # Only give up on this pop once the queue really looks empty.
      retry unless empty?
    end
    break if empty?
  end
end

#destroy ⇒ Object



115
116
117
118
119
# File 'lib/super_queue.rb', line 115

# Stops the background thread held in @gc, calls +delete_aws_resources+,
# and then flags the instance as done.
#
# @return [true]
def destroy
  @gc.kill # Thread#kill and Thread#terminate are aliases
  delete_aws_resources
  @done = true
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


93
94
95
96
97
# File 'lib/super_queue.rb', line 93

# Samples +length+ twice, pausing 10 ms after each sample, and reports the
# queue as empty only when both samples were zero.
#
# @return [Boolean]
def empty?
  total = 0
  2.times do
    total += length
    sleep(0.01)
  end
  total.zero?
end

#length ⇒ Object Also known as: size



81
82
83
84
85
86
# File 'lib/super_queue.rb', line 81

# Total number of items: whatever +sqs_length+ reports plus the sizes of the
# local in and out buffers. Runs +check_for_errors+ first and computes the
# sum while holding @mutex.
#
# @return the sum of +sqs_length+, @in_buffer.size and @out_buffer.size
def length
  check_for_errors
  @mutex.synchronize do
    sqs_length + @in_buffer.size + @out_buffer.size
  end
end

#name ⇒ Object



138
139
140
# File 'lib/super_queue.rb', line 138

# Public accessor for the queue's name; simply returns +queue_name+.
# NOTE(review): +queue_name+ is defined outside this view -- presumably a
# reader for the @queue_name set in the constructor; confirm.
def name
  queue_name
end

#num_waiting ⇒ Object



88
89
90
91
# File 'lib/super_queue.rb', line 88

# Number of entries currently in the @waiting list (threads registered by a
# blocking +pop+). Runs +check_for_errors+ first.
#
# @return [Integer]
def num_waiting
  check_for_errors
  @waiting.length
end

#pop(non_block = false) ⇒ Object Also known as: deq, shift



66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/super_queue.rb', line 66

# Removes and returns the next item via +pop_out_buffer+.
#
# When the out buffer is empty it is refilled, first from the SQS queue and
# then from the in buffer. If nothing is available the call either raises
# (non-blocking mode) or parks the calling thread until it is woken.
#
# @param non_block [Boolean] when true, raise instead of waiting
# @return [Object] whatever +pop_out_buffer+ returns
# @raise [ThreadError] "queue empty" when +non_block+ is true and no item
#   is available
def pop(non_block=false)
  check_for_errors
  @mutex.synchronize {
    loop do
      available = !@out_buffer.empty? ||
                  fill_out_buffer_from_sqs_queue ||
                  fill_out_buffer_from_in_buffer
      return pop_out_buffer if available

      raise ThreadError, "queue empty" if non_block

      begin
        @waiting.push Thread.current
        @mutex.sleep
      ensure
        # Always deregister on the way out of the sleep. Without this, a
        # spurious wakeup, a kill, or an exception raised while sleeping
        # leaves a stale (or duplicate) entry in @waiting, so a later
        # wakeup is spent on a thread that is not actually waiting and
        # @waiting.size over-reports the number of waiters.
        @waiting.delete(Thread.current)
      end
    end
  }
end

#push(p) ⇒ Object Also known as: enq, <<



52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/super_queue.rb', line 52

# Appends +p+ to the in buffer, flushes the buffer via +clear_in_buffer+
# once it holds at least @buffer_size items, and wakes the first thread
# registered in @waiting, if any. All of this happens while holding @mutex,
# after +check_for_errors+ has run.
#
# @param p [Object] the item to enqueue
# @return [Thread, nil] the thread that was woken, or nil when none waited
def push(p)
  check_for_errors
  @mutex.synchronize do
    @in_buffer << p
    clear_in_buffer if @in_buffer.size >= @buffer_size
    begin
      waiter = @waiting.shift
      waiter.wakeup unless waiter.nil?
    rescue ThreadError
      # Thread#wakeup raises ThreadError for a dead thread; move on to the
      # next registered waiter.
      retry
    end
  end
end

#shutdown ⇒ Object



107
108
109
110
111
112
113
# File 'lib/super_queue.rb', line 107

# Flushes local state and stops background work. In order: flushes the in
# buffer, terminates the @gc thread, moves pending deletions into the
# deletion queue when @deletion_buffer has any, clears the deletion queue
# when it has any, then flags the instance as done. Each helper call runs
# under @mutex.
#
# @return [true]
def shutdown
  @mutex.synchronize { clear_in_buffer }
  @gc.terminate

  if @deletion_buffer.any?
    @mutex.synchronize { fill_deletion_queue_from_buffer }
  end

  if @deletion_queue.any?
    @mutex.synchronize { clear_deletion_queue }
  end

  @done = true
end

#sqs_requests ⇒ Object



121
122
123
124
# File 'lib/super_queue.rb', line 121

# Sum of the write, read and delete counters kept on the instance. Runs
# +check_for_errors+ first.
#
# @return the combined value of @write_count, @read_count and @delete_count
def sqs_requests
  check_for_errors
  [@write_count, @read_count, @delete_count].reduce(:+)
end

#url ⇒ Object



134
135
136
# File 'lib/super_queue.rb', line 134

# Public accessor that simply returns +q_url+.
# NOTE(review): +q_url+ is defined outside this view -- presumably the URL
# of the underlying SQS queue; confirm.
def url
  q_url
end