Class: MultiOpQueue::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/multi_op_queue.rb

Overview

Original Queue implementation from Ruby-2.0.0 github.com/ruby/ruby/blob/ruby_2_0_0/lib/thread.rb

This class provides a way to synchronize communication between threads.

Example:

require 'thread'

# Shared queue used to hand values from the producer thread to the consumer thread.
queue = Queue.new

# Producer: pushes the integers 0 through 4 onto the queue, sleeping before each push.
producer = Thread.new do
  5.times do |i|
    sleep rand(i) # simulate expense
    queue << i
    puts "#{i} produced"
  end
end

# Consumer: pops five values off the queue, sleeping after each pop.
consumer = Thread.new do
  5.times do |i|
    value = queue.pop
    sleep rand(i/2) # simulate expense
    puts "consumed #{value}"
  end
end

# Wait for the consumer thread to finish.
consumer.join

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Queue

Creates a new queue.



37
38
39
40
41
42
43
44
# File 'lib/multi_op_queue.rb', line 37

# Creates a new, empty queue.
#
# Sets up the backing array, the waiting-thread counter, and the mutex /
# condition variable pair that the blocking operations synchronize on.
def initialize
  @que = []
  # Object#taint was deprecated in Ruby 2.7 and removed in Ruby 3.2, so it is
  # only called where it still exists; an unconditional call raises
  # NoMethodError on current Rubies.
  @que.taint if @que.respond_to?(:taint) # enable tainted communication
  @num_waiting = 0
  taint if respond_to?(:taint)
  @mutex = Mutex.new
  @cond = ConditionVariable.new
end

Instance Method Details

#clear ⇒ Object

Removes all objects from the queue.



167
168
169
# File 'lib/multi_op_queue.rb', line 167

# Empties the queue by clearing the backing array in place.
#
# Returns the result of calling +clear+ on the backing array.
# The mutex is not taken here.
def clear
  @que.clear
end

#concat(ary) ⇒ Object

Concatenates ary onto the queue.



49
50
51
52
53
54
55
56
# File 'lib/multi_op_queue.rb', line 49

# Appends the contents of +ary+ to the end of the queue while holding the
# queue's mutex, then signals the condition variable once.
#
# NOTE(review): only a single signal is sent no matter how many elements are
# added -- confirm that waking one waiter per call is the intended behaviour.
def concat(ary)
  handle_interrupt do
    @mutex.synchronize do
      @que.concat(ary)
      @cond.signal
    end
  end
end

#empty? ⇒ Boolean

Returns true if the queue is empty.

Returns:

  • (Boolean)


160
161
162
# File 'lib/multi_op_queue.rb', line 160

# Reports whether the queue currently holds no elements.
#
# Returns true when the backing array has length zero, false otherwise.
# The mutex is not taken here.
def empty?
  @que.length.zero?
end

#length ⇒ Object Also known as: size

Returns the length of the queue.



174
175
176
# File 'lib/multi_op_queue.rb', line 174

# Number of elements currently stored in the queue.
#
# Reads the size of the backing array directly; the mutex is not taken here.
def length
  @que.size
end

#num_waiting ⇒ Object

Returns the number of threads waiting on the queue.



186
187
188
# File 'lib/multi_op_queue.rb', line 186

# Current value of the waiting-thread counter.
#
# The counter is incremented just before a thread starts waiting on the
# condition variable in #pop / #pop_up_to and decremented when that wait ends.
# It is read here without taking the mutex.
def num_waiting
  @num_waiting
end

#pop(non_block = false) ⇒ Object Also known as: shift, deq

Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended, and an exception is raised.



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/multi_op_queue.rb', line 85

# Removes and returns the element at the front of the queue.
#
# While the queue is empty the calling thread waits on the condition variable
# (counted in @num_waiting for the duration of each wait) and re-checks the
# queue every time it wakes up.
#
# non_block - when truthy, an empty queue raises instead of waiting.
#
# Raises ThreadError ("queue empty") if the queue is empty and non_block is
# truthy.
def pop(non_block=false)
  handle_interrupt do
    @mutex.synchronize do
      while @que.empty?
        raise ThreadError, "queue empty" if non_block

        begin
          @num_waiting += 1
          @cond.wait @mutex
        ensure
          # Keep the counter accurate even if the wait is interrupted.
          @num_waiting -= 1
        end
      end

      return @que.shift
    end
  end
end

#pop_up_to(num_to_pop = 1, opts = {}) ⇒ Object

Retrieves data from the queue and returns an array of the contents. If several elements are available in the queue, up to num_to_pop of them are returned in the array. If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended, and an exception is raised. If a :timeout option is given and the queue is still empty when the wait ends, nil is returned.



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/multi_op_queue.rb', line 125

# Removes up to +num_to_pop+ elements from the front of the queue and returns
# them as an array.
#
# num_to_pop - maximum number of elements to remove (default 1).
# opts       - either a boolean, treated as the non_block flag, or a Hash
#              with the optional keys :timeout (passed to
#              ConditionVariable#wait) and :non_block. Any other value
#              leaves both settings at their defaults.
#
# If the queue is empty and non_block is falsy, the calling thread waits on
# the condition variable. If the queue is still empty when that wait ends
# (for example because the timeout elapsed), nil is returned.
#
# Raises ThreadError ("queue empty") if the queue is empty and non_block is
# truthy.
def pop_up_to(num_to_pop = 1, opts = {})
  timeout = nil
  non_block = false

  case opts
  when true, false
    # A misspelled local here previously discarded the flag, so a boolean
    # argument never enabled non-blocking mode.
    non_block = opts
  when Hash
    timeout = opts.fetch(:timeout, nil)
    non_block = opts.fetch(:non_block, false)
  end

  handle_interrupt do
    @mutex.synchronize do
      while @que.empty?
        raise ThreadError, "queue empty" if non_block

        begin
          @num_waiting += 1
          @cond.wait(@mutex, timeout)
          # Woken up (or timed out) with nothing to hand back.
          return nil if @que.empty?
        ensure
          # Keep the counter accurate even if the wait is interrupted.
          @num_waiting -= 1
        end
      end

      return @que.shift(num_to_pop)
    end
  end
end

#push(obj) ⇒ Object Also known as: <<, enq

Pushes obj to the queue.



61
62
63
64
65
66
67
68
# File 'lib/multi_op_queue.rb', line 61

# Appends +obj+ to the end of the queue while holding the queue's mutex, then
# signals the condition variable so that a thread waiting in #pop or
# #pop_up_to can re-check the queue.
def push(obj)
  handle_interrupt do
    @mutex.synchronize do
      @que << obj
      @cond.signal
    end
  end
end