Class: Async::Pool::Controller

Inherits:
Object
  • Object
show all
Defined in:
lib/async/pool/controller.rb

Overview

A resource pool controller.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(constructor, limit: nil, concurrency: 1, policy: nil, tags: nil) ⇒ Controller

Create a new resource pool.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/async/pool/controller.rb', line 32

def initialize(constructor, limit: nil, concurrency: 1, policy: nil, tags: nil)
  @constructor = constructor
  @limit = limit
  
  # This semaphore is used to limit the number of concurrent tasks which are creating new resources.
  @guard = Async::Semaphore.new(concurrency)
  
  @policy = policy
  @gardener = nil
  
  @tags = tags
  
  # All available resources:
  @resources = {}
  
  # Resources which may be available to be acquired:
  # This list may contain false positives, or resources which were okay but have since entered a state which is unusuable.
  @available = []
  
  # Used to signal when a resource has been released:
  @mutex = Thread::Mutex.new
  @condition = Thread::ConditionVariable.new
end

Instance Attribute Details

#all allocated resources, and their associated usage.(allocatedresources) ⇒ Object (readonly)



100
# File 'lib/async/pool/controller.rb', line 100

attr :resources

#constructorObject (readonly)

Returns the value of attribute constructor.



57
58
59
# File 'lib/async/pool/controller.rb', line 57

def constructor
  @constructor
end

#limitObject

Returns the value of attribute limit.



60
61
62
# File 'lib/async/pool/controller.rb', line 60

def limit
  @limit
end

#policyObject

Returns the value of attribute policy.



97
98
99
# File 'lib/async/pool/controller.rb', line 97

def policy
  @policy
end

#resourcesObject (readonly)

Returns the value of attribute resources.



100
101
102
# File 'lib/async/pool/controller.rb', line 100

def resources
  @resources
end

#tagsObject

Returns the value of attribute tags.



103
104
105
# File 'lib/async/pool/controller.rb', line 103

def tags
  @tags
end

#The constructor used to create new resources.(constructorusedtocreatenewresources.) ⇒ Object (readonly)



57
# File 'lib/async/pool/controller.rb', line 57

attr :constructor

#The maximum number of concurrent tasks that can be creating a new resource.(maximumnumberofconcurrenttasksthatcanbecreatinganewresource.) ⇒ Object (readonly)



87
88
89
# File 'lib/async/pool/controller.rb', line 87

def concurrency
  @guard.limit
end

#The maximum number of resources that this pool can have at any given time.(maximumnumberofresourcesthatthispoolcanhaveatanygiventime.) ⇒ Object (readonly)



60
# File 'lib/async/pool/controller.rb', line 60

attr_accessor :limit

#The name of the pool.(nameofthepool.) ⇒ Object (readonly)



103
# File 'lib/async/pool/controller.rb', line 103

attr_accessor :tags

Class Method Details

.wrap(**options, &block) ⇒ Object

Create a new resource pool, using the given block to create new resources.



22
23
24
# File 'lib/async/pool/controller.rb', line 22

def self.wrap(**options, &block)
  self.new(block, **options)
end

Instance Method Details

#acquireObject

Acquire a resource from the pool. If a block is provided, the resource will be released after the block has been executed.



155
156
157
158
159
160
161
162
163
164
165
# File 'lib/async/pool/controller.rb', line 155

def acquire
  resource = wait_for_resource
  
  return resource unless block_given?
  
  begin
    yield resource
  ensure
    release(resource)
  end
end

#active?Boolean

Whether the pool has any active resources.

Returns:

  • (Boolean)


111
112
113
# File 'lib/async/pool/controller.rb', line 111

def active?
  !@resources.empty?
end

#as_jsonObject

Generate a JSON representation of the pool.



72
73
74
75
76
77
78
79
# File 'lib/async/pool/controller.rb', line 72

def as_json(...)
  {
    limit: @limit,
    concurrency: @guard.limit,
    usage: @resources.size,
    availability_summary: self.availability_summary,
  }
end

#available?Boolean

Whether there are available resources, i.e. whether #acquire can reuse an existing resource.

Returns:

  • (Boolean)


125
126
127
# File 'lib/async/pool/controller.rb', line 125

def available?
  @available.any?
end

#busy?Boolean

Whether there are resources which are currently in use.

Returns:

  • (Boolean)


116
117
118
119
120
121
122
# File 'lib/async/pool/controller.rb', line 116

def busy?
  @resources.collect do |_, usage|
    return true if usage > 0
  end
  
  return false
end

#closeObject

Drain the pool, clear all resources, and stop the gardener.



192
193
194
195
196
197
# File 'lib/async/pool/controller.rb', line 192

def close
  self.drain
  
  @available.clear
  @gardener&.stop
end

#concurrencyObject



87
88
89
# File 'lib/async/pool/controller.rb', line 87

def concurrency
  @guard.limit
end

#concurrency=(value) ⇒ Object

Set the maximum number of concurrent tasks that can be creating a new resource.



92
93
94
# File 'lib/async/pool/controller.rb', line 92

def concurrency= value
  @guard.limit = value
end

#drainObject

Drain the pool, closing all resources.



182
183
184
185
186
187
188
189
# File 'lib/async/pool/controller.rb', line 182

def drain
  Console.debug(self, "Draining pool...", size: @resources.size)
  
  # Enumerate all existing resources and retire them:
  while resource = acquire_existing_resource
    retire(resource)
  end
end

#empty?Boolean

Whether the pool is empty.

Returns:

  • (Boolean)


150
151
152
# File 'lib/async/pool/controller.rb', line 150

def empty?
  @resources.empty?
end

#prune(retain = 0) ⇒ Object

Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/async/pool/controller.rb', line 202

def prune(retain = 0)
  unused = []
  
  # This code must not context switch:
  @resources.each do |resource, usage|
    if usage.zero?
      unused << resource
    end
  end
  
  # It's okay for this to context switch:
  unused.each do |resource|
    if block_given?
      yield resource
    else
      retire(resource)
    end
    
    break if @resources.size <= retain
  end
  
  # Update availability list:
  @available.clear
  @resources.each do |resource, usage|
    if usage < resource.concurrency and resource.reusable?
      @available << resource
    end
  end
  
  return unused.size
end

#release(resource) ⇒ Object

Make the resource resources and let waiting tasks know that there is something resources.



168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/async/pool/controller.rb', line 168

def release(resource)
  processed = false
  
  # A resource that is not good should also not be reusable.
  if resource.reusable?
    processed = reuse(resource)
  end
  
  # @policy.released(self, resource)
ensure
  retire(resource) unless processed
end

#retire(resource) ⇒ Object

Retire a specific resource.



235
236
237
238
239
240
241
242
243
244
245
# File 'lib/async/pool/controller.rb', line 235

def retire(resource)
  Console.debug(self){"Retire #{resource}"}
  
  return false unless @resources.delete(resource)
  
  resource.close
  
  @mutex.synchronize{@condition.broadcast}
  
  return true
end

#sizeObject

The number of resources in the pool.



106
107
108
# File 'lib/async/pool/controller.rb', line 106

def size
  @resources.size
end

#The pool policy.=(poolpolicy. = (value)) ⇒ Object



97
# File 'lib/async/pool/controller.rb', line 97

attr_accessor :policy

#to_jsonObject

Generate a JSON representation of the pool.



82
83
84
# File 'lib/async/pool/controller.rb', line 82

def to_json(...)
  as_json.to_json(...)
end

#to_sObject

Generate a human-readable representation of the pool.



63
64
65
66
67
68
69
# File 'lib/async/pool/controller.rb', line 63

def to_s
  if @resources.empty?
    "\#<#{self.class}(#{usage_string})>"
  else
    "\#<#{self.class}(#{usage_string}) #{availability_summary.join(';')}>"
  end
end

#waitObject

Deprecated.

Use #wait_until_free instead.

Wait until a pool resource has been freed.



131
132
133
134
135
# File 'lib/async/pool/controller.rb', line 131

def wait
  @mutex.synchronize do
    @condition.wait(@mutex)
  end
end

#wait_until_freeObject

Wait until the pool is not busy (no resources in use).



138
139
140
141
142
143
144
145
146
147
# File 'lib/async/pool/controller.rb', line 138

def wait_until_free
  @mutex.synchronize do
    if busy?
      yield self if block_given?
      
      # Wait until the pool is not busy:
      @condition.wait(@mutex) while busy?
    end
  end
end