Class: Concurrent::Agent

Inherits:
Object
  • Object
show all
Includes:
Dereferenceable, Logging, Observable
Defined in:
lib/concurrent/agent.rb

Overview

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#log

Methods included from Observable

#add_observer, #count_observers, #delete_observer, #delete_observers, #with_observer

Methods included from Dereferenceable

#value

Constructor Details

#initialize(initial, opts = {}) ⇒ Agent

Initialize a new Agent with the given initial value and provided options.

Parameters:

  • initial (Object)

    the initial value

  • opts (Hash) (defaults to: {})

    the options used to define the behavior at update and deref

Options Hash (opts):

  • :operation (Boolean) — default: false

    when ‘true` will execute the future on the global operation pool (for long-running operations), when `false` will execute the future on the global task pool (for short-running tasks)

  • :executor (object)

    when provided will run all operations on this executor rather than the global thread pool (overrides :operation)

  • :dup_on_deref (String) — default: false

    call ‘#dup` before returning the data

  • :freeze_on_deref (String) — default: false

    call ‘#freeze` before returning the data

  • :copy_on_deref (String) — default: nil

    call the given ‘Proc` passing the internal value and returning the value returned from the proc



37
38
39
40
41
42
43
44
45
46
47
# File 'lib/concurrent/agent.rb', line 37

def initialize(initial, opts = {})
  @value                = initial
  @rescuers             = []
  @validator            = Proc.new { |result| true }
  self.observers        = CopyOnWriteObserverSet.new
  @serialized_execution = SerializedExecution.new
  @task_executor        = OptionsParser.get_task_executor_from(opts)
  @operation_executor   = OptionsParser.get_operation_executor_from(opts)
  init_mutex
  set_deref_options(opts)
end

Instance Attribute Details

#operation_executorObject (readonly)

Returns the value of attribute operation_executor.



20
21
22
# File 'lib/concurrent/agent.rb', line 20

def operation_executor
  @operation_executor
end

#task_executorObject (readonly)

Returns the value of attribute task_executor.



20
21
22
# File 'lib/concurrent/agent.rb', line 20

def task_executor
  @task_executor
end

#timeoutFixnum (readonly)

Returns the maximum number of seconds before an update is cancelled.

Returns:

  • (Fixnum)

    the maximum number of seconds before an update is cancelled



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/concurrent/agent.rb', line 15

class Agent
  include Dereferenceable
  include Concurrent::Observable
  include Logging

  attr_reader :timeout, :task_executor, :operation_executor

  # Initialize a new Agent with the given initial value and provided options.
  #
  # @param [Object] initial the initial value
  # @param [Hash] opts the options used to define the behavior at update and deref
  #
  # @option opts [Boolean] :operation (false) when `true` will execute the future on the global
  #   operation pool (for long-running operations), when `false` will execute the future on the
  #   global task pool (for short-running tasks)
  # @option opts [object] :executor when provided will run all operations on
  #   this executor rather than the global thread pool (overrides :operation)
  #
  # @option opts [String] :dup_on_deref (false) call `#dup` before returning the data
  # @option opts [String] :freeze_on_deref (false) call `#freeze` before returning the data
  # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and
  #   returning the value returned from the proc
  def initialize(initial, opts = {})
    @value                = initial
    @rescuers             = []
    @validator            = Proc.new { |result| true }
    self.observers        = CopyOnWriteObserverSet.new
    @serialized_execution = SerializedExecution.new
    @task_executor        = OptionsParser.get_task_executor_from(opts)
    @operation_executor   = OptionsParser.get_operation_executor_from(opts)
    init_mutex
    set_deref_options(opts)
  end

  # Specifies a block operation to be performed when an update operation raises
  # an exception. Rescue blocks will be checked in order they were added. The first
  # block for which the raised exception "is-a" subclass of the given `clazz` will
  # be called. If no `clazz` is given the block will match any caught exception.
  # This behavior is intended to be identical to Ruby's `begin/rescue/end` behavior.
  # Any number of rescue handlers can be added. If no rescue handlers are added then
  # caught exceptions will be suppressed.
  #
  # @param [Exception] clazz the class of exception to catch
  # @yield the block to be called when a matching exception is caught
  # @yieldparam [StandardError] ex the caught exception
  #
  # @example
  #   score = Concurrent::Agent.new(0).
  #             rescue(NoMethodError){|ex| puts "Bam!" }.
  #             rescue(ArgumentError){|ex| puts "Pow!" }.
  #             rescue{|ex| puts "Boom!" }
  #   
  #   score << proc{|current| raise ArgumentError }
  #   sleep(0.1)
  #   #=> puts "Pow!"
  def rescue(clazz = StandardError, &block)
    unless block.nil?
      mutex.synchronize do
        @rescuers << Rescuer.new(clazz, block)
      end
    end
    self
  end
  alias_method :catch, :rescue
  alias_method :on_error, :rescue

  # A block operation to be performed after every update to validate if the new
  # value is valid. If the new value is not valid then the current value is not
  # updated. If no validator is provided then all updates are considered valid.
  #
  # @yield the block to be called after every update operation to determine if
  #   the result is valid
  # @yieldparam [Object] value the result of the last update operation
  # @yieldreturn [Boolean] true if the value is valid else false
  def validate(&block)

    unless block.nil?
      begin
        mutex.lock
        @validator = block
      ensure
        mutex.unlock
      end
    end
    self
  end
  alias_method :validates, :validate
  alias_method :validate_with, :validate
  alias_method :validates_with, :validate

  # Update the current value with the result of the given block operation,
  # block should not do blocking calls, use #post_off for blocking calls
  #
  # @yield the operation to be performed with the current value in order to calculate
  #   the new value
  # @yieldparam [Object] value the current value
  # @yieldreturn [Object] the new value
  # @return [true, nil] nil when no block is given
  def post(&block)
    post_on(@task_executor, &block)
  end

  # Update the current value with the result of the given block operation,
  # block can do blocking calls
  #
  # @param [Fixnum, nil] timeout maximum number of seconds before an update is cancelled
  #
  # @yield the operation to be performed with the current value in order to calculate
  #   the new value
  # @yieldparam [Object] value the current value
  # @yieldreturn [Object] the new value
  # @return [true, nil] nil when no block is given
  def post_off(timeout = nil, &block)
    block = if timeout
              lambda { |value| Concurrent::timeout(timeout) { block.call(value) } }
            else
              block
            end
    post_on(@operation_executor, &block)
  end

  # Update the current value with the result of the given block operation,
  # block should not do blocking calls, use #post_off for blocking calls
  #
  # @yield the operation to be performed with the current value in order to calculate
  #   the new value
  # @yieldparam [Object] value the current value
  # @yieldreturn [Object] the new value
  def <<(block)
    post(&block)
    self
  end

  # Waits/blocks until all the updates sent before this call are done.
  #
  # @param [Numeric] timeout the maximum time in second to wait.
  # @return [Boolean] false on timeout, true otherwise
  def await(timeout = nil)
    done = Event.new
    post { |val| done.set; val }
    done.wait timeout
  end

  private

  def post_on(executor, &block)
    return nil if block.nil?
    @serialized_execution.post(executor) { work(&block) }
    true
  end

  # @!visibility private
  Rescuer = Struct.new(:clazz, :block) # :nodoc:

  # @!visibility private
  def try_rescue(ex) # :nodoc:
    rescuer = mutex.synchronize do
      @rescuers.find { |r| ex.is_a?(r.clazz) }
    end
    rescuer.block.call(ex) if rescuer
  rescue Exception => ex
    # suppress
    log DEBUG, ex
  end

  # @!visibility private
  def work(&handler) # :nodoc:
    validator, value = mutex.synchronize { [@validator, @value] }

    begin
      result = handler.call(value)
      valid  = validator.call(result)
    rescue Exception => ex
      exception = ex
    end

    begin
      mutex.lock
      should_notify = if !exception && valid
                        @value = result
                        true
                      end
    ensure
      mutex.unlock
    end

    if should_notify
      time = Time.now
      observers.notify_observers { [time, self.value] }
    end

    try_rescue(exception)
  end
end

Instance Method Details

#<<(block) {|value| ... } ⇒ Object

Update the current value with the result of the given block operation, block should not do blocking calls, use #post_off for blocking calls

Yields:

  • the operation to be performed with the current value in order to calculate the new value

Yield Parameters:

  • value (Object)

    the current value

Yield Returns:

  • (Object)

    the new value



143
144
145
146
# File 'lib/concurrent/agent.rb', line 143

def <<(block)
  post(&block)
  self
end

#await(timeout = nil) ⇒ Boolean

Waits/blocks until all the updates sent before this call are done.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

Returns:

  • (Boolean)

    false on timeout, true otherwise



152
153
154
155
156
# File 'lib/concurrent/agent.rb', line 152

def await(timeout = nil)
  done = Event.new
  post { |val| done.set; val }
  done.wait timeout
end

#post {|value| ... } ⇒ true?

Update the current value with the result of the given block operation, block should not do blocking calls, use #post_off for blocking calls

Yields:

  • the operation to be performed with the current value in order to calculate the new value

Yield Parameters:

  • value (Object)

    the current value

Yield Returns:

  • (Object)

    the new value

Returns:

  • (true, nil)

    nil when no block is given



113
114
115
# File 'lib/concurrent/agent.rb', line 113

def post(&block)
  post_on(@task_executor, &block)
end

#post_off(timeout = nil) {|value| ... } ⇒ true?

Update the current value with the result of the given block operation, block can do blocking calls

Parameters:

  • timeout (Fixnum, nil) (defaults to: nil)

    maximum number of seconds before an update is cancelled

Yields:

  • the operation to be performed with the current value in order to calculate the new value

Yield Parameters:

  • value (Object)

    the current value

Yield Returns:

  • (Object)

    the new value

Returns:

  • (true, nil)

    nil when no block is given



127
128
129
130
131
132
133
134
# File 'lib/concurrent/agent.rb', line 127

def post_off(timeout = nil, &block)
  block = if timeout
            lambda { |value| Concurrent::timeout(timeout) { block.call(value) } }
          else
            block
          end
  post_on(@operation_executor, &block)
end

#rescue(clazz = StandardError) {|ex| ... } ⇒ Object Also known as: catch, on_error

Specifies a block operation to be performed when an update operation raises an exception. Rescue blocks will be checked in order they were added. The first block for which the raised exception “is-a” subclass of the given ‘clazz` will be called. If no `clazz` is given the block will match any caught exception. This behavior is intended to be identical to Ruby’s ‘begin/rescue/end` behavior. Any number of rescue handlers can be added. If no rescue handlers are added then caught exceptions will be suppressed.

Examples:

score = Concurrent::Agent.new(0).
          rescue(NoMethodError){|ex| puts "Bam!" }.
          rescue(ArgumentError){|ex| puts "Pow!" }.
          rescue{|ex| puts "Boom!" }

score << proc{|current| raise ArgumentError }
sleep(0.1)
#=> puts "Pow!"

Parameters:

  • clazz (Exception) (defaults to: StandardError)

    the class of exception to catch

Yields:

  • the block to be called when a matching exception is caught

Yield Parameters:

  • ex (StandardError)

    the caught exception



70
71
72
73
74
75
76
77
# File 'lib/concurrent/agent.rb', line 70

def rescue(clazz = StandardError, &block)
  unless block.nil?
    mutex.synchronize do
      @rescuers << Rescuer.new(clazz, block)
    end
  end
  self
end

#validate {|value| ... } ⇒ Object Also known as: validates, validate_with, validates_with

A block operation to be performed after every update to validate if the new value is valid. If the new value is not valid then the current value is not updated. If no validator is provided then all updates are considered valid.

Yields:

  • the block to be called after every update operation to determine if the result is valid

Yield Parameters:

  • value (Object)

    the result of the last update operation

Yield Returns:

  • (Boolean)

    true if the value is valid else false



89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/concurrent/agent.rb', line 89

def validate(&block)

  unless block.nil?
    begin
      mutex.lock
      @validator = block
    ensure
      mutex.unlock
    end
  end
  self
end