Class: Async::Pool::Controller

Inherits:
Object
  • Object
show all
Defined in:
lib/async/pool/controller.rb

Overview

A resource pool controller.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil) ⇒ Controller

Create a new resource pool.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/async/pool/controller.rb', line 31

# Create a new resource pool.
#
# @parameter constructor [Object] The constructor used to create new resources.
# @parameter limit [Integer | Nil] The maximum number of resources the pool can have; also the default for `concurrency` when given.
# @parameter concurrency [Integer] The limit of the semaphore guarding resource creation. Defaults to `limit`, or 1 when no limit is given.
# @parameter policy [Object | Nil] The pool policy.
# @parameter tags [Object | Nil] Tags stored on the pool.
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil)
  @constructor = constructor
  @limit = limit
  
  # Bounds how many tasks may be creating new resources at the same time:
  @guard = Async::Semaphore.new(concurrency)
  
  @policy = policy
  @tags = tags
  
  # The gardener is not created here; it starts out as nil:
  @gardener = nil
  
  # Every resource owned by the pool:
  @resources = {}
  
  # Candidates for acquisition. Entries may be stale: a resource that was okay when it was listed may have since become unusable.
  @available = []
  
  # Signalled when a resource has been released:
  @mutex = Thread::Mutex.new
  @condition = Thread::ConditionVariable.new
end

Instance Attribute Details

#resources ⇒ Object (readonly)

All allocated resources, and their associated usage.



99
# File 'lib/async/pool/controller.rb', line 99

# Generates a reader method `resources` for the `@resources` instance variable (no writer is defined here).
attr :resources

#constructor ⇒ Object (readonly)

Returns the value of attribute constructor.



56
57
58
# File 'lib/async/pool/controller.rb', line 56

# The constructor used to create new resources.
#
# @returns [Object] The current value of `@constructor`.
def constructor
  return @constructor
end

#limit ⇒ Object

Returns the value of attribute limit.



59
60
61
# File 'lib/async/pool/controller.rb', line 59

# The maximum number of resources that this pool can have at any given time.
#
# @returns [Object] The current value of `@limit`.
def limit
  return @limit
end

#policy ⇒ Object

Returns the value of attribute policy.



96
97
98
# File 'lib/async/pool/controller.rb', line 96

# The pool policy.
#
# @returns [Object] The current value of `@policy`.
def policy
  return @policy
end

#resources ⇒ Object (readonly)

Returns the value of attribute resources.



99
100
101
# File 'lib/async/pool/controller.rb', line 99

# All allocated resources, and their associated usage.
#
# @returns [Object] The current value of `@resources` (the same object, not a copy).
def resources
  return @resources
end

#tags ⇒ Object

Returns the value of attribute tags.



102
103
104
# File 'lib/async/pool/controller.rb', line 102

# The tags stored on the pool.
#
# @returns [Object] The current value of `@tags`.
def tags
  return @tags
end

#constructor ⇒ Object (readonly)

The constructor used to create new resources.



56
# File 'lib/async/pool/controller.rb', line 56

# Generates a reader method `constructor` for the `@constructor` instance variable (no writer is defined here).
attr :constructor

#concurrency ⇒ Object (readonly)

The maximum number of concurrent tasks that can be creating a new resource.



86
87
88
# File 'lib/async/pool/controller.rb', line 86

# The maximum number of concurrent tasks that can be creating a new resource.
#
# @returns [Object] The limit currently configured on the `@guard` semaphore.
def concurrency
  return @guard.limit
end

#limit ⇒ Object

The maximum number of resources that this pool can have at any given time.



59
# File 'lib/async/pool/controller.rb', line 59

# Generates both the reader `limit` and the writer `limit=` for the `@limit` instance variable.
attr_accessor :limit

#tags ⇒ Object

The name of the pool.



102
# File 'lib/async/pool/controller.rb', line 102

# Generates both the reader `tags` and the writer `tags=` for the `@tags` instance variable.
attr_accessor :tags

Class Method Details

.wrap(**options, &block) ⇒ Object

Create a new resource pool, using the given block to create new resources.



21
22
23
# File 'lib/async/pool/controller.rb', line 21

# Create a new resource pool, using the given block to create new resources.
#
# @parameter options [Hash] Keyword options forwarded unchanged to `new`.
# @parameter block [Proc] Passed to `new` as the constructor argument.
# @returns [Object] Whatever `new` returns.
def self.wrap(**options, &block)
  new(block, **options)
end

Instance Method Details

#acquire ⇒ Object

Acquire a resource from the pool. If a block is provided, the resource will be released after the block has been executed.



154
155
156
157
158
159
160
161
162
163
164
# File 'lib/async/pool/controller.rb', line 154

# Acquire a resource from the pool. If a block is provided, the resource will be released after the block has been executed.
#
# @yields {|resource| ...} Optional; receives the acquired resource.
# @returns [Object] The acquired resource when no block is given, otherwise the result of the block.
def acquire
  resource = wait_for_resource
  
  if block_given?
    begin
      yield resource
    ensure
      # Runs even when the block raises, so the resource always goes back through `release`:
      release(resource)
    end
  else
    resource
  end
end

#active? ⇒ Boolean

Whether the pool has any active resources.

Returns:

  • (Boolean)


110
111
112
# File 'lib/async/pool/controller.rb', line 110

# Whether the pool has any active resources.
#
# @returns [Boolean] True when `@resources` holds at least one entry.
def active?
  # With no block, `Hash#any?` is true exactly when the hash is non-empty.
  @resources.any?
end

#as_json ⇒ Object

Generate a JSON representation of the pool.



71
72
73
74
75
76
77
78
# File 'lib/async/pool/controller.rb', line 71

# Generate a JSON representation of the pool.
#
# Any arguments are accepted and ignored.
#
# @returns [Hash] The keys `:limit`, `:concurrency`, `:usage` and `:availability_summary`, in that order.
def as_json(...)
  json = {}
  
  json[:limit] = @limit
  json[:concurrency] = @guard.limit
  # The number of resources currently held by the pool:
  json[:usage] = @resources.size
  json[:availability_summary] = self.availability_summary
  
  json
end

#available? ⇒ Boolean

Whether there are available resources, i.e. whether #acquire can reuse an existing resource.

Returns:

  • (Boolean)


124
125
126
# File 'lib/async/pool/controller.rb', line 124

# Whether there are available resources, i.e. whether {#acquire} can reuse an existing resource.
#
# @returns [Boolean] True when the `@available` list contains at least one truthy entry.
def available?
  return @available.any?
end

#busy? ⇒ Boolean

Whether there are resources which are currently in use.

Returns:

  • (Boolean)


115
116
117
118
119
120
121
# File 'lib/async/pool/controller.rb', line 115

# Whether there are resources which are currently in use.
#
# @returns [Boolean] True if at least one entry in `@resources` has a usage count greater than zero.
def busy?
  # Stops at the first resource with a non-zero usage count, without building an intermediate array.
  @resources.any? { |_, usage| usage > 0 }
end

#close ⇒ Object

Drain the pool, clear all resources, and stop the gardener.



191
192
193
194
195
196
# File 'lib/async/pool/controller.rb', line 191

# Drain the pool, clear all resources, and stop the gardener.
#
# @returns [Object | Nil] The result of stopping the gardener, or nil when there is no gardener.
def close
  drain
  
  # Forget any remaining candidates:
  @available.clear
  
  # The gardener may never have been created, hence the safe navigation:
  @gardener&.stop
end

#concurrency ⇒ Object



86
87
88
# File 'lib/async/pool/controller.rb', line 86

# The maximum number of concurrent tasks that can be creating a new resource.
#
# @returns [Object] The limit currently configured on the `@guard` semaphore.
def concurrency
  return @guard.limit
end

#concurrency=(value) ⇒ Object

Set the maximum number of concurrent tasks that can be creating a new resource.



91
92
93
# File 'lib/async/pool/controller.rb', line 91

# Set the maximum number of concurrent tasks that can be creating a new resource.
#
# @parameter value [Object] The new limit, assigned to the `@guard` semaphore.
def concurrency=(value)
  @guard.limit = value
end

#drain ⇒ Object

Drain the pool, closing all resources.



181
182
183
184
185
186
187
188
# File 'lib/async/pool/controller.rb', line 181

# Drain the pool, closing all resources.
#
# Repeatedly takes an existing resource and retires it until none can be acquired.
#
# @returns [Nil]
def drain
  Console.debug(self, "Draining pool...", size: @resources.size)
  
  resource = acquire_existing_resource
  
  while resource
    retire(resource)
    resource = acquire_existing_resource
  end
end

#empty? ⇒ Boolean

Whether the pool is empty.

Returns:

  • (Boolean)


149
150
151
# File 'lib/async/pool/controller.rb', line 149

# Whether the pool is empty.
#
# @returns [Boolean] True when `@resources` holds no entries.
def empty?
  @resources.size.zero?
end

#prune(retain = 0) ⇒ Object

Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/async/pool/controller.rb', line 201

# Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.
#
# Pruning stops as soon as the pool holds `retain` resources or fewer. The check happens before each resource is handled, so a pool that is already at or below `retain` is left untouched.
#
# @parameter retain [Integer] The pool size at or below which pruning stops.
# @yields {|resource| ...} Optional; called with each unused resource instead of retiring it.
# @returns [Integer] The number of unused resources that were found, which can exceed the number actually handled when pruning stops early.
def prune(retain = 0)
  # Snapshot the idle resources first. This code must not context switch:
  unused = @resources.select { |_, usage| usage.zero? }.keys
  
  # It's okay for this to context switch:
  unused.each do |resource|
    # Never go below the retention target:
    break if @resources.size <= retain
    
    if block_given?
      yield resource
    else
      retire(resource)
    end
  end
  
  # Rebuild the availability list in place from whatever is left:
  @available.clear
  @resources.each do |resource, usage|
    @available << resource if usage < resource.concurrency && resource.reusable?
  end
  
  unused.size
end

#release(resource) ⇒ Object

Make the resource available and let waiting tasks know that there is something available.



167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/async/pool/controller.rb', line 167

# Hand a resource back to the pool.
#
# A reusable resource is passed to `reuse`. If it is not reusable, if `reuse` returns a falsy value, or if an error is raised along the way, the resource is retired instead.
#
# @parameter resource [Object] The resource being released; must respond to `reusable?`.
# @returns [Object | Nil] The result of `reuse`, or nil when the resource was not reusable.
def release(resource)
  reused = false
  
  # A resource that is not good should also not be reusable.
  reused = reuse(resource) if resource.reusable?
  
  # @policy.released(self, resource)
ensure
  # Anything that was not successfully reused must not stay in the pool:
  retire(resource) unless reused
end

#retire(resource) ⇒ Object

Retire a specific resource.



234
235
236
237
238
239
240
241
242
243
244
# File 'lib/async/pool/controller.rb', line 234

# Retire a specific resource.
#
# Removes the resource from `@resources`, closes it, then wakes every waiter on the condition variable.
#
# @parameter resource [Object] The resource to retire; must respond to `close`.
# @returns [Boolean] Always true.
def retire(resource)
  Console.debug(self) do
    "Retire #{resource}"
  end
  
  # Forget the resource before closing it:
  @resources.delete(resource)
  resource.close
  
  # Let anything blocked on the condition re-check the pool state:
  @mutex.synchronize do
    @condition.broadcast
  end
  
  true
end

#size ⇒ Object

The number of resources in the pool.



105
106
107
# File 'lib/async/pool/controller.rb', line 105

# The number of resources in the pool.
#
# @returns [Integer] The number of entries in `@resources`.
def size
  @resources.length
end

#policy=(value) ⇒ Object

The pool policy.



96
# File 'lib/async/pool/controller.rb', line 96

# Generates both the reader `policy` and the writer `policy=` for the `@policy` instance variable.
attr_accessor :policy

#to_json ⇒ Object

Generate a JSON representation of the pool.



81
82
83
# File 'lib/async/pool/controller.rb', line 81

# Generate a JSON representation of the pool.
#
# All arguments are forwarded to `to_json` on the value returned by `as_json`.
#
# @returns [Object] Whatever that `to_json` call returns.
def to_json(...)
  representation = as_json
  
  representation.to_json(...)
end

#to_s ⇒ Object

Generate a human-readable representation of the pool.



62
63
64
65
66
67
68
# File 'lib/async/pool/controller.rb', line 62

def to_s
  if @resources.empty?
    "\#<#{self.class}(#{usage_string})>"
  else
    "\#<#{self.class}(#{usage_string}) #{availability_summary.join(';')}>"
  end
end

#wait ⇒ Object

Deprecated.

Use #wait_until_free instead.

Wait until a pool resource has been freed.



130
131
132
133
134
# File 'lib/async/pool/controller.rb', line 130

# Wait until a pool resource has been freed.
#
# Blocks on the condition variable, with no timeout, until it is signalled.
#
# @deprecated Use {#wait_until_free} instead.
# @returns [Object] Whatever the condition variable's `wait` returns.
def wait
  @mutex.synchronize { @condition.wait(@mutex) }
end

#wait_until_free ⇒ Object

Wait until the pool is not busy (no resources in use).



137
138
139
140
141
142
143
144
145
146
# File 'lib/async/pool/controller.rb', line 137

# Wait until the pool is not busy (no resources in use).
#
# Returns immediately (without yielding) when the pool is not busy.
#
# @yields {|pool| ...} Optional; called once with this pool, while the mutex is held, before waiting begins.
# @returns [Nil]
def wait_until_free
  @mutex.synchronize do
    # Nothing to wait for:
    next unless busy?
    
    yield self if block_given?
    
    # Re-check after every wake-up; being signalled does not guarantee the pool is idle:
    @condition.wait(@mutex) while busy?
  end
end