Class: Concurrent::Agent

Inherits:
Object
  • Object
show all
Includes:
Dereferenceable, UsesGlobalThreadPool
Defined in:
lib/concurrent/agent.rb

Overview

An agent is a single atomic value that represents an identity. The current value of the agent can be requested at any time (#deref). Each agent has a work queue and operates on the global thread pool. Consumers can #post code blocks to the agent. The code block (function) will receive the current value of the agent as its sole parameter. The return value of the block will become the new value of the agent. Agents support two error handling modes: fail and continue. A good example of an agent is a shared incrementing counter, such as the score in a video game.

Examples:

Basic usage

score = Concurrent::Agent.new(10)
score.value #=> 10

score << proc{|current| current + 100 }
sleep(0.1)
score.value #=> 110

score << proc{|current| current * 2 }
sleep(0.1)
score.value #=> 220

score << proc{|current| current - 50 }
sleep(0.1)
score.value #=> 170

Constant Summary collapse

TIMEOUT =

The default timeout value (in seconds); used when no timeout option is given at initialization

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from UsesGlobalThreadPool

included

Methods included from Dereferenceable

#init_mutex, #mutex, #set_deref_options, #value

Constructor Details

#initialize(initial, opts = {}) ⇒ Agent

Initialize a new Agent with the given initial value and provided options.

Parameters:

  • the initial value

  • (defaults to: {})

    the options used to define the behavior at update and deref

Options Hash (opts):

  • :timeout (Fixnum) — default: TIMEOUT

    maximum number of seconds before an update is cancelled

  • :dup_on_deref (String) — default: false

    call #dup before returning the data

  • :freeze_on_deref (String) — default: false

    call #freeze before returning the data

  • :copy_on_deref (String) — default: nil

    call the given Proc passing the internal value and returning the value returned from the proc



54
55
56
57
58
59
60
61
62
# File 'lib/concurrent/agent.rb', line 54

def initialize(initial, opts = {})
  @value = initial
  @rescuers = []
  @validator = Proc.new { |result| true }
  @timeout = opts.fetch(:timeout, TIMEOUT).freeze
  @observers = CopyOnWriteObserverSet.new
  init_mutex
  set_deref_options(opts)
end

Instance Attribute Details

#timeoutFixnum (readonly)

Returns the maximum number of seconds before an update is cancelled.

Returns:

  • the maximum number of seconds before an update is cancelled



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/concurrent/agent.rb', line 35

class Agent
  include Dereferenceable
  include UsesGlobalThreadPool

  # The default timeout value (in seconds); used when no timeout option
  # is given at initialization
  TIMEOUT = 5

  attr_reader :timeout

  # Initialize a new Agent with the given initial value and provided options.
  #
  # @param [Object] initial the initial value
  # @param [Hash] opts the options used to define the behavior at update and deref
  # @option opts [Fixnum] :timeout (TIMEOUT) maximum number of seconds before an update is cancelled
  # @option opts [String] :dup_on_deref (false) call +#dup+ before returning the data
  # @option opts [String] :freeze_on_deref (false) call +#freeze+ before returning the data
  # @option opts [String] :copy_on_deref (nil) call the given +Proc+ passing the internal value and
  #   returning the value returned from the proc
  def initialize(initial, opts = {})
    @value = initial
    @rescuers = []
    @validator = Proc.new { |result| true }
    @timeout = opts.fetch(:timeout, TIMEOUT).freeze
    @observers = CopyOnWriteObserverSet.new
    init_mutex
    set_deref_options(opts)
  end

  # Specifies a block operation to be performed when an update operation raises
  # an exception. Rescue blocks will be checked in order they were added. The first
  # block for which the raised exception "is-a" subclass of the given +clazz+ will
  # be called. If no +clazz+ is given the block will match any caught exception.
  # This behavior is intended to be identical to Ruby's +begin/rescue/end+ behavior.
  # Any number of rescue handlers can be added. If no rescue handlers are added then
  # caught exceptions will be suppressed.
  #
  # @param [Exception] clazz the class of exception to catch
  # @yield the block to be called when a matching exception is caught
  # @yieldparam [StandardError] ex the caught exception
  #
  # @example
  #   score = Concurrent::Agent.new(0).
  #             rescue(NoMethodError){|ex| puts "Bam!" }.
  #             rescue(ArgumentError){|ex| puts "Pow!" }.
  #             rescue{|ex| puts "Boom!" }
  #   
  #   score << proc{|current| raise ArgumentError }
  #   sleep(0.1)
  #   #=> puts "Pow!"
  def rescue(clazz = StandardError, &block)
    unless block.nil?
      mutex.synchronize do
        @rescuers << Rescuer.new(clazz, block)
      end
    end
    self
  end
  alias_method :catch, :rescue
  alias_method :on_error, :rescue

  # A block operation to be performed after every update to validate if the new
  # value is valid. If the new value is not valid then the current value is not
  # updated. If no validator is provided then all updates are considered valid.
  #
  # @yield the block to be called after every update operation to determine if
  #   the result is valid
  # @yieldparam [Object] value the result of the last update operation
  # @yieldreturn [Boolean] true if the value is valid else false
  def validate(&block)
    @validator = block unless block.nil?
    self
  end
  alias_method :validates, :validate
  alias_method :validate_with, :validate
  alias_method :validates_with, :validate

  # Update the current value with the result of the given block operation
  #
  # @yield the operation to be performed with the current value in order to calculate
  #   the new value
  # @yieldparam [Object] value the current value
  # @yieldreturn [Object] the new value
  def post(&block)
    Agent.thread_pool.post{ work(&block) } unless block.nil?
  end

  # Update the current value with the result of the given block operation
  #
  # @yield the operation to be performed with the current value in order to calculate
  #   the new value
  # @yieldparam [Object] value the current value
  # @yieldreturn [Object] the new value
  def <<(block)
    self.post(&block)
    self
  end

  def add_observer(observer, func=:update)
    @observers.add_observer(observer, func)
  end

  alias_method :add_watch, :add_observer

  def delete_observer(observer)
    @observers.delete_observer(observer)
  end

  private

  # @!visibility private
  Rescuer = Struct.new(:clazz, :block) # :nodoc:

  # @!visibility private
  def try_rescue(ex) # :nodoc:
    rescuer = mutex.synchronize do
      @rescuers.find{|r| ex.is_a?(r.clazz) }
    end
    rescuer.block.call(ex) if rescuer
  rescue Exception => ex
    # supress
  end

  # @!visibility private
  def work(&handler) # :nodoc:
    begin

      should_notify = false

      mutex.synchronize do
        result = Concurrent::timeout(@timeout) do
          handler.call(@value)
        end
        if @validator.call(result)
          @value = result
          should_notify = true
        end
      end
      time = Time.now
      @observers.notify_observers{ [time, self.value] } if should_notify
    rescue Exception => ex
      try_rescue(ex)
    end
  end
end

Instance Method Details

#<<(block) {|value| ... } ⇒ Object

Update the current value with the result of the given block operation

Yields:

  • the operation to be performed with the current value in order to calculate the new value

Yield Parameters:

  • value (Object)

    the current value

Yield Returns:

  • (Object)

    the new value



128
129
130
131
# File 'lib/concurrent/agent.rb', line 128

def <<(block)
  self.post(&block)
  self
end

#add_observer(observer, func = :update) ⇒ Object Also known as: add_watch



133
134
135
# File 'lib/concurrent/agent.rb', line 133

def add_observer(observer, func=:update)
  @observers.add_observer(observer, func)
end

#delete_observer(observer) ⇒ Object



139
140
141
# File 'lib/concurrent/agent.rb', line 139

def delete_observer(observer)
  @observers.delete_observer(observer)
end

#post {|value| ... } ⇒ Object

Update the current value with the result of the given block operation

Yields:

  • the operation to be performed with the current value in order to calculate the new value

Yield Parameters:

  • value (Object)

    the current value

Yield Returns:

  • (Object)

    the new value



118
119
120
# File 'lib/concurrent/agent.rb', line 118

def post(&block)
  Agent.thread_pool.post{ work(&block) } unless block.nil?
end

#rescue(clazz = StandardError) {|ex| ... } ⇒ Object Also known as: catch, on_error

Specifies a block operation to be performed when an update operation raises an exception. Rescue blocks will be checked in order they were added. The first block for which the raised exception “is-a” subclass of the given clazz will be called. If no clazz is given the block will match any caught exception. This behavior is intended to be identical to Ruby’s begin/rescue/end behavior. Any number of rescue handlers can be added. If no rescue handlers are added then caught exceptions will be suppressed.

Examples:

score = Concurrent::Agent.new(0).
          rescue(NoMethodError){|ex| puts "Bam!" }.
          rescue(ArgumentError){|ex| puts "Pow!" }.
          rescue{|ex| puts "Boom!" }

score << proc{|current| raise ArgumentError }
sleep(0.1)
#=> puts "Pow!"

Parameters:

  • (defaults to: StandardError)

    the class of exception to catch

Yields:

  • the block to be called when a matching exception is caught

Yield Parameters:

  • ex (StandardError)

    the caught exception



85
86
87
88
89
90
91
92
# File 'lib/concurrent/agent.rb', line 85

def rescue(clazz = StandardError, &block)
  unless block.nil?
    mutex.synchronize do
      @rescuers << Rescuer.new(clazz, block)
    end
  end
  self
end

#validate {|value| ... } ⇒ Object Also known as: validates, validate_with, validates_with

A block operation to be performed after every update to validate if the new value is valid. If the new value is not valid then the current value is not updated. If no validator is provided then all updates are considered valid.

Yields:

  • the block to be called after every update operation to determine if the result is valid

Yield Parameters:

  • value (Object)

    the result of the last update operation

Yield Returns:

  • (Boolean)

    true if the value is valid else false



104
105
106
107
# File 'lib/concurrent/agent.rb', line 104

def validate(&block)
  @validator = block unless block.nil?
  self
end