Class: TimeoutQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/timeout_queue.rb,
lib/timeout_queue/version.rb

Constant Summary collapse

# Library version string.
VERSION = "0.1.1"

Instance Method Summary collapse

Constructor Details

#initialize(**opt) ⇒ TimeoutQueue

Returns a new instance of TimeoutQueue.



3
4
5
6
7
8
9
10
11
# File 'lib/timeout_queue.rb', line 3

# Builds an empty, open queue.
#
# Creates the backing Array, the closed flag (initially false), the list of
# threads currently inside #pop, and the Mutex / ConditionVariable pair that
# #pop waits on.
#
# @param opt [Hash] keyword options; accepted but not read by this constructor
def initialize(**opt)
  @closed = false
  @queue = []
  @waiting = []
  @mutex = Mutex.new
  @received = ConditionVariable.new
end

Instance Method Details

#clear ⇒ Object



135
136
137
138
139
140
# File 'lib/timeout_queue.rb', line 135

# Empties the backing array inside a with_mutex block.
#
# @return [TimeoutQueue] self, so calls can be chained
def clear
  with_mutex { @queue.clear }
  self
end

#close ⇒ Object



117
118
119
120
121
122
123
124
125
# File 'lib/timeout_queue.rb', line 117

# Marks the queue closed and wakes every thread blocked in #pop.
#
# Only the first call has any effect; later calls just return self.
#
# Waiters are woken by broadcasting on @received (the condition variable
# #pop waits on) instead of calling Thread#wakeup on each entry of @waiting.
# Thread#wakeup raises ThreadError for a dead thread, so a waiter that died
# while still registered in @waiting would make close itself raise.
#
# @return [TimeoutQueue] self
def close
  with_mutex do
    unless @closed
      @closed = true
      @received.broadcast
    end
  end
  self
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/timeout_queue.rb', line 113

# Reports whether the queue has been closed.
#
# Reads @closed directly, without going through with_mutex.
#
# @return [Boolean] false after construction, true once #close has run
def closed?
  @closed
end

#delete(object) ⇒ Object

delete an object from the queue before it can be popped

Parameters:

  • object

Returns:

  • object



40
41
42
43
44
45
46
47
48
# File 'lib/timeout_queue.rb', line 40

# Removes +object+ from the queue before it can be popped.
#
# Uses Array#delete inside a with_mutex block, so every queued element equal
# to +object+ is removed. The argument is returned whether or not anything
# was actually removed.
#
# @param object [Object] the item to remove
# @return [Object] the +object+ argument
def delete(object)
  with_mutex { @queue.delete(object) }
  object
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


127
128
129
# File 'lib/timeout_queue.rb', line 127

# Reports whether the backing array currently holds no objects.
#
# Reads @queue directly, without going through with_mutex.
#
# @return [Boolean]
def empty?
  @queue.empty?
end

#num_waiting ⇒ Object



107
108
109
110
111
# File 'lib/timeout_queue.rb', line 107

# Counts the threads currently registered in @waiting, i.e. threads that
# have entered #pop and not yet left it.
#
# @return [Integer]
def num_waiting
  with_mutex { @waiting.length }
end

#pop(non_block = false, **opts) ⇒ Object Also known as: shift, deq

retrieve next object from queue

Parameters:

  • non_block (TrueClass, FalseClass) (defaults to: false)

    set true to enable non-blocking mode

  • opts (Hash)

Options Hash (**opts):

  • :timeout (Float, Integer)

    seconds to wait (in blocking mode)

Raises:

  • ThreadError if timeout expires or queue is empty in non_block mode

  • ClosedQueueError if the queue is closed and empty



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/timeout_queue.rb', line 59

# Retrieves the next object from the front of the queue.
#
# In blocking mode the caller waits on @received until an object is
# available, the queue is closed, or the optional timeout runs out.
#
# @param non_block [Boolean] when true, never wait; raise immediately if empty
# @param opts [Hash]
# @option opts [Float, Integer] :timeout seconds to wait in blocking mode
# @return [Object] the object removed from the front of the queue
# @raise [ClosedQueueError] if the queue is empty and has been closed
# @raise [ThreadError] if the queue is empty in non_block mode, or the
#   timeout expires before an object arrives
def pop(non_block = false, **opts)
  timeout = opts[:timeout]

  with_mutex do
    # Use the monotonic clock for the deadline so that wall-clock adjustments
    # (NTP, manual changes) cannot stretch or cut short the wait.
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout.to_f if timeout

    @waiting << Thread.current

    begin
      while @queue.empty? && !non_block && !@closed
        if deadline
          remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
          break unless remaining > 0

          # May return early (signal or spurious wakeup); the loop re-checks.
          @received.wait(@mutex, remaining)
        else
          @received.wait(@mutex)
        end
      end
    ensure
      # Always deregister, even if the wait is interrupted by an exception or
      # Thread#kill, so that no stale thread is left behind in @waiting.
      @waiting.delete(Thread.current)
    end

    if @queue.empty?
      raise ClosedQueueError if @closed

      raise ThreadError
    end

    @queue.shift
  end
end

#push(object, **opt) ⇒ Object Also known as: <<, enq

push object into end of queue

Parameters:

  • object

Returns:

  • object



18
19
20
21
22
# File 'lib/timeout_queue.rb', line 18

# Appends +object+ to the end of the queue.
#
# The append itself runs as the block handed to the __push helper (defined
# elsewhere in the class). Keyword options are accepted but not read here.
#
# @param object [Object] the item to enqueue
# @param opt [Hash] unused by this method
# @return [Object] whatever __push returns (documented as +object+)
def push(object, **opt)
  __push(object) { @queue.push(object) }
end

#size ⇒ Object Also known as: length



131
132
133
# File 'lib/timeout_queue.rb', line 131

# Number of objects currently held in the backing array.
#
# Reads @queue directly, without going through with_mutex.
#
# @return [Integer]
def size
  @queue.size
end

#unshift(object) ⇒ Object

push object into front of the queue

Parameters:

  • object

Returns:

  • object



29
30
31
32
33
# File 'lib/timeout_queue.rb', line 29

# Inserts +object+ at the front of the queue, so it is the next one popped.
#
# The insert itself runs as the block handed to the __push helper (defined
# elsewhere in the class).
#
# @param object [Object] the item to enqueue at the head
# @return [Object] whatever __push returns (documented as +object+)
def unshift(object)
  __push(object) { @queue.unshift(object) }
end