Class: Concurrent::Agent
- Inherits:
-
Object
- Object
- Concurrent::Agent
- Includes:
- Dereferenceable, UsesGlobalThreadPool
- Defined in:
- lib/concurrent/agent.rb
Overview
An agent is a single atomic value that represents an identity. The current value of the agent can be requested at any time (#deref). Each agent has a work queue and operates on the global thread pool. Consumers can #post code blocks to the agent. The code block (function) will receive the current value of the agent as its sole parameter. The return value of the block will become the new value of the agent. Agents support two error handling modes: fail and continue. A good example of an agent is a shared incrementing counter, such as the score in a video game.
Constant Summary collapse
- TIMEOUT =
The default timeout value (in seconds); used when no timeout option is given at initialization
5
Instance Attribute Summary collapse
-
#timeout ⇒ Fixnum
readonly
The maximum number of seconds before an update is cancelled.
Instance Method Summary collapse
-
#<<(block) {|value| ... } ⇒ Object
Update the current value with the result of the given block operation.
- #add_observer(observer, func = :update) ⇒ Object (also: #add_watch)
- #delete_observer(observer) ⇒ Object
-
#initialize(initial, opts = {}) ⇒ Agent
constructor
Initialize a new Agent with the given initial value and provided options.
-
#post {|value| ... } ⇒ Object
Update the current value with the result of the given block operation.
-
#rescue(clazz = StandardError) {|ex| ... } ⇒ Object
(also: #catch, #on_error)
Specifies a block operation to be performed when an update operation raises an exception.
-
#validate {|value| ... } ⇒ Object
(also: #validates, #validate_with, #validates_with)
A block operation to be performed after every update to validate if the new value is valid.
Methods included from UsesGlobalThreadPool
Methods included from Dereferenceable
#init_mutex, #mutex, #set_deref_options, #value
Constructor Details
#initialize(initial, opts = {}) ⇒ Agent
Initialize a new Agent with the given initial value and provided options.
54 55 56 57 58 59 60 61 62 |
# File 'lib/concurrent/agent.rb', line 54 def initialize(initial, opts = {}) @value = initial @rescuers = [] @validator = Proc.new { |result| true } @timeout = opts.fetch(:timeout, TIMEOUT).freeze @observers = CopyOnWriteObserverSet.new init_mutex (opts) end |
Instance Attribute Details
#timeout ⇒ Fixnum (readonly)
Returns the maximum number of seconds before an update is cancelled.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/concurrent/agent.rb', line 35 class Agent include Dereferenceable include UsesGlobalThreadPool # The default timeout value (in seconds); used when no timeout option # is given at initialization TIMEOUT = 5 attr_reader :timeout # Initialize a new Agent with the given initial value and provided options. # # @param [Object] initial the initial value # @param [Hash] opts the options used to define the behavior at update and deref # @option opts [Fixnum] :timeout (TIMEOUT) maximum number of seconds before an update is cancelled # @option opts [String] :dup_on_deref (false) call +#dup+ before returning the data # @option opts [String] :freeze_on_deref (false) call +#freeze+ before returning the data # @option opts [String] :copy_on_deref (nil) call the given +Proc+ passing the internal value and # returning the value returned from the proc def initialize(initial, opts = {}) @value = initial @rescuers = [] @validator = Proc.new { |result| true } @timeout = opts.fetch(:timeout, TIMEOUT).freeze @observers = CopyOnWriteObserverSet.new init_mutex (opts) end # Specifies a block operation to be performed when an update operation raises # an exception. Rescue blocks will be checked in order they were added. The first # block for which the raised exception "is-a" subclass of the given +clazz+ will # be called. If no +clazz+ is given the block will match any caught exception. # This behavior is intended to be identical to Ruby's +begin/rescue/end+ behavior. # Any number of rescue handlers can be added. If no rescue handlers are added then # caught exceptions will be suppressed. # # @param [Exception] clazz the class of exception to catch # @yield the block to be called when a matching exception is caught # @yieldparam [StandardError] ex the caught exception # # @example # score = Concurrent::Agent.new(0). # rescue(NoMethodError){|ex| puts "Bam!" }. # rescue(ArgumentError){|ex| puts "Pow!" }. # rescue{|ex| puts "Boom!" } # # score << proc{|current| raise ArgumentError } # sleep(0.1) # #=> puts "Pow!" def rescue(clazz = StandardError, &block) unless block.nil? mutex.synchronize do @rescuers << Rescuer.new(clazz, block) end end self end alias_method :catch, :rescue alias_method :on_error, :rescue # A block operation to be performed after every update to validate if the new # value is valid. If the new value is not valid then the current value is not # updated. If no validator is provided then all updates are considered valid. # # @yield the block to be called after every update operation to determine if # the result is valid # @yieldparam [Object] value the result of the last update operation # @yieldreturn [Boolean] true if the value is valid else false def validate(&block) @validator = block unless block.nil? self end alias_method :validates, :validate alias_method :validate_with, :validate alias_method :validates_with, :validate # Update the current value with the result of the given block operation # # @yield the operation to be performed with the current value in order to calculate # the new value # @yieldparam [Object] value the current value # @yieldreturn [Object] the new value def post(&block) Agent.thread_pool.post{ work(&block) } unless block.nil? end # Update the current value with the result of the given block operation # # @yield the operation to be performed with the current value in order to calculate # the new value # @yieldparam [Object] value the current value # @yieldreturn [Object] the new value def <<(block) self.post(&block) self end def add_observer(observer, func=:update) @observers.add_observer(observer, func) end alias_method :add_watch, :add_observer def delete_observer(observer) @observers.delete_observer(observer) end private # @!visibility private Rescuer = Struct.new(:clazz, :block) # :nodoc: # @!visibility private def try_rescue(ex) # :nodoc: rescuer = mutex.synchronize do @rescuers.find{|r| ex.is_a?(r.clazz) } end rescuer.block.call(ex) if rescuer rescue Exception => ex # supress end # @!visibility private def work(&handler) # :nodoc: begin should_notify = false mutex.synchronize do result = Concurrent::timeout(@timeout) do handler.call(@value) end if @validator.call(result) @value = result should_notify = true end end time = Time.now @observers.notify_observers{ [time, self.value] } if should_notify rescue Exception => ex try_rescue(ex) end end end |
Instance Method Details
#<<(block) {|value| ... } ⇒ Object
Update the current value with the result of the given block operation
128 129 130 131 |
# File 'lib/concurrent/agent.rb', line 128 def <<(block) self.post(&block) self end |
#add_observer(observer, func = :update) ⇒ Object Also known as: add_watch
133 134 135 |
# File 'lib/concurrent/agent.rb', line 133 def add_observer(observer, func=:update) @observers.add_observer(observer, func) end |
#delete_observer(observer) ⇒ Object
139 140 141 |
# File 'lib/concurrent/agent.rb', line 139 def delete_observer(observer) @observers.delete_observer(observer) end |
#post {|value| ... } ⇒ Object
Update the current value with the result of the given block operation
118 119 120 |
# File 'lib/concurrent/agent.rb', line 118 def post(&block) Agent.thread_pool.post{ work(&block) } unless block.nil? end |
#rescue(clazz = StandardError) {|ex| ... } ⇒ Object Also known as: catch, on_error
Specifies a block operation to be performed when an update operation raises an exception. Rescue blocks will be checked in order they were added. The first block for which the raised exception “is-a” subclass of the given clazz will be called. If no clazz is given the block will match any caught exception. This behavior is intended to be identical to Ruby’s begin/rescue/end behavior. Any number of rescue handlers can be added. If no rescue handlers are added then caught exceptions will be suppressed.
85 86 87 88 89 90 91 92 |
# File 'lib/concurrent/agent.rb', line 85 def rescue(clazz = StandardError, &block) unless block.nil? mutex.synchronize do @rescuers << Rescuer.new(clazz, block) end end self end |
#validate {|value| ... } ⇒ Object Also known as: validates, validate_with, validates_with
A block operation to be performed after every update to validate if the new value is valid. If the new value is not valid then the current value is not updated. If no validator is provided then all updates are considered valid.
104 105 106 107 |
# File 'lib/concurrent/agent.rb', line 104 def validate(&block) @validator = block unless block.nil? self end |