Class: Concurrent::Agent
- Inherits:
- Object
- Object
- Concurrent::Agent
- Includes:
- Concern::Deprecation, Concern::Dereferenceable, Concern::Logging, Concern::Observable
- Defined in:
- lib/concurrent/agent.rb
Overview
`Agent`s are inspired by [Clojure’s](clojure.org/) [agent](clojure.org/agents) function. An `Agent` is a single atomic value that represents an identity. The current value of the `Agent` can be requested at any time (`deref`). Each `Agent` has a work queue and operates on the global thread pool (see below). Consumers can `post` code blocks to the `Agent`. The code block (function) receives the current value of the `Agent` as its sole parameter, and the return value of the block becomes the new value of the `Agent`. `Agent`s support two error handling modes: fail and continue. A good example of an `Agent` is a shared incrementing counter, such as the score in a video game.
An `Agent` must be initialized with an initial value. This value is always accessible via the `value` (or `deref`) methods. Code blocks sent to the `Agent` are processed in the order received. As each block is processed, the current value is updated with the result from the block. This update is an atomic operation, so a `deref` will never block and will always return the current value.
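The ordering guarantee described above can be sketched with nothing but the standard library. `MiniAgent` and its `shutdown` helper are hypothetical names invented for this illustration; the class is not how `Concurrent::Agent` is implemented, only a minimal model of "one queue, blocks applied in order, value readable at any time".

```ruby
# Hypothetical stand-in for illustration only; this is not Concurrent::Agent.
class MiniAgent
  def initialize(initial)
    @value = initial
    @lock  = Mutex.new
    @queue = Queue.new
    # A single worker thread drains the queue, so blocks run one at a time
    # and in the order they were posted.
    @worker = Thread.new do
      while (block = @queue.pop)
        result = block.call(value)
        @lock.synchronize { @value = result }
      end
    end
  end

  # Reading holds the lock only for the read itself; it never waits for
  # queued blocks to finish.
  def value
    @lock.synchronize { @value }
  end

  # Enqueue a block that maps the current value to the new value.
  def post(&block)
    return nil if block.nil?
    @queue << block
    true
  end

  # Sketch-only helper: stop the worker once every queued block has run.
  def shutdown
    @queue << nil
    @worker.join
  end
end

score = MiniAgent.new(0)
10.times { score.post { |current| current + 1 } }
score.post { |current| current * 2 }
score.shutdown
final_score = score.value
```

Because the doubling block was posted last, it sees the result of all ten increments.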
When an `Agent` is created it may be given an optional `validate` block and zero or more `rescue` blocks. When a new value is calculated, it is checked against the validator, if present. If the validator returns `true` the new value is accepted; if it returns `false` the new value is rejected. If a block raises an exception during execution, the list of `rescue` blocks is searched in order until one matching the current exception is found. That `rescue` block is then called and passed the exception object. If no matching `rescue` block is found, or none were configured, the exception is suppressed.
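A minimal sketch of the accept/reject rule, assuming a hypothetical helper named `apply_update` that returns the value an agent would hold after one update attempt. It rescues `StandardError` for simplicity; that choice is an assumption of the sketch, not a statement about the library.

```ruby
# Hypothetical helper for illustration: returns the value that would be held
# after a single update attempt.
def apply_update(current, validator)
  candidate = yield(current)
  # Keep the candidate only when the validator approves it.
  validator.call(candidate) ? candidate : current
rescue StandardError
  # A block that raises leaves the current value untouched.
  current
end

non_negative = ->(v) { v >= 0 }

accepted = apply_update(10, non_negative) { |v| v - 3 }
rejected = apply_update(10, non_negative) { |v| v - 30 }
failed   = apply_update(10, non_negative) { |_v| raise ArgumentError, 'bad input' }
```

In all three cases the caller never sees an exception; a rejected or failed update simply does not change the value.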
`Agent`s also implement Ruby’s [Observable](ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html). Code that observes an `Agent` will receive a callback with the new value any time the value is changed.
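The callback shape can be illustrated with a hand-rolled observer list. `ObservedValue` and `ScoreLogger` are hypothetical names for this sketch; the `update(time, value)` signature is an assumption based on the `[time, self.value]` pair that the source listing on this page hands to `notify_observers`.

```ruby
# Hypothetical observer that records every value it is told about.
class ScoreLogger
  attr_reader :seen

  def initialize
    @seen = []
  end

  # Called once per change with the change time and the new value.
  def update(_time, value)
    @seen << value
  end
end

# Hypothetical observable value, standing in for an Agent.
class ObservedValue
  def initialize(initial)
    @value = initial
    @observers = []
  end

  def add_observer(observer)
    @observers << observer
  end

  # Store the new value, then notify every registered observer.
  def set(new_value)
    @value = new_value
    now = Time.now
    @observers.each { |observer| observer.update(now, @value) }
  end
end

logger = ScoreLogger.new
tracked = ObservedValue.new(0)
tracked.add_observer(logger)
tracked.set(1)
tracked.set(5)
```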
Instance Attribute Summary
- #fast_executor ⇒ Object (readonly)
  Returns the value of attribute fast_executor.
- #io_executor ⇒ Object (readonly)
  Returns the value of attribute io_executor.
- #timeout ⇒ Fixnum (readonly)
  The maximum number of seconds before an update is cancelled.
Instance Method Summary
- #<<(block) {|value| ... } ⇒ Object
  Update the current value with the result of the given block. The block should not make blocking calls; use #post_off for those.
- #await(timeout = nil) ⇒ Boolean
  Waits (blocks) until all the updates sent before this call are done.
- #initialize(initial, opts = {}) ⇒ Agent (constructor)
  Initialize a new Agent with the given initial value and provided options.
- #post {|value| ... } ⇒ true?
  Update the current value with the result of the given block. The block should not make blocking calls; use #post_off for those.
- #post_off(timeout = nil) {|value| ... } ⇒ true?
  Update the current value with the result of the given block. The block may make blocking calls.
- #rescue(clazz = StandardError) {|ex| ... } ⇒ Object (also: #catch, #on_error)
  Specifies a block to be called when an update raises an exception.
- #validate {|value| ... } ⇒ Object (also: #validates, #validate_with, #validates_with)
  Specifies a block to be called after every update to check whether the new value is valid.
Constructor Details
#initialize(initial, opts = {}) ⇒ Agent
Initialize a new Agent with the given initial value and provided options.
# File 'lib/concurrent/agent.rb', line 97
def initialize(initial, opts = {})
  @value = initial
  @rescuers = []
  @validator = Proc.new { |result| true }
  self.observers = Collection::CopyOnWriteObserverSet.new
  @serialized_execution = SerializedExecution.new
  @io_executor = Executor.(opts) || Concurrent.global_io_executor
  @fast_executor = Executor.(opts) || Concurrent.global_fast_executor
  init_mutex (opts)
end
Instance Attribute Details
#fast_executor ⇒ Object (readonly)
Returns the value of attribute fast_executor.
# File 'lib/concurrent/agent.rb', line 90
def fast_executor
  @fast_executor
end
#io_executor ⇒ Object (readonly)
Returns the value of attribute io_executor.
# File 'lib/concurrent/agent.rb', line 90
def io_executor
  @io_executor
end
#timeout ⇒ Fixnum (readonly)
Returns the maximum number of seconds before an update is cancelled.
# File 'lib/concurrent/agent.rb', line 84
class Agent
  include Concern::Dereferenceable
  include Concern::Observable
  include Concern::Logging
  include Concern::Deprecation

  attr_reader :timeout, :io_executor, :fast_executor

  # Initialize a new Agent with the given initial value and provided options.
  #
  # @param [Object] initial the initial value
  #
  # @!macro executor_and_deref_options
  def initialize(initial, opts = {})
    @value = initial
    @rescuers = []
    @validator = Proc.new { |result| true }
    self.observers = Collection::CopyOnWriteObserverSet.new
    @serialized_execution = SerializedExecution.new
    @io_executor = Executor.(opts) || Concurrent.global_io_executor
    @fast_executor = Executor.(opts) || Concurrent.global_fast_executor
    init_mutex (opts)
  end

  # Specifies a block fast to be performed when an update fast raises
  # an exception. Rescue blocks will be checked in order they were added. The first
  # block for which the raised exception "is-a" subclass of the given `clazz` will
  # be called. If no `clazz` is given the block will match any caught exception.
  # This behavior is intended to be identical to Ruby's `begin/rescue/end` behavior.
  # Any number of rescue handlers can be added. If no rescue handlers are added then
  # caught exceptions will be suppressed.
  #
  # @param [Exception] clazz the class of exception to catch
  # @yield the block to be called when a matching exception is caught
  # @yieldparam [StandardError] ex the caught exception
  #
  # @example
  #   score = Concurrent::Agent.new(0).
  #     rescue(NoMethodError){|ex| puts "Bam!" }.
  #     rescue(ArgumentError){|ex| puts "Pow!" }.
  #     rescue{|ex| puts "Boom!" }
  #
  #   score << proc{|current| raise ArgumentError }
  #   sleep(0.1)
  #   #=> puts "Pow!"
  def rescue(clazz = StandardError, &block)
    unless block.nil?
      mutex.synchronize do
        @rescuers << Rescuer.new(clazz, block)
      end
    end
    self
  end
  alias_method :catch, :rescue
  alias_method :on_error, :rescue

  # A block fast to be performed after every update to validate if the new
  # value is valid. If the new value is not valid then the current value is not
  # updated. If no validator is provided then all updates are considered valid.
  #
  # @yield the block to be called after every update fast to determine if
  #   the result is valid
  # @yieldparam [Object] value the result of the last update fast
  # @yieldreturn [Boolean] true if the value is valid else false
  def validate(&block)
    unless block.nil?
      begin
        mutex.lock
        @validator = block
      ensure
        mutex.unlock
      end
    end
    self
  end
  alias_method :validates, :validate
  alias_method :validate_with, :validate
  alias_method :validates_with, :validate

  # Update the current value with the result of the given block fast,
  # block should not do blocking calls, use #post_off for blocking calls
  #
  # @yield the fast to be performed with the current value in order to calculate
  #   the new value
  # @yieldparam [Object] value the current value
  # @yieldreturn [Object] the new value
  # @return [true, nil] nil when no block is given
  def post(&block)
    post_on(@fast_executor, &block)
  end

  # Update the current value with the result of the given block fast,
  # block can do blocking calls
  #
  # @param [Fixnum, nil] timeout [DEPRECATED] maximum number of seconds before an update is cancelled
  #
  # @yield the fast to be performed with the current value in order to calculate
  #   the new value
  # @yieldparam [Object] value the current value
  # @yieldreturn [Object] the new value
  # @return [true, nil] nil when no block is given
  def post_off(timeout = nil, &block)
    task = if timeout
             deprecated 'post_off with option timeout options is deprecated and will be removed'
             lambda do |value|
               future = Future.execute do
                 block.call(value)
               end
               if future.wait(timeout)
                 future.value!
               else
                 raise Concurrent::TimeoutError
               end
             end
           else
             block
           end
    post_on(@io_executor, &task)
  end

  # Update the current value with the result of the given block fast,
  # block should not do blocking calls, use #post_off for blocking calls
  #
  # @yield the fast to be performed with the current value in order to calculate
  #   the new value
  # @yieldparam [Object] value the current value
  # @yieldreturn [Object] the new value
  def <<(block)
    post(&block)
    self
  end

  # Waits/blocks until all the updates sent before this call are done.
  #
  # @param [Numeric] timeout the maximum time in second to wait.
  # @return [Boolean] false on timeout, true otherwise
  def await(timeout = nil)
    done = Event.new
    post { |val| done.set; val }
    done.wait timeout
  end

  private

  def post_on(executor, &block)
    return nil if block.nil?
    @serialized_execution.post(executor) { work(&block) }
    true
  end

  # @!visibility private
  Rescuer = Struct.new(:clazz, :block) # :nodoc:

  # @!visibility private
  def try_rescue(ex) # :nodoc:
    rescuer = mutex.synchronize do
      @rescuers.find { |r| ex.is_a?(r.clazz) }
    end
    rescuer.block.call(ex) if rescuer
  rescue Exception => ex
    # suppress
    log DEBUG, ex
  end

  # @!visibility private
  def work(&handler) # :nodoc:
    validator, value = mutex.synchronize { [@validator, @value] }

    begin
      result = handler.call(value)
      valid = validator.call(result)
    rescue Exception => ex
      exception = ex
    end

    begin
      mutex.lock
      should_notify = if !exception && valid
                        @value = result
                        true
                      end
    ensure
      mutex.unlock
    end

    if should_notify
      time = Time.now
      observers.notify_observers { [time, self.value] }
    end

    try_rescue(exception)
  end
end
Instance Method Details
#<<(block) {|value| ... } ⇒ Object
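Update the current value with the result of the given block. The block should not make blocking calls; use #post_off for those. Unlike #post, this method takes the block as a regular argument (for example a `proc`) and returns the Agent itself, so calls can be chained.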
# File 'lib/concurrent/agent.rb', line 215
def <<(block)
  post(&block)
  self
end
#await(timeout = nil) ⇒ Boolean
Waits/blocks until all the updates sent before this call are done.
# File 'lib/concurrent/agent.rb', line 224
def await(timeout = nil)
  done = Event.new
  post { |val| done.set; val }
  done.wait timeout
end
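The listing above waits by posting a sentinel block that sets an event and then waiting on that event. The same idea can be sketched with the standard library alone. `SimpleEvent` is a hypothetical stand-in for `Concurrent::Event`, and the bare queue plus worker thread stands in for the agent's serialized execution; both are assumptions of this sketch.

```ruby
# Hypothetical one-shot event, standing in for Concurrent::Event.
class SimpleEvent
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @set  = false
  end

  def set
    @lock.synchronize do
      @set = true
      @cond.broadcast
    end
  end

  # Returns true once the event is set, false if `timeout` seconds pass first.
  def wait(timeout = nil)
    deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)
    @lock.synchronize do
      until @set
        remaining = deadline && (deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC))
        return false if remaining && remaining <= 0
        @cond.wait(@lock, remaining)
      end
      true
    end
  end
end

# One worker draining one queue gives the same in-order execution as an agent.
tasks = Queue.new
worker = Thread.new do
  while (task = tasks.pop)
    task.call
  end
end

log = []
tasks << -> { sleep 0.05; log << :first }
tasks << -> { log << :second }

# Sentinel: it can only run after everything queued before it has finished.
done = SimpleEvent.new
tasks << -> { done.set }
finished = done.wait(2)

# A sentinel stuck behind a slow task makes the wait time out instead.
slow_done = SimpleEvent.new
tasks << -> { sleep 0.3 }
tasks << -> { slow_done.set }
timed_out = !slow_done.wait(0.05)

tasks << nil
worker.join
```

Note that a timed-out wait does not cancel anything; the queued work still runs to completion afterwards.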
#post {|value| ... } ⇒ true?
Update the current value with the result of the given block. The block should not make blocking calls; use #post_off for those. Returns `true`, or `nil` when no block is given.
# File 'lib/concurrent/agent.rb', line 175
def post(&block)
  post_on(@fast_executor, &block)
end
#post_off(timeout = nil) {|value| ... } ⇒ true?
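Update the current value with the result of the given block. The block may make blocking calls. The `timeout` parameter is deprecated; when given, it is the maximum number of seconds before the update is cancelled. Returns `true`, or `nil` when no block is given.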
# File 'lib/concurrent/agent.rb', line 189
def post_off(timeout = nil, &block)
  task = if timeout
           deprecated 'post_off with option timeout options is deprecated and will be removed'
           lambda do |value|
             future = Future.execute do
               block.call(value)
             end
             if future.wait(timeout)
               future.value!
             else
               raise Concurrent::TimeoutError
             end
           end
         else
           block
         end
  post_on(@io_executor, &task)
end
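The deprecated timeout branch wraps the block so that it either returns in time or raises. A rough standard-library analogue, assuming a hypothetical helper named `with_deadline` and using `Thread#join(timeout)` in place of `Future#wait`, might look like this. It raises `Timeout::Error` rather than `Concurrent::TimeoutError`, which is a substitution made for the sketch.

```ruby
require 'timeout' # used here only for the Timeout::Error class

# Hypothetical wrapper: returns a lambda that runs `block` on a helper thread
# and raises if the block has not finished within `timeout` seconds.
def with_deadline(timeout, &block)
  lambda do |value|
    runner = Thread.new { block.call(value) }
    runner.report_on_exception = false
    if runner.join(timeout)
      # Thread#value re-raises any exception raised inside the block.
      runner.value
    else
      raise Timeout::Error, "update exceeded #{timeout}s"
    end
  end
end

quick = with_deadline(1) { |v| v + 1 }
slow  = with_deadline(0.05) { |v| sleep 0.3; v + 1 }

quick_result = quick.call(41)
slow_outcome = begin
  slow.call(41)
rescue Timeout::Error
  :timed_out
end
```

As in the listing, the late block is abandoned rather than interrupted: the helper thread keeps running, and only its result is discarded.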
#rescue(clazz = StandardError) {|ex| ... } ⇒ Object Also known as: catch, on_error
Specifies a block to be called when an update raises an exception. Rescue blocks are checked in the order they were added. The first block for which the raised exception “is-a” instance of the given `clazz` (or of one of its subclasses) will be called. If no `clazz` is given, it defaults to `StandardError`, so the block matches any `StandardError`. This behavior is intended to be identical to Ruby’s `begin/rescue/end` behavior. Any number of rescue handlers can be added. If no rescue handlers are added then caught exceptions will be suppressed.
# File 'lib/concurrent/agent.rb', line 130
def rescue(clazz = StandardError, &block)
  unless block.nil?
    mutex.synchronize do
      @rescuers << Rescuer.new(clazz, block)
    end
  end
  self
end
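Because the first matching handler wins, registration order matters in the same way the order of `rescue` clauses does. The sketch below reuses the `Struct` shape and the `find`/`is_a?` lookup from the source listing on this page; `first_match` is a hypothetical helper name.

```ruby
# Same shape as the Rescuer Struct in the source listing.
Rescuer = Struct.new(:clazz, :block)

# Hypothetical helper mirroring `@rescuers.find { |r| ex.is_a?(r.clazz) }`.
def first_match(rescuers, ex)
  rescuers.find { |r| ex.is_a?(r.clazz) }
end

specific_first = [
  Rescuer.new(NoMethodError, ->(_ex) { 'Bam!' }),
  Rescuer.new(ArgumentError, ->(_ex) { 'Pow!' }),
  Rescuer.new(StandardError, ->(_ex) { 'Boom!' })
]
# Reversed order puts the catch-all StandardError handler first.
general_first = specific_first.reverse

error = ArgumentError.new('wrong number of arguments')
pow  = first_match(specific_first, error).block.call(error)
boom = first_match(general_first, error).block.call(error)

# An exception outside the StandardError hierarchy matches no handler here.
unmatched = first_match(specific_first, Exception.new('fatal'))
```

Registering the most specific classes first keeps the general handler as a fallback rather than a shadow over the others.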
#validate {|value| ... } ⇒ Object Also known as: validates, validate_with, validates_with
Specifies a block to be called after every update to check whether the new value is valid. If the new value is not valid then the current value is not updated. If no validator is provided then all updates are considered valid.
# File 'lib/concurrent/agent.rb', line 150
def validate(&block)
  unless block.nil?
    begin
      mutex.lock
      @validator = block
    ensure
      mutex.unlock
    end
  end
  self
end