Class: Concurrent::Agent

Inherits:
Object
  • Object
Includes:
Concern::Deprecation, Concern::Dereferenceable, Concern::Logging, Concern::Observable
Defined in:
lib/concurrent/agent.rb

Overview

`Agent`s are inspired by [Clojure’s](clojure.org/) [agent](clojure.org/agents) function. An `Agent` is a single atomic value that represents an identity. The current value of the `Agent` can be requested at any time (`deref`). Each `Agent` has a work queue and operates on the global thread pool (see below). Consumers can `post` code blocks to the `Agent`. The code block (function) will receive the current value of the `Agent` as its sole parameter. The return value of the block will become the new value of the `Agent`. `Agent`s support two error handling modes: fail and continue. A good example of an `Agent` is a shared incrementing counter, such as the score in a video game.

An `Agent` must be initialized with an initial value. This value is always accessible via the `value` (or `deref`) methods. Code blocks sent to the `Agent` will be processed in the order received. As each block is processed, the current value is updated with the result from the block. This update is an atomic operation, so a `deref` will never block and will always return the current value.
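The ordering guarantee described above can be pictured with a small stdlib-only sketch. `MiniAgent` is a hypothetical name used for illustration; it is not part of concurrent-ruby, it uses one dedicated thread instead of the library's executors, and it omits validation and error handling.

```ruby
# Stdlib-only sketch; MiniAgent is a hypothetical stand-in, not the library class.
class MiniAgent
  def initialize(initial)
    @value = initial
    @lock  = Mutex.new
    @queue = Queue.new
    # A single worker drains the queue, so blocks run one at a time, in order.
    @worker = Thread.new do
      while (task = @queue.pop)
        result = task.call(value)
        @lock.synchronize { @value = result }
      end
    end
  end

  # Reading holds the lock only briefly; it never waits for queued work.
  def value
    @lock.synchronize { @value }
  end

  # Queue a block that maps the current value to the next value.
  def post(&block)
    @queue << block
    self
  end

  # Stop the worker once everything already queued has run.
  def shutdown
    @queue << nil
    @worker.join
    self
  end
end

agent = MiniAgent.new(10)
agent.post { |v| v + 100 }.post { |v| v * 2 }.post { |v| v - 50 }
agent.shutdown
puts agent.value # prints 170
```

Because each block receives the result of the previous one, the three updates produce 110, 220 and 170 in that order.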

When an `Agent` is created it may be given an optional `validate` block and zero or more `rescue` blocks. When a new value is calculated, it is checked against the validator, if present. If the validator returns `true` the new value is accepted; if it returns `false` the new value is rejected and the current value is kept. If a block raises an exception during execution, the list of `rescue` blocks is searched in order until one matching the current exception is found. That `rescue` block is then called and passed the exception object. If no matching `rescue` block is found, or none were configured, the exception is suppressed.

`Agent`s also implement Ruby’s [Observable](ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html). Code that observes an `Agent` will receive a callback with the new value any time the value is changed.
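As a rough illustration of the callback shape, the sketch below shows an observer receiving `update(time, value)` calls. `ThresholdObserver` and `notify_all` are hypothetical names standing in for the notifying side; they do not use the library's observer set.

```ruby
# Hypothetical observer: any object that responds to update(time, value) will do.
class ThresholdObserver
  attr_reader :hits

  def initialize(threshold)
    @threshold = threshold
    @hits = []
  end

  # Called with the notification time and the new value.
  def update(_time, value)
    @hits << value if value >= @threshold
  end
end

# Hypothetical stand-in for the notifying side: one call per accepted update.
def notify_all(observers, new_value)
  time = Time.now
  observers.each { |observer| observer.update(time, new_value) }
end

observer = ThresholdObserver.new(100)
[30, 60, 90, 120].each { |v| notify_all([observer], v) }
p observer.hits # prints [120]
```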

Examples:

Simple Example


require 'concurrent'

score = Concurrent::Agent.new(10)
score.value #=> 10

score << proc{|current| current + 100 }
sleep(0.1)
score.value #=> 110

score << proc{|current| current * 2 }
sleep(0.1)
score.value #=> 220

score << proc{|current| current - 50 }
sleep(0.1)
score.value #=> 170

With Validation and Error Handling


score = Concurrent::Agent.new(0).validate{|value| value <= 1024 }.
          rescue(NoMethodError){|ex| puts "Bam!" }.
          rescue(ArgumentError){|ex| puts "Pow!" }.
          rescue{|ex| puts "Boom!" }
score.value #=> 0

score << proc{|current| current + 2048 }
sleep(0.1)
score.value #=> 0

score << proc{|current| raise ArgumentError }
sleep(0.1)
#=> prints "Pow!"
score.value #=> 0

score << proc{|current| current + 100 }
sleep(0.1)
score.value #=> 100

With Observation


bingo = Class.new{
  def update(time, score)
    puts "Bingo! [score: #{score}, time: #{time}]" if score >= 100
  end
}.new

score = Concurrent::Agent.new(0)
score.add_observer(bingo)

score << proc{|current| sleep(0.1); current + 30 }
score << proc{|current| sleep(0.1); current + 30 }
score << proc{|current| sleep(0.1); current + 30 }
score << proc{|current| sleep(0.1); current + 30 }

sleep(1)
#=> Bingo! [score: 120, time: 2013-07-22 21:26:08 -0400]


Constructor Details

#initialize(initial, opts = {}) ⇒ Agent

Initialize a new Agent with the given initial value and provided options.

Parameters:

  • initial (Object)

    the initial value



# File 'lib/concurrent/agent.rb', line 97

def initialize(initial, opts = {})
  @value                = initial
  @rescuers             = []
  @validator            = Proc.new { |result| true }
  self.observers        = Collection::CopyOnWriteObserverSet.new
  @serialized_execution = SerializedExecution.new
  @io_executor          = Executor.executor_from_options(opts) || Concurrent.global_io_executor
  @fast_executor        = Executor.executor_from_options(opts) || Concurrent.global_fast_executor
  init_mutex
  set_deref_options(opts)
end

Instance Attribute Details

#fast_executor ⇒ Object (readonly)

Returns the value of attribute fast_executor.



# File 'lib/concurrent/agent.rb', line 90

def fast_executor
  @fast_executor
end

#io_executor ⇒ Object (readonly)

Returns the value of attribute io_executor.



# File 'lib/concurrent/agent.rb', line 90

def io_executor
  @io_executor
end

#timeout ⇒ Fixnum (readonly)

Returns the maximum number of seconds before an update is cancelled.

Returns:

  • (Fixnum)

    the maximum number of seconds before an update is cancelled



# File 'lib/concurrent/agent.rb', line 84

class Agent
  include Concern::Dereferenceable
  include Concern::Observable
  include Concern::Logging
  include Concern::Deprecation

  attr_reader :timeout, :io_executor, :fast_executor

  # Initialize a new Agent with the given initial value and provided options.
  #
  # @param [Object] initial the initial value
  #
  # @!macro executor_and_deref_options
  def initialize(initial, opts = {})
    @value                = initial
    @rescuers             = []
    @validator            = Proc.new { |result| true }
    self.observers        = Collection::CopyOnWriteObserverSet.new
    @serialized_execution = SerializedExecution.new
    @io_executor          = Executor.executor_from_options(opts) || Concurrent.global_io_executor
    @fast_executor        = Executor.executor_from_options(opts) || Concurrent.global_fast_executor
    init_mutex
    set_deref_options(opts)
  end

  # Specifies a block operation to be performed when an update operation raises
  # an exception. Rescue blocks will be checked in the order they were added. The first
  # block for which the raised exception "is-a" subclass of the given `clazz` will
  # be called. If no `clazz` is given the block will match any caught exception.
  # This behavior is intended to be identical to Ruby's `begin/rescue/end` behavior.
  # Any number of rescue handlers can be added. If no rescue handlers are added then
  # caught exceptions will be suppressed.
  #
  # @param [Exception] clazz the class of exception to catch
  # @yield the block to be called when a matching exception is caught
  # @yieldparam [StandardError] ex the caught exception
  #
  # @example
  #   score = Concurrent::Agent.new(0).
  #             rescue(NoMethodError){|ex| puts "Bam!" }.
  #             rescue(ArgumentError){|ex| puts "Pow!" }.
  #             rescue{|ex| puts "Boom!" }
  #
  #   score << proc{|current| raise ArgumentError }
  #   sleep(0.1)
  #   #=> puts "Pow!"
  def rescue(clazz = StandardError, &block)
    unless block.nil?
      mutex.synchronize do
        @rescuers << Rescuer.new(clazz, block)
      end
    end
    self
  end

  alias_method :catch, :rescue
  alias_method :on_error, :rescue

  # A block operation to be performed after every update to validate if the new
  # value is valid. If the new value is not valid then the current value is not
  # updated. If no validator is provided then all updates are considered valid.
  #
  # @yield the block to be called after every update operation to determine if
  #   the result is valid
  # @yieldparam [Object] value the result of the last update operation
  # @yieldreturn [Boolean] true if the value is valid else false
  def validate(&block)

    unless block.nil?
      begin
        mutex.lock
        @validator = block
      ensure
        mutex.unlock
      end
    end
    self
  end

  alias_method :validates, :validate
  alias_method :validate_with, :validate
  alias_method :validates_with, :validate

  # Update the current value with the result of the given block operation.
  # The block should not make blocking calls; use #post_off for blocking calls.
  #
  # @yield the operation to be performed with the current value in order to calculate
  #   the new value
  # @yieldparam [Object] value the current value
  # @yieldreturn [Object] the new value
  # @return [true, nil] nil when no block is given
  def post(&block)
    post_on(@fast_executor, &block)
  end

  # Update the current value with the result of the given block operation.
  # The block may make blocking calls.
  #
  # @param [Fixnum, nil] timeout [DEPRECATED] maximum number of seconds before an update is cancelled
  #
  # @yield the operation to be performed with the current value in order to calculate
  #   the new value
  # @yieldparam [Object] value the current value
  # @yieldreturn [Object] the new value
  # @return [true, nil] nil when no block is given
  def post_off(timeout = nil, &block)
    task = if timeout
             deprecated 'post_off with option timeout options is deprecated and will be removed'
             lambda do |value|
               future = Future.execute do
                 block.call(value)
               end
               if future.wait(timeout)
                 future.value!
               else
                 raise Concurrent::TimeoutError
               end
             end
           else
             block
           end
    post_on(@io_executor, &task)
  end

  # Update the current value with the result of the given block operation.
  # The block should not make blocking calls; use #post_off for blocking calls.
  #
  # @yield the operation to be performed with the current value in order to calculate
  #   the new value
  # @yieldparam [Object] value the current value
  # @yieldreturn [Object] the new value
  def <<(block)
    post(&block)
    self
  end

  # Waits/blocks until all the updates sent before this call are done.
  #
  # @param [Numeric] timeout the maximum time in seconds to wait.
  # @return [Boolean] false on timeout, true otherwise
  def await(timeout = nil)
    done = Event.new
    post { |val| done.set; val }
    done.wait timeout
  end

  private

  def post_on(executor, &block)
    return nil if block.nil?
    @serialized_execution.post(executor) { work(&block) }
    true
  end

  # @!visibility private
  Rescuer = Struct.new(:clazz, :block) # :nodoc:

  # @!visibility private
  def try_rescue(ex) # :nodoc:
    rescuer = mutex.synchronize do
      @rescuers.find { |r| ex.is_a?(r.clazz) }
    end
    rescuer.block.call(ex) if rescuer
  rescue Exception => ex
    # suppress
    log DEBUG, ex
  end

  # @!visibility private
  def work(&handler) # :nodoc:
    validator, value = mutex.synchronize { [@validator, @value] }

    begin
      result = handler.call(value)
      valid  = validator.call(result)
    rescue Exception => ex
      exception = ex
    end

    begin
      mutex.lock
      should_notify = if !exception && valid
                        @value = result
                        true
                      end
    ensure
      mutex.unlock
    end

    if should_notify
      time = Time.now
      observers.notify_observers { [time, self.value] }
    end

    try_rescue(exception)
  end
end

Instance Method Details

#<<(block) {|value| ... } ⇒ Object

Update the current value with the result of the given block operation. The block should not make blocking calls; use #post_off for blocking calls.

Yields:

  • the operation to be performed with the current value in order to calculate the new value

Yield Parameters:

  • value (Object)

    the current value

Yield Returns:

  • (Object)

    the new value



# File 'lib/concurrent/agent.rb', line 215

def <<(block)
  post(&block)
  self
end

#await(timeout = nil) ⇒ Boolean

Waits/blocks until all the updates sent before this call are done.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

Returns:

  • (Boolean)

    false on timeout, true otherwise



# File 'lib/concurrent/agent.rb', line 224

def await(timeout = nil)
  done = Event.new
  post { |val| done.set; val }
  done.wait timeout
end
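The method above works by posting a marker task behind all earlier updates and waiting for it to run. The same idea can be sketched with the standard library alone. `SerialQueue` is a hypothetical name; a `Mutex` and `ConditionVariable` stand in for the library's `Event`, and a single thread stands in for the executor.

```ruby
# Stdlib-only sketch of the marker-task idea; SerialQueue is a hypothetical name.
class SerialQueue
  def initialize
    @queue = Queue.new
    # One worker runs tasks strictly in the order they were posted.
    @worker = Thread.new do
      while (task = @queue.pop)
        task.call
      end
    end
  end

  def post(&block)
    @queue << block
    self
  end

  # Enqueue a marker behind all earlier tasks and wait until it has run.
  # Returns true if the marker ran in time, false on timeout.
  def await(timeout = nil)
    lock = Mutex.new
    cond = ConditionVariable.new
    done = false
    post { lock.synchronize { done = true; cond.broadcast } }
    deadline = timeout && Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    lock.synchronize do
      until done
        remaining = deadline && deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false if remaining && remaining <= 0
        cond.wait(lock, remaining)
      end
    end
    true
  end
end

log = []
queue = SerialQueue.new
queue.post { sleep(0.05); log << :first }
queue.post { log << :second }
finished = queue.await(1) # waits for both tasks above, for up to one second
p log if finished
```

Since the marker sits behind every task posted earlier, a `true` result implies those tasks have already run; tasks posted afterwards are not covered.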

#post {|value| ... } ⇒ true?

Update the current value with the result of the given block operation. The block should not make blocking calls; use #post_off for blocking calls.

Yields:

  • the operation to be performed with the current value in order to calculate the new value

Yield Parameters:

  • value (Object)

    the current value

Yield Returns:

  • (Object)

    the new value

Returns:

  • (true, nil)

    nil when no block is given



# File 'lib/concurrent/agent.rb', line 175

def post(&block)
  post_on(@fast_executor, &block)
end

#post_off(timeout = nil) {|value| ... } ⇒ true?

Update the current value with the result of the given block operation. The block may make blocking calls.

Parameters:

  • timeout (Fixnum, nil) (defaults to: nil)
    DEPRECATED

    maximum number of seconds before an update is cancelled

Yields:

  • the operation to be performed with the current value in order to calculate the new value

Yield Parameters:

  • value (Object)

    the current value

Yield Returns:

  • (Object)

    the new value

Returns:

  • (true, nil)

    nil when no block is given



# File 'lib/concurrent/agent.rb', line 189

def post_off(timeout = nil, &block)
  task = if timeout
           deprecated 'post_off with option timeout options is deprecated and will be removed'
           lambda do |value|
             future = Future.execute do
               block.call(value)
             end
             if future.wait(timeout)
               future.value!
             else
               raise Concurrent::TimeoutError
             end
           end
         else
           block
         end
  post_on(@io_executor, &task)
end
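The deprecated timeout branch above runs the block elsewhere, waits a bounded time, and raises if the result is not ready. A stdlib-only analogue of that pattern is sketched below. `call_with_timeout` and `UpdateTimeout` are hypothetical names, and a plain `Thread` replaces the library's `Future`. In this sketch the worker thread is not stopped when the wait times out.

```ruby
# Hypothetical error class used only by this sketch.
class UpdateTimeout < StandardError; end

# Run the block with `value` on a separate thread and wait at most `timeout` seconds.
def call_with_timeout(value, timeout, &block)
  worker = Thread.new do
    Thread.current.report_on_exception = false # errors surface via join/value instead
    block.call(value)
  end
  # Thread#join returns nil if the thread is still running after `timeout` seconds,
  # and re-raises in the caller if the block raised.
  raise UpdateTimeout, "update exceeded #{timeout}s" unless worker.join(timeout)
  worker.value
end

puts call_with_timeout(20, 1) { |v| v * 2 } # prints 40

begin
  call_with_timeout(20, 0.05) { |v| sleep(1); v }
rescue UpdateTimeout => e
  puts "timed out: #{e.message}"
end
```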

#rescue(clazz = StandardError) {|ex| ... } ⇒ Object Also known as: catch, on_error

Specifies a block operation to be performed when an update operation raises an exception. Rescue blocks will be checked in the order they were added. The first block for which the raised exception “is-a” subclass of the given `clazz` will be called. If no `clazz` is given, it defaults to `StandardError`, so the block will match any `StandardError`. This behavior is intended to be identical to Ruby’s `begin/rescue/end` behavior. Any number of rescue handlers can be added. If no rescue handlers are added then caught exceptions will be suppressed.

Examples:

score = Concurrent::Agent.new(0).
          rescue(NoMethodError){|ex| puts "Bam!" }.
          rescue(ArgumentError){|ex| puts "Pow!" }.
          rescue{|ex| puts "Boom!" }

score << proc{|current| raise ArgumentError }
sleep(0.1)
#=> prints "Pow!"

Parameters:

  • clazz (Exception) (defaults to: StandardError)

    the class of exception to catch

Yields:

  • the block to be called when a matching exception is caught

Yield Parameters:

  • ex (StandardError)

    the caught exception



# File 'lib/concurrent/agent.rb', line 130

def rescue(clazz = StandardError, &block)
  unless block.nil?
    mutex.synchronize do
      @rescuers << Rescuer.new(clazz, block)
    end
  end
  self
end
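Handler selection follows the first-match rule described above: handlers are scanned in the order they were added, and the first one whose class matches via `is_a?` wins. The sketch below illustrates why ordering matters. `SketchRescuer` and `dispatch` are hypothetical names that mirror the `Rescuer` struct and lookup shown in the class source.

```ruby
# Hypothetical mirror of the Rescuer struct from the class source.
SketchRescuer = Struct.new(:clazz, :block)

# Return the result of the first handler whose class matches, or nil if none does.
def dispatch(rescuers, ex)
  rescuer = rescuers.find { |r| ex.is_a?(r.clazz) }
  rescuer ? rescuer.block.call(ex) : nil
end

HANDLERS = [
  SketchRescuer.new(NoMethodError, ->(_ex) { "Bam!" }),
  SketchRescuer.new(ArgumentError, ->(_ex) { "Pow!" }),
  SketchRescuer.new(StandardError, ->(_ex) { "Boom!" })
].freeze

puts dispatch(HANDLERS, ArgumentError.new)         # prints Pow!
puts dispatch(HANDLERS, KeyError.new)              # prints Boom!
puts dispatch(HANDLERS.reverse, ArgumentError.new) # prints Boom!
```

With the catch-all `StandardError` handler listed first, it shadows the more specific ones, so specific handlers should be registered before general ones.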

#validate {|value| ... } ⇒ Object Also known as: validates, validate_with, validates_with

A block operation to be performed after every update to check whether the new value is valid. If the new value is not valid then the current value is not updated. If no validator is provided then all updates are considered valid.

Yields:

  • the block to be called after every update operation to determine if the result is valid

Yield Parameters:

  • value (Object)

    the result of the last update operation

Yield Returns:

  • (Boolean)

    true if the value is valid else false



# File 'lib/concurrent/agent.rb', line 150

def validate(&block)

  unless block.nil?
    begin
      mutex.lock
      @validator = block
    ensure
      mutex.unlock
    end
  end
  self
end
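The effect of a validator on a single update can be reduced to a pure function, sketched below. `next_value` and `ACCEPT_ALL` are hypothetical names; the default mirrors the accept-everything validator set in the constructor, and the sketch leaves out locking, observers and error handling.

```ruby
# Hypothetical default validator: accepts every result, like the constructor's default.
ACCEPT_ALL = proc { |_result| true }

# Compute the candidate value and keep the current value if validation fails.
def next_value(current, update, validator = ACCEPT_ALL)
  candidate = update.call(current)
  validator.call(candidate) ? candidate : current
end

cap = proc { |value| value <= 1024 }

puts next_value(0, proc { |v| v + 2048 }, cap) # prints 0 (rejected, value unchanged)
puts next_value(0, proc { |v| v + 100 }, cap)  # prints 100
puts next_value(0, proc { |v| v + 2048 })      # prints 2048 (no validator given)
```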