Class: Roby::Promise
Overview
An extension to Concurrent::Promise that is aware of the mixed thread/event loop nature of Roby.
Use ExecutionEngine#promise to create one.
#on_success and #rescue gain an in_engine argument, which decides whether the given block should be executed by the underlying execution engine or not. It is true by default. Note that #then is not overridden.
This promise implementation has no graph capabilities. The execution must be a pipeline, and a whole pipeline is represented by a single Promise. State predicates such as #fulfilled? or #rejected? are valid for the whole pipeline. There is no way to handle errors for only parts of the pipeline.
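The single-pipeline model described above can be sketched with a small stand-in class. `ToyPromise` and `Element` are hypothetical names used only for illustration: the sketch runs everything synchronously in the calling thread and merely records the in_engine flag, so it is not how Roby schedules work. It only mirrors how the chaining methods append to one shared pipeline and return self, and how the state applies to the pipeline as a whole.

```ruby
# Hypothetical stand-ins for illustration only; not part of Roby.
Element = Struct.new(:description, :run_in_engine, :callback)

class ToyPromise
  attr_reader :pipeline, :error_pipeline, :state, :value, :reason

  def initialize
    @pipeline = []
    @error_pipeline = []
    @state = :unscheduled
  end

  # Steps are flagged to run in the engine by default
  def on_success(description: "on_success", in_engine: true, &block)
    @pipeline << Element.new(description, in_engine, block)
    self
  end

  # Same as on_success, but flagged to run outside the engine
  def then(description: "then", &block)
    on_success(description: description, in_engine: false, &block)
  end

  def on_error(description: "on_error", in_engine: true, &block)
    @error_pipeline << Element.new(description, in_engine, block)
    self
  end

  # Runs the whole pipeline synchronously; the state covers all of it
  def execute
    @value = @pipeline.inject(nil) { |state, el| el.callback.call(state) }
    @state = :fulfilled
    self
  rescue StandardError => e
    @error_pipeline.each { |el| el.callback.call(e) }
    @reason = e
    @state = :rejected
    self
  end
end

seen = []
ok = ToyPromise.new.then { 20 }.on_success { |v| v + 1 }.then { |v| v * 2 }.execute
ko = ToyPromise.new.then { raise ArgumentError, "boom" }
               .on_success { :never_reached }
               .on_error { |e| seen << e.message }
               .execute
```

In this sketch a failure in any step rejects the whole object, which is the practical meaning of having no per-step error handling.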
Defined Under Namespace
Classes: AlreadyHasErrorHandler, Failure, NotComplete, Null, PipelineElement
Instance Attribute Summary
- #description ⇒ Object (readonly)
  A description text for debugging purposes.
- #error_handling_failure ⇒ Exception? (readonly)
  The exception raised by the error handlers during processing, if any.
- #error_pipeline ⇒ Array<PipelineElement> (readonly)
  The pipeline that will be executed if an error happens in #pipeline.
- #execution_engine ⇒ Object (readonly)
  The execution engine we execute on.
- #pipeline ⇒ Array<PipelineElement> (readonly)
  The pipeline itself.
- #promise ⇒ Object (readonly)
  The Promise object from concurrent-ruby that handles the nominal part of the execution.
Instance Method Summary
- #add_observer {|time, result, reason| ... } ⇒ Object
  Register a block that will be called on this promise’s termination.
- #before(description: "#{self.description}.before", in_engine: true, &block) ⇒ Object
  Queue a block at the beginning of the pipeline.
- #complete? ⇒ Boolean
- #current_element ⇒ String?
  The description of the element currently being executed.
- #empty? ⇒ Boolean
  Whether this promise’s pipeline has no elements.
- #execute ⇒ Object
- #fail(exception = StandardError) ⇒ Object
- #fulfilled? ⇒ Boolean
- #handled_error? ⇒ Boolean
  Whether the promise’s failure was successfully handled by error handlers.
- #has_error_handler? ⇒ Boolean
  Whether self already has an error handler.
- #initialize(execution_engine, executor: execution_engine.thread_pool, description: "promise", &block) ⇒ Promise (constructor)
  A new instance of Promise.
- #null? ⇒ Boolean
  Whether this is a null promise.
- #on_error(description: "#{self.description}.on_error", in_engine: true) {|reason| ... } ⇒ Object
  Schedule execution of a block if self or one of its parents failed.
- #on_success(description: "#{self.description}.on_success[#{pipeline.size}]", in_engine: true, &block) ⇒ Object
  Schedule execution of a block on the success of self.
- #pending? ⇒ Boolean
- #pretty_print(pp) ⇒ Object
- #reason ⇒ Object
  Returns the exception that caused the promise to be rejected.
- #rejected? ⇒ Boolean
- #run_one_pipeline_segment(pipeline, state, in_engine, propagate_state: true) ⇒ Object (private)
  Helper method for #run_pipeline_elements, to run a sequence of elements in a pipeline that have the same run_in_engine?.
- #run_pipeline(*state) ⇒ Object (private)
  Internal implementation of the pipeline.
- #run_pipeline_elements(pipeline, state, propagate_state: true) ⇒ Object (private)
  Run one of #pipeline or #error_pipeline.
- #state ⇒ Object
  The promise’s execution state.
- #then(description: "#{self.description}.then[#{pipeline.size}]", &block) ⇒ Object
  Alias for #on_success, but defaulting to execution in a separate thread.
- #to_s ⇒ Object
- #unscheduled? ⇒ Boolean
- #value(timeout = nil) ⇒ Object
- #value!(timeout = nil) ⇒ Object
- #wait ⇒ Object
Constructor Details
#initialize(execution_engine, executor: execution_engine.thread_pool, description: "promise", &block) ⇒ Promise
Returns a new instance of Promise.
    # File 'lib/roby/promise.rb', line 37
    def initialize(
      execution_engine,
      executor: execution_engine.thread_pool, description: "promise", &block
    )
      @execution_engine = execution_engine
      @description = description
      @pipeline = []
      @error_pipeline = []
      @promise = Concurrent::Promise.new(executor: executor, &method(:run_pipeline))
      @current_element = Concurrent::AtomicReference.new
      @error_handling_failure = nil
      self.then(&block) if block
    end
Instance Attribute Details
#description ⇒ Object (readonly)
A description text for debugging purposes
    # File 'lib/roby/promise.rb', line 27
    def description
      @description
    end
#error_handling_failure ⇒ Exception? (readonly)
The exception raised by the error handlers during processing, if any
    # File 'lib/roby/promise.rb', line 216
    def error_handling_failure
      @error_handling_failure
    end
#error_pipeline ⇒ Array<PipelineElement> (readonly)
The pipeline that will be executed if an error happens in #pipeline
    # File 'lib/roby/promise.rb', line 35
    def error_pipeline
      @error_pipeline
    end
#execution_engine ⇒ Object (readonly)
The execution engine we execute on
    # File 'lib/roby/promise.rb', line 22
    def execution_engine
      @execution_engine
    end
#pipeline ⇒ Array<PipelineElement> (readonly)
The pipeline itself
    # File 'lib/roby/promise.rb', line 31
    def pipeline
      @pipeline
    end
#promise ⇒ Object (readonly)
The Promise object from concurrent-ruby that handles the nominal part of the execution
    # File 'lib/roby/promise.rb', line 25
    def promise
      @promise
    end
Instance Method Details
#add_observer {|time, result, reason| ... } ⇒ Object
Register a block that will be called on this promise’s termination
    # File 'lib/roby/promise.rb', line 323
    def add_observer(&block)
      promise.add_observer(&block)
    end
#before(description: "#{self.description}.before", in_engine: true, &block) ⇒ Object
Queue a block at the beginning of the pipeline
    # File 'lib/roby/promise.rb', line 219
    def before(description: "#{self.description}.before", in_engine: true, &block)
      pipeline.unshift PipelineElement.new(description, in_engine, block)
      self
    end
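Because #before uses `unshift` while #on_success appends with `<<`, a step queued with #before runs first regardless of when it was added. The following sketch shows that ordering on a plain array; `Element` is a hypothetical stand-in for PipelineElement and the steps are run synchronously for illustration.

```ruby
# Hypothetical stand-in for PipelineElement; illustration only.
Element = Struct.new(:description, :run_in_engine, :callback)

pipeline = []
pipeline << Element.new("first", true, ->(v) { v + 1 })    # like #on_success
pipeline << Element.new("second", true, ->(v) { v * 10 })  # like #on_success
pipeline.unshift Element.new("setup", true, ->(_) { 4 })   # like #before

# Execution order follows array order, so "setup" runs first
order = pipeline.map(&:description)
result = pipeline.inject(nil) { |v, el| el.callback.call(v) }
```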
#complete? ⇒ Boolean
    # File 'lib/roby/promise.rb', line 275
    def complete?
      promise.complete?
    end
#current_element ⇒ String?
The description of the element currently being executed
    # File 'lib/roby/promise.rb', line 65
    def current_element
      @current_element.get
    end
#empty? ⇒ Boolean
Whether this promise’s pipeline has no elements
    # File 'lib/roby/promise.rb', line 58
    def empty?
      @pipeline.empty?
    end
#execute ⇒ Object
    # File 'lib/roby/promise.rb', line 261
    def execute
      execution_engine.waiting_work << self
      promise.execute
      self
    end
#fail(exception = StandardError) ⇒ Object
    # File 'lib/roby/promise.rb', line 257
    def fail(exception = StandardError)
      promise.fail(exception)
    end
#fulfilled? ⇒ Boolean
    # File 'lib/roby/promise.rb', line 279
    def fulfilled?
      promise.fulfilled?
    end
#handled_error? ⇒ Boolean
Whether the promise’s failure was successfully handled by error handlers
This only makes sense when #rejected? returns true
This will return true if (1) there are error handlers and (2) executing the handlers did not raise errors
    # File 'lib/roby/promise.rb', line 209
    def handled_error?
      has_error_handler? && !@error_handling_failure
    end
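The two conditions can be checked in isolation. The helper below is a hypothetical free function that mirrors the boolean expression above, taking the error pipeline and the recorded handler failure as plain arguments rather than reading them from a promise.

```ruby
# Hypothetical helper mirroring the logic of #handled_error?; not Roby API.
def handled_error?(error_pipeline, error_handling_failure)
  !error_pipeline.empty? && !error_handling_failure
end

handler = ->(_reason) {}

# No handler registered: the error cannot have been handled
no_handler = handled_error?([], nil)
# A handler exists and raised nothing
handler_ok = handled_error?([handler], nil)
# A handler exists but itself raised
handler_raised = handled_error?([handler], RuntimeError.new("handler failed"))
```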
#has_error_handler? ⇒ Boolean
Whether self already has an error handler
Unlike Concurrent::Promise, Roby::Promise objects can only have one error handler
    # File 'lib/roby/promise.rb', line 199
    def has_error_handler?
      !error_pipeline.empty?
    end
#null? ⇒ Boolean
Whether this is a null promise
    # File 'lib/roby/promise.rb', line 53
    def null?
      false
    end
#on_error(description: "#{self.description}.on_error", in_engine: true) {|reason| ... } ⇒ Object
Schedule execution of a block if self or one of its parents failed
    # File 'lib/roby/promise.rb', line 246
    def on_error(description: "#{self.description}.on_error", in_engine: true, &block)
      error_pipeline << PipelineElement.new(description, in_engine, block)
      self
    end
#on_success(description: "#{self.description}.on_success[#{pipeline.size}]", in_engine: true, &block) ⇒ Object
Schedule execution of a block on the success of self
    # File 'lib/roby/promise.rb', line 230
    def on_success(
      description: "#{self.description}.on_success[#{pipeline.size}]",
      in_engine: true, &block
    )
      pipeline << PipelineElement.new(description, in_engine, block)
      self
    end
#pending? ⇒ Boolean
    # File 'lib/roby/promise.rb', line 271
    def pending?
      promise.pending?
    end
#pretty_print(pp) ⇒ Object
    # File 'lib/roby/promise.rb', line 165
    def pretty_print(pp)
      description = self.description
      pp.text "Roby::Promise(#{description}"
      if (current_element = self.current_element)
        pp.text ", currently: #{current_element})"
      else
        pp.text ")"
      end

      pipeline.each do |element|
        pp.nest(2) do
          pp.text "."
          pp.breakable
          if element.run_in_engine
            pp.text "on_success(#{element.description})"
          else
            pp.text "then(#{element.description})"
          end
        end
      end
      error_pipeline.each do |element|
        pp.nest(2) do
          pp.text "."
          pp.breakable
          pp.text "on_error(#{element.description}, "\
                  "in_engine: #{element.run_in_engine})"
        end
      end
    end
#reason ⇒ Object
Returns the exception that caused the promise to be rejected
    # File 'lib/roby/promise.rb', line 306
    def reason
      failure = promise.reason
      failure&.actual_exception
    end
#rejected? ⇒ Boolean
    # File 'lib/roby/promise.rb', line 283
    def rejected?
      promise.rejected?
    end
#run_one_pipeline_segment(pipeline, state, in_engine, propagate_state: true) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Helper method for #run_pipeline_elements, to run a sequence of elements in a pipeline that have the same run_in_engine?
    # File 'lib/roby/promise.rb', line 147
    def run_one_pipeline_segment(pipeline, state, in_engine, propagate_state: true)
      while (element = pipeline.first) && !(in_engine ^ element.run_in_engine)
        pipeline.shift
        @current_element.set(element.description)
        new_state = execution_engine.log_timepoint_group(
          "#{element.description} in_engine=#{element.run_in_engine}"
        ) do
          element.callback.call(state)
        end
        state = new_state if propagate_state
      end
      state
    end
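The loop condition `!(in_engine ^ element.run_in_engine)` is true exactly when both flags are equal, so the method consumes the leading run of elements that share the requested flag and stops at the first one that differs. A reduced sketch, with logging and the current-element bookkeeping left out and with hypothetical names `Element` and `run_segment`:

```ruby
# Hypothetical stand-in for PipelineElement; illustration only.
Element = Struct.new(:description, :run_in_engine, :callback)

# Consumes leading elements whose flag equals in_engine, threading the
# state through each callback. Mutates the pipeline array it is given.
def run_segment(pipeline, state, in_engine)
  while (element = pipeline.first) && !(in_engine ^ element.run_in_engine)
    pipeline.shift
    state = element.callback.call(state)
  end
  state
end

pipeline = [
  Element.new("a", false, ->(s) { s + ["a"] }),
  Element.new("b", false, ->(s) { s + ["b"] }),
  Element.new("c", true,  ->(s) { s + ["c"] }),
  Element.new("d", false, ->(s) { s + ["d"] })
]

# Only the leading thread-side elements ("a", "b") are consumed
state = run_segment(pipeline, [], false)
remaining = pipeline.map(&:description)
```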
#run_pipeline(*state) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Internal implementation of the pipeline. This holds a thread until it is finished - there’s no point in giving the thread back between the steps in the pipeline, given how the promises are used in Roby (to avoid freezing due to blocking calls)
    # File 'lib/roby/promise.rb', line 78
    def run_pipeline(*state)
      Thread.current.name = "run_promises"

      execution_engine.log_timepoint_group description.to_s do
        begin
          run_pipeline_elements(pipeline, state)
        rescue ::Exception => error # rubocop:disable Naming/RescuedExceptionsVariableName
          begin
            run_pipeline_elements(error_pipeline, error, propagate_state: false)
          rescue ::Exception => error_handling_failure # rubocop:disable Naming/RescuedExceptionsVariableName
            @error_handling_failure = error_handling_failure
          end
          raise Failure.new(error)
        end
      end
    ensure
      @current_element.set(nil)
    end
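The error path has three effects: every error handler receives the original exception, an exception raised by a handler is recorded rather than propagated, and the original exception is re-raised wrapped in a Failure. The sketch below reproduces that flow synchronously. `ToyRunner` is hypothetical, the `Failure` class here is a simplified stand-in whose constructor is an assumption, and the sketch rescues StandardError where the method above rescues ::Exception.

```ruby
# Simplified stand-in for Roby::Promise::Failure (constructor assumed).
class Failure < StandardError
  attr_reader :actual_exception

  def initialize(actual_exception)
    super(actual_exception.message)
    @actual_exception = actual_exception
  end
end

# Hypothetical runner; pipelines are plain arrays of lambdas.
class ToyRunner
  attr_reader :error_handling_failure

  def initialize(pipeline, error_pipeline)
    @pipeline = pipeline
    @error_pipeline = error_pipeline
    @error_handling_failure = nil
  end

  def run(state = nil)
    @pipeline.inject(state) { |s, step| step.call(s) }
  rescue StandardError => error
    begin
      # Each handler gets the original error; return values are ignored
      @error_pipeline.each { |handler| handler.call(error) }
    rescue StandardError => e
      @error_handling_failure = e
    end
    raise Failure.new(error)
  end
end

seen = []
runner = ToyRunner.new(
  [->(_) { 1 }, ->(_) { raise ArgumentError, "step failed" }],
  [->(e) { seen << e.class }, ->(_) { raise "handler failed" }]
)
outcome =
  begin
    runner.run
  rescue Failure => e
    e
  end
```

Note that the handler failure never replaces the original error: the caller still sees the ArgumentError through the Failure, and the handler's own exception is only available from `error_handling_failure`.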
#run_pipeline_elements(pipeline, state, propagate_state: true) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Run one of #pipeline or #error_pipeline
    # File 'lib/roby/promise.rb', line 101
    def run_pipeline_elements(pipeline, state, propagate_state: true)
      pipeline = pipeline.dup
      until pipeline.empty?
        state = run_one_pipeline_segment(
          pipeline, state, false, propagate_state: propagate_state
        )
        break if pipeline.empty?

        state = execution_engine.log_timepoint_group(
          "#{description}:execute_in_engine"
        ) do
          execution_engine.execute(type: :propagation) do
            run_one_pipeline_segment(
              pipeline, state, true, propagate_state: propagate_state
            )
          end
        end
      end
      state
    end
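The method alternates between a thread-side segment and an engine-side segment until the copied pipeline is empty, so consecutive in_engine elements are batched into a single engine call. A reduced sketch of that alternation follows. `ToyEngine` is a hypothetical stand-in that runs the block inline and counts calls; the real engine hands the block over to the event loop.

```ruby
# Hypothetical stand-ins for illustration only; not part of Roby.
Element = Struct.new(:description, :run_in_engine, :callback)

class ToyEngine
  attr_reader :batches

  def initialize
    @batches = 0
  end

  # Runs the block inline and counts how many batches were requested
  def execute
    @batches += 1
    yield
  end
end

def run_segment(pipeline, state, in_engine)
  while (element = pipeline.first) && !(in_engine ^ element.run_in_engine)
    pipeline.shift
    state = element.callback.call(state)
  end
  state
end

def run_elements(engine, pipeline, state)
  pipeline = pipeline.dup # the caller's pipeline is left untouched
  until pipeline.empty?
    state = run_segment(pipeline, state, false)
    break if pipeline.empty?

    state = engine.execute { run_segment(pipeline, state, true) }
  end
  state
end

step = ->(name) { ->(log) { log + [name] } }
elements = [
  Element.new("t1", false, step.call("t1")),
  Element.new("e1", true,  step.call("e1")),
  Element.new("e2", true,  step.call("e2")),
  Element.new("t2", false, step.call("t2")),
  Element.new("e3", true,  step.call("e3"))
]
engine = ToyEngine.new
log = run_elements(engine, elements, [])
```

Here "e1" and "e2" share one engine call and "e3" gets a second one, so three engine-side steps cost two engine batches.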
#state ⇒ Object
The promise’s execution state
    # File 'lib/roby/promise.rb', line 312
    def state
      promise.state
    end
#then(description: "#{self.description}.then[#{pipeline.size}]", &block) ⇒ Object
Alias for #on_success, but defaulting to execution in a separate thread
    # File 'lib/roby/promise.rb', line 253
    def then(description: "#{self.description}.then[#{pipeline.size}]", &block)
      on_success(description: description, in_engine: false, &block)
    end
#to_s ⇒ Object
    # File 'lib/roby/promise.rb', line 161
    def to_s
      "#<Roby::Promise #{description}>"
    end
#unscheduled? ⇒ Boolean
    # File 'lib/roby/promise.rb', line 267
    def unscheduled?
      promise.unscheduled?
    end
#value(timeout = nil) ⇒ Object
    # File 'lib/roby/promise.rb', line 291
    def value(timeout = nil)
      return promise.value(timeout) if promise.complete?

      raise NotComplete, "cannot call #value on a non-complete promise"
    end
#value!(timeout = nil) ⇒ Object
    # File 'lib/roby/promise.rb', line 297
    def value!(timeout = nil)
      return promise.value!(timeout) if promise.complete?

      raise NotComplete, "cannot call #value on a non-complete promise"
    rescue Failure => e
      raise e.actual_exception
    end
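Two behaviours are visible in the method above: it raises NotComplete instead of blocking when the promise has not finished, and it unwraps a Failure so that callers see the original exception. The sketch below isolates both. `ToyInner`, `strict_value` and `outcome_of` are hypothetical, and `Failure` and `NotComplete` are simplified stand-ins for the Roby classes of the same name.

```ruby
# Simplified stand-ins for the Roby classes; constructors are assumed.
class NotComplete < RuntimeError; end

class Failure < StandardError
  attr_reader :actual_exception

  def initialize(actual_exception)
    super(actual_exception.message)
    @actual_exception = actual_exception
  end
end

# Hypothetical inner promise: either pending, fulfilled or rejected
ToyInner = Struct.new(:complete, :result, :failure) do
  def value!
    raise failure if failure

    result
  end
end

def strict_value(inner)
  raise NotComplete, "promise is not complete" unless inner.complete

  inner.value!
rescue Failure => e
  # Expose the original exception rather than the wrapper
  raise e.actual_exception
end

# Returns the value, or the class of whatever exception was raised
def outcome_of(inner)
  strict_value(inner)
rescue StandardError => e
  e.class
end

pending   = outcome_of(ToyInner.new(false, nil, nil))
fulfilled = outcome_of(ToyInner.new(true, 42, nil))
rejected  = outcome_of(ToyInner.new(true, nil, Failure.new(KeyError.new("missing"))))
```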
#wait ⇒ Object
    # File 'lib/roby/promise.rb', line 287
    def wait
      promise.wait
    end