Class: Async::Pool::Controller
- Inherits: Object
- Defined in: lib/async/pool/controller.rb
Overview
A resource pool controller.
Instance Attribute Summary
-
#concurrency ⇒ Object
The maximum number of concurrent tasks that can be creating a new resource.
-
#constructor ⇒ Object
readonly
The constructor used to create new resources.
-
#limit ⇒ Object
The maximum number of resources that this pool can have at any given time.
-
#policy ⇒ Object
The pool policy.
-
#resources ⇒ Object
readonly
All allocated resources, and their associated usage.
-
#tags ⇒ Object
The name of the pool.
Class Method Summary
-
.wrap(**options, &block) ⇒ Object
Create a new resource pool, using the given block to create new resources.
Instance Method Summary
-
#acquire ⇒ Object
Acquire a resource from the pool.
-
#active? ⇒ Boolean
Whether the pool has any active resources.
-
#as_json ⇒ Object
Generate a JSON representation of the pool.
-
#available? ⇒ Boolean
Whether there are available resources, i.e. whether #acquire can reuse an existing resource.
-
#busy? ⇒ Boolean
Whether there are resources which are currently in use.
-
#close ⇒ Object
Drain the pool, clear all resources, and stop the gardener.
- #concurrency ⇒ Object
-
#concurrency=(value) ⇒ Object
Set the maximum number of concurrent tasks that can be creating a new resource.
-
#drain ⇒ Object
Drain the pool, closing all resources.
-
#empty? ⇒ Boolean
Whether the pool is empty.
-
#initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil) ⇒ Controller
constructor
Create a new resource pool.
-
#prune(retain = 0) ⇒ Object
Retire (and close) all unused resources.
-
#release(resource) ⇒ Object
Make the resource available and let waiting tasks know that there is something available.
-
#retire(resource) ⇒ Object
Retire a specific resource.
-
#size ⇒ Object
The number of resources in the pool.
-
#policy=(value) ⇒ Object
Set the pool policy.
-
#to_json ⇒ Object
Generate a JSON representation of the pool.
-
#to_s ⇒ Object
Generate a human-readable representation of the pool.
-
#wait ⇒ Object
deprecated
Deprecated.
Use #wait_until_free instead.
-
#wait_until_free ⇒ Object
Wait until the pool is not busy (no resources in use).
Constructor Details
#initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil) ⇒ Controller
Create a new resource pool.
# File 'lib/async/pool/controller.rb', line 31
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil)
	@constructor = constructor
	@limit = limit
	# This semaphore is used to limit the number of concurrent tasks which are creating new resources.
	@guard = Async::Semaphore.new(concurrency)
	@policy = policy
	@gardener = nil
	@tags = tags
	# All available resources:
	@resources = {}
	# Resources which may be available to be acquired:
	# This list may contain false positives, or resources which were okay but have since entered a state which is unusable.
	@available = []
	# Used to signal when a resource has been released:
	@mutex = Thread::Mutex.new
	@condition = Thread::ConditionVariable.new
end
Instance Attribute Details
#concurrency ⇒ Object
The maximum number of concurrent tasks that can be creating a new resource.
# File 'lib/async/pool/controller.rb', line 86
def concurrency
	@guard.limit
end
#constructor ⇒ Object (readonly)
The constructor used to create new resources.
# File 'lib/async/pool/controller.rb', line 56
attr :constructor
#limit ⇒ Object
The maximum number of resources that this pool can have at any given time.
# File 'lib/async/pool/controller.rb', line 59
attr_accessor :limit
#policy ⇒ Object
The pool policy.
# File 'lib/async/pool/controller.rb', line 96
attr_accessor :policy
#resources ⇒ Object (readonly)
All allocated resources, and their associated usage.
# File 'lib/async/pool/controller.rb', line 99
attr :resources
#tags ⇒ Object
The name of the pool.
# File 'lib/async/pool/controller.rb', line 102
attr_accessor :tags
Class Method Details
.wrap(**options, &block) ⇒ Object
Create a new resource pool, using the given block to create new resources.
# File 'lib/async/pool/controller.rb', line 21
def self.wrap(**, &block)
	self.new(block, **)
end
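The idea of capturing a block and handing it on as the constructor can be sketched without the library. MiniPool below is a hypothetical stand-in, not Async::Pool::Controller, and its build method is an assumption added only to show the stored block being called on demand.

```ruby
# Hypothetical stand-in for illustration; not the real Async::Pool::Controller.
class MiniPool
	attr_reader :constructor, :limit
	
	# Same shape as the documented method: the block becomes the constructor argument.
	def self.wrap(**options, &block)
		self.new(block, **options)
	end
	
	def initialize(constructor, limit: nil)
		@constructor = constructor
		@limit = limit
	end
	
	# Assumed helper: build a fresh resource by calling the stored block.
	def build
		@constructor.call
	end
end

pool = MiniPool.wrap(limit: 2) {Object.new}
first = pool.build
second = pool.build
```

Each call to build invokes the block again, so the two resources are distinct objects.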
Instance Method Details
#acquire ⇒ Object
Acquire a resource from the pool. If a block is provided, the resource will be released after the block has been executed.
# File 'lib/async/pool/controller.rb', line 154
def acquire
	resource = wait_for_resource
	return resource unless block_given?
	begin
		yield resource
	ensure
		release(resource)
	end
end
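The difference between the block and non-block forms can be illustrated with a small stand-alone model. BlockPool is hypothetical: it hands out plain objects and records releases in an array, which is an assumption made so the behaviour can be observed without the real pool.

```ruby
# Hypothetical stand-in for illustration; not the real Async::Pool::Controller.
class BlockPool
	attr_reader :released
	
	def initialize
		@released = []
	end
	
	def acquire
		resource = Object.new
		
		return resource unless block_given?
		
		begin
			yield resource
		ensure
			release(resource)
		end
	end
	
	def release(resource)
		@released << resource
	end
end

pool = BlockPool.new

# Block form: the block's value is returned and the resource is released.
result = pool.acquire {|_resource| :done}

# The resource is released even when the block raises.
begin
	pool.acquire {|_resource| raise "boom"}
rescue RuntimeError => error
	failure = error.message
end

# Non-block form: the caller is responsible for releasing the resource.
manual = pool.acquire
pool.release(manual)
```

The block form is usually the safer choice, since the ensure clause releases the resource on every exit path.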
#active? ⇒ Boolean
Whether the pool has any active resources.
# File 'lib/async/pool/controller.rb', line 110
def active?
	!@resources.empty?
end
#as_json ⇒ Object
Generate a JSON representation of the pool.
# File 'lib/async/pool/controller.rb', line 71
def as_json(...)
	{
		limit: @limit,
		concurrency: @guard.limit,
		usage: @resources.size,
		availability_summary: self.availability_summary,
	}
end
#available? ⇒ Boolean
Whether there are available resources, i.e. whether #acquire can reuse an existing resource.
# File 'lib/async/pool/controller.rb', line 124
def available?
	@available.any?
end
#busy? ⇒ Boolean
Whether there are resources which are currently in use.
# File 'lib/async/pool/controller.rb', line 115
def busy?
	@resources.collect do |_, usage|
		return true if usage > 0
	end
	return false
end
#close ⇒ Object
Drain the pool, clear all resources, and stop the gardener.
# File 'lib/async/pool/controller.rb', line 191
def close
	self.drain
	@available.clear
	@gardener&.stop
end
#concurrency ⇒ Object
# File 'lib/async/pool/controller.rb', line 86
def concurrency
	@guard.limit
end
#concurrency=(value) ⇒ Object
Set the maximum number of concurrent tasks that can be creating a new resource.
# File 'lib/async/pool/controller.rb', line 91
def concurrency= value
	@guard.limit = value
end
#drain ⇒ Object
Drain the pool, closing all resources.
# File 'lib/async/pool/controller.rb', line 181
def drain
	Console.debug(self, "Draining pool...", size: @resources.size)
	# Enumerate all existing resources and retire them:
	while resource = acquire_existing_resource
		retire(resource)
	end
end
#empty? ⇒ Boolean
Whether the pool is empty.
# File 'lib/async/pool/controller.rb', line 149
def empty?
	@resources.empty?
end
#prune(retain = 0) ⇒ Object
Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.
# File 'lib/async/pool/controller.rb', line 201
def prune(retain = 0)
	unused = []
	# This code must not context switch:
	@resources.each do |resource, usage|
		if usage.zero?
			unused << resource
		end
	end
	# It's okay for this to context switch:
	unused.each do |resource|
		if block_given?
			yield resource
		else
			retire(resource)
		end
		break if @resources.size <= retain
	end
	# Update availability list:
	@available.clear
	@resources.each do |resource, usage|
		if usage < resource.concurrency and resource.reusable?
			@available << resource
		end
	end
	return unused.size
end
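The two-phase shape of the listing (snapshot the unused resources first, then retire them until only retain remain) can be sketched on a plain Hash. prune_sketch is a hypothetical helper, and the second return value (the list of retired keys) is an assumption added here for inspection; the documented method returns only the count.

```ruby
# Hypothetical helper for illustration; `resources` maps a resource to its usage count.
def prune_sketch(resources, retain = 0)
	# Phase 1: snapshot the unused resources without mutating the hash.
	unused = resources.select {|_, usage| usage.zero?}.keys
	retired = []
	
	# Phase 2: retire them one by one, stopping once only `retain` entries remain.
	unused.each do |resource|
		resources.delete(resource)
		retired << resource
		
		break if resources.size <= retain
	end
	
	return unused.size, retired
end

resources = {a: 0, b: 1, c: 0, d: 0}
unused_count, retired = prune_sketch(resources, 2)
```

As in the listing, the returned count is the number of unused resources found, which can be larger than the number actually retired when retain stops the loop early. The size check also runs after each retirement, not before the first one.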
#release(resource) ⇒ Object
Make the resource available and let waiting tasks know that there is something available.
# File 'lib/async/pool/controller.rb', line 167
def release(resource)
	processed = false
	# A resource that is not good should also not be reusable.
	if resource.reusable?
		processed = reuse(resource)
	end
	# @policy.released(self, resource)
ensure
	retire(resource) unless processed
end
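The reuse-or-retire decision in the listing can be modelled in isolation. SketchResource and ReleaseSketch are hypothetical names; pushing onto an array stands in for the private reuse step, whose real behaviour is not shown on this page.

```ruby
# Hypothetical resource and pool for illustration only.
SketchResource = Struct.new(:name, :healthy) do
	def reusable?
		healthy
	end
end

class ReleaseSketch
	attr_reader :available, :retired
	
	def initialize
		@available = []
		@retired = []
	end
	
	def release(resource)
		processed = false
		
		# Only a reusable resource goes back on the available list.
		if resource.reusable?
			@available << resource
			processed = true
		end
	ensure
		# Anything that was not put back is retired, even if a check above raised.
		@retired << resource unless processed
	end
end

pool = ReleaseSketch.new
pool.release(SketchResource.new("good", true))
pool.release(SketchResource.new("bad", false))
```

Placing the retire step in ensure means a resource is never left in limbo: it is either back in circulation or retired.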
#retire(resource) ⇒ Object
Retire a specific resource.
# File 'lib/async/pool/controller.rb', line 234
def retire(resource)
	Console.debug(self){"Retire #{resource}"}
	@resources.delete(resource)
	resource.close
	@mutex.synchronize{@condition.broadcast}
	return true
end
#size ⇒ Object
The number of resources in the pool.
# File 'lib/async/pool/controller.rb', line 105
def size
	@resources.size
end
#policy=(value) ⇒ Object
Set the pool policy.
# File 'lib/async/pool/controller.rb', line 96
attr_accessor :policy
#to_json ⇒ Object
Generate a JSON representation of the pool.
# File 'lib/async/pool/controller.rb', line 81
def to_json(...)
	as_json.to_json(...)
end
#to_s ⇒ Object
Generate a human-readable representation of the pool.
# File 'lib/async/pool/controller.rb', line 62
def to_s
	if @resources.empty?
		"\#<#{self.class}(#{usage_string})>"
	else
		"\#<#{self.class}(#{usage_string}) #{availability_summary.join(';')}>"
	end
end
#wait ⇒ Object
Deprecated. Use #wait_until_free instead.
Wait until a pool resource has been freed.
# File 'lib/async/pool/controller.rb', line 130
def wait
	@mutex.synchronize do
		@condition.wait(@mutex)
	end
end
#wait_until_free ⇒ Object
Wait until the pool is not busy (no resources in use).
# File 'lib/async/pool/controller.rb', line 137
def wait_until_free
	@mutex.synchronize do
		if busy?
			yield self if block_given?
			# Wait until the pool is not busy:
			@condition.wait(@mutex) while busy?
		end
	end
end
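The wait loop (sleep on a condition variable and re-check busy? after every wake-up) can be tried with plain threads. BusySketch is a hypothetical tracker that counts in-use resources; running it under threads rather than an Async reactor is an assumption made to keep the sketch self-contained.

```ruby
# Hypothetical tracker for illustration; not the real Async::Pool::Controller.
class BusySketch
	def initialize
		@in_use = 0
		@mutex = Thread::Mutex.new
		@condition = Thread::ConditionVariable.new
	end
	
	def busy?
		@in_use > 0
	end
	
	def start
		@mutex.synchronize {@in_use += 1}
	end
	
	def finish
		@mutex.synchronize do
			@in_use -= 1
			@condition.broadcast
		end
	end
	
	def wait_until_free
		@mutex.synchronize do
			# Re-check after every wake-up; a broadcast does not guarantee the pool is idle.
			@condition.wait(@mutex) while busy?
		end
	end
end

tracker = BusySketch.new
tracker.start

worker = Thread.new do
	sleep 0.05
	tracker.finish
end

tracker.wait_until_free
worker.join
```

Using while instead of a single wait is what distinguishes this from the deprecated #wait, which returns after the first signal regardless of whether other resources are still in use.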