Class: Innertube::Pool

Inherits:
Object
Defined in:
lib/innertube.rb

Overview

A re-entrant thread-safe resource pool that generates new resources on demand.

Defined Under Namespace

Classes: BadResource, Element

Constructor Details

#initialize(open, close) ⇒ Pool

Creates a new resource pool.


# File 'lib/innertube.rb', line 56

def initialize(open, close)
  @open = open
  @close = close
  @lock = Mutex.new
  @iterator = Mutex.new
  @element_released = ConditionVariable.new
  @pool = Set.new
end
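
The constructor only stores its two arguments, and take later invokes the first one as @open.call with no arguments. As a rough sketch of what a suitable pair of callables might look like: FakeConnection is a hypothetical stand-in for a real resource, and the assumption that the close callable receives the resource as its single argument is inferred from the description of #clear rather than shown in this section.

```ruby
# Hypothetical resource type, used only for illustration.
FakeConnection = Struct.new(:id, :alive) do
  def close!
    self.alive = false
  end
end

next_id = 0

# `open` takes no arguments and returns a fresh resource.
open_proc = lambda do
  next_id += 1
  FakeConnection.new(next_id, true)
end

# `close` is assumed to receive the resource it should dispose of.
close_proc = lambda { |conn| conn.close! }

# With the gem loaded, the pair would be handed to the pool like this:
#   pool = Innertube::Pool.new(open_proc, close_proc)

conn = open_proc.call
close_proc.call(conn)
puts conn.id      # 1
puts conn.alive   # false
```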

Instance Method Details

#clear ⇒ Object Also known as: close

On each element of the pool, calls close(element) and removes it.


# File 'lib/innertube.rb', line 77

def clear
  each_element do |e|
    delete_element e
  end
end

#delete_if {|object| ... } ⇒ Object

Locks each element in turn, then closes and deletes every element whose resource makes the block return a truthy value.

Yields:

  • (object)

    a block that should determine whether an element should be deleted from the pool

Yield Parameters:

  • object (Object)

    the resource

Raises:

  • (ArgumentError) if no block is given

# File 'lib/innertube.rb', line 100

def delete_if
  raise ArgumentError, "block required" unless block_given?

  each_element do |e|
    if yield e.object
      delete_element e
    end
  end
end

#each {|resource| ... } ⇒ Object

Like each_element, but yields the underlying resource objects rather than the wrapper elements.

Yields:

  • (resource)

    a block that will do something with each resource in the pool

Yield Parameters:

  • resource (Object)

    the current resource in the iteration


# File 'lib/innertube.rb', line 188

def each
  each_element do |e|
    yield e.object
  end
end

#each_element {|element| ... } ⇒ Object

Iterate over a snapshot of the pool. Yielded objects are locked for the duration of the block. This may block the current thread until elements in the snapshot are released by other threads.

Yields:

  • (element)

    a block that will do something with each element in the pool

Yield Parameters:

  • element (Element)

    the current element in the iteration


# File 'lib/innertube.rb', line 160

def each_element
  targets = @pool.to_a
  unlocked = []

  @iterator.synchronize do
    until targets.empty?
      @lock.synchronize do
        @element_released.wait(@iterator) if targets.all? {|e| e.locked? }
        unlocked, targets = targets.partition {|e| e.unlocked? }
        unlocked.each {|e| e.lock }
      end

      unlocked.each do |e|
        begin
          yield e
        ensure
          e.unlock
        end
      end
    end
  end
end
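
Two ideas in the source above may be easier to see in isolation: iterating over a copy (@pool.to_a) so the live set can change during the walk, and using partition to separate elements that can be processed now from those that must be retried. The following is a single-threaded sketch of just those two steps, without the locking or the condition variable; Item is a hypothetical stand-in for Element.

```ruby
require 'set'

# Snapshot: iterate over a copy so the live set can change during the walk.
pool = Set.new(%w[a b])
visited = []
pool.to_a.each do |name|
  visited << name
  pool << "#{name}-late"   # joins the pool, but is not part of the snapshot
end

# Partition: split what can be processed now from what must be retried.
Item = Struct.new(:name, :locked)   # hypothetical stand-in for Element
targets = [Item.new("a", false), Item.new("b", true), Item.new("c", false)]
ready, targets = targets.partition { |e| !e.locked }

puts visited.inspect              # ["a", "b"]
puts pool.size                    # 4
puts ready.map(&:name).inspect    # ["a", "c"]
puts targets.map(&:name).inspect  # ["b"]
```

In each_element the partition is repeated until targets is empty, so elements that were locked on one pass are picked up on a later one.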

#fill(resources) ⇒ Object

Populate the pool with existing, open resources.


# File 'lib/innertube.rb', line 67

def fill(resources)
  @lock.synchronize do
    resources.each do |r|
      @pool << Element.new(r)
    end
  end
end

#size ⇒ Integer

Returns the number of elements currently in the pool, whether locked or not.


# File 'lib/innertube.rb', line 195

def size
  @lock.synchronize { @pool.size }
end

#take(opts = {}) {|resource| ... } ⇒ Object Also known as: >>

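Acquires an unlocked element of the pool, locks it, and yields its resource to the block. If every element is locked, or none passes the optional :filter proc, a new element is added to the pool, using opts[:default] if given and the open callable otherwise. If the block raises BadResource, the element is deleted from the pool and the exception is re-raised. Returns the value of the block.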

Yields:

  • (resource)

    a block that will perform some action with the element of the pool

Yield Parameters:

  • resource (Object)

    a resource managed by the pool. Locked for the duration of the block

Raises:

  • (ArgumentError) if no block is given

# File 'lib/innertube.rb', line 121

def take(opts = {})
  raise ArgumentError, "block required" unless block_given?

  result = nil
  element = nil
  opts[:filter] ||= proc {|_| true }
  @lock.synchronize do
    element = @pool.find { |e| e.unlocked? && opts[:filter].call(e.object) }
    unless element      # No objects were acceptable

      resource = opts[:default] || @open.call
      element = Element.new(resource)
      @pool << element
    end
    element.lock
  end
  begin
    result = yield element.object
  rescue BadResource
    delete_element element
    raise
  ensure
    # Unlock
    if element
      element.unlock
      @element_released.signal
    end
  end
  result
end
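
To illustrate how the logic above behaves, here is a self-contained sketch that mirrors the take source in a simplified stand-in class. SketchPool and its Element are hypothetical and are not the gem's classes. The body of delete_element is not shown in this section, so the sketch assumes it calls the close callable on the resource and then removes the element from the pool.

```ruby
require 'set'

# Simplified stand-in mirroring the `take` source above; NOT the gem itself.
class SketchPool
  class BadResource < StandardError; end

  # Hypothetical wrapper; the real Element class is not shown in this section.
  class Element
    attr_reader :object

    def initialize(object)
      @object = object
      @locked = false
    end

    def lock
      @locked = true
    end

    def unlock
      @locked = false
    end

    def unlocked?
      !@locked
    end
  end

  def initialize(open, close)
    @open = open
    @close = close
    @lock = Mutex.new
    @pool = Set.new
  end

  def size
    @lock.synchronize { @pool.size }
  end

  def take(opts = {})
    raise ArgumentError, "block required" unless block_given?

    filter = opts[:filter] || proc { |_| true }
    element = nil
    @lock.synchronize do
      element = @pool.find { |e| e.unlocked? && filter.call(e.object) }
      unless element
        element = Element.new(opts[:default] || @open.call)
        @pool << element
      end
      element.lock
    end
    begin
      yield element.object
    rescue BadResource
      # Assumed behavior of delete_element: close the resource, then drop it.
      @close.call(element.object)
      @lock.synchronize { @pool.delete(element) }
      raise
    ensure
      element.unlock
    end
  end
end

counter = 0
closed = []
pool = SketchPool.new(-> { "conn-#{counter += 1}" }, ->(c) { closed << c })

seen = []
pool.take do |outer|
  # `outer` is still locked here, so the nested call opens a second resource.
  pool.take { |inner| seen << outer << inner }
end

# Both are unlocked again; :filter selects which one to reuse.
picked = pool.take(filter: ->(c) { c.end_with?("2") }) { |c| c }

# Raising BadResource inside the block evicts that resource from the pool.
begin
  pool.take(filter: ->(c) { c == "conn-1" }) { |_| raise SketchPool::BadResource }
rescue SketchPool::BadResource
  # re-raised to the caller after the element has been removed
end

puts seen.inspect    # ["conn-1", "conn-2"]
puts picked          # conn-2
puts closed.inspect  # ["conn-1"]
puts pool.size       # 1
```

The nested call works because the pool-wide mutex is released before the block is yielded to; only the individual element stays locked for the duration of the block.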