Class: Innertube::Pool
- Inherits:
-
Object
- Object
- Innertube::Pool
- Defined in:
- lib/innertube.rb
Overview
A re-entrant thread-safe resource pool that generates new resources on demand.
Defined Under Namespace
Classes: BadResource, Element
Instance Method Summary collapse
-
#clear ⇒ Object
(also: #close)
On each element of the pool, calls close(element) and removes it.
-
#delete_if {|object| ... } ⇒ Object
Locks each element in turn and closes/deletes elements for which the object passes the block.
-
#each {|resource| ... } ⇒ Object
As each_element, but yields objects, not wrapper elements.
-
#each_element {|element| ... } ⇒ Object
Iterate over a snapshot of the pool.
-
#fill(resources) ⇒ Object
Populate the pool with existing, open resources.
-
#initialize(open, close) ⇒ Pool
constructor
Creates a new resource pool.
-
#size ⇒ Integer
The number of the resources in the pool.
-
#take(opts = {}) {|resource| ... } ⇒ Object
(also: #>>)
Acquire an element of the pool.
Constructor Details
#initialize(open, close) ⇒ Pool
Creates a new resource pool.
56 57 58 59 60 61 62 63 |
# File 'lib/innertube.rb', line 56 def initialize(open, close) @open = open @close = close @lock = Mutex.new @iterator = Mutex.new @element_released = ConditionVariable.new @pool = Set.new end |
Instance Method Details
#clear ⇒ Object Also known as: close
On each element of the pool, calls close(element) and removes it.
77 78 79 80 81 |
# File 'lib/innertube.rb', line 77 def clear each_element do |e| delete_element e end end |
#delete_if {|object| ... } ⇒ Object
Locks each element in turn and closes/deletes elements for which the object passes the block.
100 101 102 103 104 105 106 107 108 |
# File 'lib/innertube.rb', line 100 def delete_if raise ArgumentError, "block required" unless block_given? each_element do |e| if yield e.object delete_element e end end end |
#each {|resource| ... } ⇒ Object
As each_element, but yields objects, not wrapper elements.
188 189 190 191 192 |
# File 'lib/innertube.rb', line 188 def each each_element do |e| yield e.object end end |
#each_element {|element| ... } ⇒ Object
Iterate over a snapshot of the pool. Yielded objects are locked for the duration of the block. This may block the current thread until elements in the snapshot are released by other threads.
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/innertube.rb', line 160 def each_element targets = @pool.to_a unlocked = [] @iterator.synchronize do until targets.empty? @lock.synchronize do @element_released.wait(@iterator) if targets.all? {|e| e.locked? } unlocked, targets = targets.partition {|e| e.unlocked? } unlocked.each {|e| e.lock } end unlocked.each do |e| begin yield e ensure e.unlock end end end end end |
#fill(resources) ⇒ Object
Populate the pool with existing, open resources.
67 68 69 70 71 72 73 |
# File 'lib/innertube.rb', line 67 def fill(resources) @lock.synchronize do resources.each do |r| @pool << Element.new(r) end end end |
#size ⇒ Integer
Returns the number of the resources in the pool.
195 196 197 |
# File 'lib/innertube.rb', line 195 def size @lock.synchronize { @pool.size } end |
#take(opts = {}) {|resource| ... } ⇒ Object Also known as: >>
Acquire an element of the pool. Yields the object. If all elements are claimed, it will create another one.
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/innertube.rb', line 121 def take(opts = {}) raise ArgumentError, "block required" unless block_given? result = nil element = nil opts[:filter] ||= proc {|_| true } @lock.synchronize do element = @pool.find { |e| e.unlocked? && opts[:filter].call(e.object) } unless element # No objects were acceptable resource = opts[:default] || @open.call element = Element.new(resource) @pool << element end element.lock end begin result = yield element.object rescue BadResource delete_element element raise ensure # Unlock if element element.unlock @element_released.signal end end result end |