Class: MultiThink::TimedStack

Inherits:
Object
  • Object
show all
Defined in:
lib/multithink/timed_stack.rb

Instance Method Summary collapse

Constructor Details

#initialize(size = 0, options) ⇒ TimedStack

Returns a new instance of TimedStack.



9
10
11
12
13
14
15
16
17
# File 'lib/multithink/timed_stack.rb', line 9

def initialize(size = 0, options)
  @size = size
  @options = options
  @que = []
  @checked_out = []
  @mutex = Mutex.new
  @resource = ConditionVariable.new
  @shutdown_block = nil
end

Instance Method Details

#available?Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/multithink/timed_stack.rb', line 33

def available?
  @que.count + @checked_out.count < @size
end

#empty?Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/multithink/timed_stack.rb', line 69

def empty?
  @que.empty?
end

#lengthObject



73
74
75
# File 'lib/multithink/timed_stack.rb', line 73

def length
  @que.length
end

#pop(timeout = 0.5) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/multithink/timed_stack.rb', line 37

def pop(timeout=0.5)
  deadline = Time.now + timeout
  @mutex.synchronize do
    loop do
      raise PoolShuttingDownError if @shutdown_block
      return @que.pop unless @que.empty?
      if available?
        new_conn = MultiThink::Connection.new(@options)
        @checked_out << new_conn
        return new_conn
      end
      to_wait = deadline - Time.now
      raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0
      @resource.wait(@mutex, to_wait)
    end
  end
end

#push(obj) ⇒ Object Also known as: <<



19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/multithink/timed_stack.rb', line 19

def push(obj)
  @mutex.synchronize do
    if @shutdown_block
      @shutdown_block.call(obj)
    else
      @que.push obj
      @checked_out.delete obj
    end

    @resource.broadcast
  end
end

#shutdown(&block) ⇒ Object

Raises:

  • (ArgumentError)


55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/multithink/timed_stack.rb', line 55

def shutdown(&block)
  raise ArgumentError, "shutdown must receive a block" unless block_given?

  @mutex.synchronize do
    @shutdown_block = block
    @resource.broadcast

    @que.size.times do
      conn = @que.pop
      block.call(conn)
    end
  end
end