Class: Roby::Promise
Overview
An extension to Concurrent::Promise that is aware of the mixed thread/event loop nature of Roby.
Use ExecutionEngine#promise to create one.
#on_success and #on_error take an in_engine argument, which decides whether the given block is executed by the underlying execution engine or in a separate thread. It is true by default. #then behaves like #on_success with in_engine: false.
This promise implementation has no graph capabilities. The execution must be a pipeline, and a whole pipeline is represented by a single Promise. State predicates such as #fulfilled? or #rejected? are valid for the whole pipeline. There is no way to handle errors for only parts of the pipeline.
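The single-pipeline model described above can be illustrated with a toy, synchronous stand-in. ToyPipeline and all of its methods are hypothetical names invented for this sketch. It uses no threads, no execution engine and no concurrent-ruby, so it is not Roby's implementation, only a picture of how one object carries the whole chain and how one error handler covers every step.

```ruby
# Toy, synchronous model of a single-pipeline promise (NOT Roby's code).
class ToyPipeline
  attr_reader :state, :value, :reason

  def initialize
    @steps = []
    @error_steps = []
    @state = :unscheduled
  end

  # Append a step; returning self is what makes the calls chainable.
  def on_success(&block)
    @steps << block
    self
  end

  # Error steps cover the whole pipeline, not one particular step.
  def on_error(&block)
    @error_steps << block
    self
  end

  # Run every step in order, feeding each result to the next step.
  def execute(initial = nil)
    @value = @steps.reduce(initial) { |acc, step| step.call(acc) }
    @state = :fulfilled
    self
  rescue StandardError => e
    @error_steps.each { |step| step.call(e) }
    @reason = e
    @state = :rejected
    self
  end

  def fulfilled?
    state == :fulfilled
  end

  def rejected?
    state == :rejected
  end
end

ok = ToyPipeline.new
  .on_success { |v| v + 1 }
  .on_success { |v| v * 2 }
  .execute(1)

seen = []
failed = ToyPipeline.new
  .on_success { |_| raise ArgumentError, "boom" }
  .on_success { |_| seen << :never_reached }
  .on_error { |e| seen << e.message }
  .execute
```

In this sketch the state predicates describe the pipeline as a whole: once the first step of `failed` raises, the second step never runs and the single error handler is called.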
Defined Under Namespace
Classes: AlreadyHasErrorHandler, Failure, NotComplete, Null, PipelineElement
Instance Attribute Summary
- #description ⇒ Object (readonly)
  A description text for debugging purposes.
- #error_pipeline ⇒ Array<PipelineElement> (readonly)
  The pipeline that will be executed if an error happens in #pipeline.
- #execution_engine ⇒ Object (readonly)
  The execution engine we execute on.
- #pipeline ⇒ Array<PipelineElement> (readonly)
  The pipeline itself.
- #promise ⇒ Object (readonly)
  The Promise object from concurrent-ruby that handles the nominal part of the execution.
Class Method Summary
- .null(value = nil) ⇒ Object
Instance Method Summary
- #add_observer {|time, result, reason| ... } ⇒ Object
  Register a block that will be called on this promise’s termination.
- #before(description: "#{self.description}.before", in_engine: true, &block) ⇒ Object
  Queue a block at the beginning of the pipeline.
- #complete? ⇒ Boolean
- #current_element ⇒ String?
  The description of the element currently being executed.
- #empty? ⇒ Boolean
  Whether this promise has no elements.
- #execute ⇒ Object
- #fail(exception = StandardError) ⇒ Object
- #fulfilled? ⇒ Boolean
- #has_error_handler? ⇒ Boolean
  Whether self already has an error handler.
- #initialize(execution_engine, executor: execution_engine.thread_pool, description: "promise", &block) ⇒ Promise (constructor)
  A new instance of Promise.
- #null? ⇒ Boolean
  Whether this is a null promise.
- #on_error(description: "#{self.description}.on_error", in_engine: true) {|reason| ... } ⇒ Object
  Schedule execution of a block if self or one of its parents failed.
- #on_success(description: "#{self.description}.on_success[#{pipeline.size}]", in_engine: true, &block) ⇒ Object
  Schedule execution of a block on the success of self.
- #pending? ⇒ Boolean
- #pretty_print(pp) ⇒ Object
- #reason ⇒ Object
  Returns the exception that caused the promise to be rejected.
- #rejected? ⇒ Boolean
- #run_one_pipeline_segment(pipeline, state, in_engine, propagate_state: true) ⇒ Object (private)
  Helper method for #run_pipeline_elements, to run a sequence of elements in a pipeline that have the same run_in_engine?.
- #run_pipeline(*state) ⇒ Object (private)
  Internal implementation of the pipeline.
- #run_pipeline_elements(pipeline, state, propagate_state: true) ⇒ Object (private)
  Run one of #pipeline or #error_pipeline.
- #state ⇒ Object
  The promise’s execution state.
- #then(description: "#{self.description}.then[#{pipeline.size}]", &block) ⇒ Object
  Shortcut for #on_success with in_engine: false, so that the block is executed in a separate thread.
- #to_s ⇒ Object
- #unscheduled? ⇒ Boolean
- #value(timeout = nil) ⇒ Object
- #value!(timeout = nil) ⇒ Object
- #wait ⇒ Object
Constructor Details
#initialize(execution_engine, executor: execution_engine.thread_pool, description: "promise", &block) ⇒ Promise
    # File 'lib/roby/promise.rb', line 35

    def initialize(execution_engine, executor: execution_engine.thread_pool, description: "promise", &block)
      @execution_engine = execution_engine
      execution_engine.waiting_work << self
      @description = description
      @pipeline = Array.new
      @error_pipeline = Array.new
      @promise = Concurrent::Promise.new(executor: executor, &method(:run_pipeline))
      @current_element = Concurrent::AtomicReference.new
      if block
        self.then(&block)
      end
    end
Instance Attribute Details
#description ⇒ Object (readonly)
A description text for debugging purposes
    # File 'lib/roby/promise.rb', line 25

    def description
      @description
    end
#error_pipeline ⇒ Array<PipelineElement> (readonly)
The pipeline that will be executed if an error happens in #pipeline
    # File 'lib/roby/promise.rb', line 33

    def error_pipeline
      @error_pipeline
    end
#execution_engine ⇒ Object (readonly)
The execution engine we execute on
    # File 'lib/roby/promise.rb', line 20

    def execution_engine
      @execution_engine
    end
#pipeline ⇒ Array<PipelineElement> (readonly)
The pipeline itself
    # File 'lib/roby/promise.rb', line 29

    def pipeline
      @pipeline
    end
#promise ⇒ Object (readonly)
The Promise object from concurrent-ruby that handles the nominal part of the execution
    # File 'lib/roby/promise.rb', line 23

    def promise
      @promise
    end
Class Method Details
.null(value = nil) ⇒ Object
    # File 'lib/roby/promise.rb', line 293

    def self.null(value = nil)
      Null.new(value)
    end
Instance Method Details
#add_observer {|time, result, reason| ... } ⇒ Object
Register a block that will be called on this promise’s termination
    # File 'lib/roby/promise.rb', line 289

    def add_observer(&block)
      promise.add_observer(&block)
    end
#before(description: "#{self.description}.before", in_engine: true, &block) ⇒ Object
Queue a block at the beginning of the pipeline
    # File 'lib/roby/promise.rb', line 184

    def before(description: "#{self.description}.before", in_engine: true, &block)
      pipeline.unshift PipelineElement.new(description, in_engine, block)
      self
    end
#complete? ⇒ Boolean
    # File 'lib/roby/promise.rb', line 236

    def complete?
      promise.complete?
    end
#current_element ⇒ String?
The description of the element currently being executed
    # File 'lib/roby/promise.rb', line 62

    def current_element
      @current_element.get
    end
#empty? ⇒ Boolean
Whether this promise has no elements
    # File 'lib/roby/promise.rb', line 55

    def empty?
      @pipeline.empty?
    end
#execute ⇒ Object
    # File 'lib/roby/promise.rb', line 223

    def execute
      promise.execute
      self
    end
#fail(exception = StandardError) ⇒ Object
    # File 'lib/roby/promise.rb', line 219

    def fail(exception = StandardError)
      promise.fail(exception)
    end
#fulfilled? ⇒ Boolean
    # File 'lib/roby/promise.rb', line 240

    def fulfilled?
      promise.fulfilled?
    end
#has_error_handler? ⇒ Boolean
Whether self already has an error handler
Unlike Concurrent::Promise, Roby::Promise objects can only have one error handler
    # File 'lib/roby/promise.rb', line 179

    def has_error_handler?
      !error_pipeline.empty?
    end
#null? ⇒ Boolean
Whether this is a null promise
    # File 'lib/roby/promise.rb', line 50

    def null?
      false
    end
#on_error(description: "#{self.description}.on_error", in_engine: true) {|reason| ... } ⇒ Object
Schedule execution of a block if self or one of its parents failed
    # File 'lib/roby/promise.rb', line 208

    def on_error(description: "#{self.description}.on_error", in_engine: true, &block)
      error_pipeline << PipelineElement.new(description, in_engine, block)
      self
    end
#on_success(description: "#{self.description}.on_success[#{pipeline.size}]", in_engine: true, &block) ⇒ Object
Schedule execution of a block on the success of self
    # File 'lib/roby/promise.rb', line 195

    def on_success(description: "#{self.description}.on_success[#{pipeline.size}]", in_engine: true, &block)
      pipeline << PipelineElement.new(description, in_engine, block)
      self
    end
#pending? ⇒ Boolean
    # File 'lib/roby/promise.rb', line 232

    def pending?
      promise.pending?
    end
#pretty_print(pp) ⇒ Object
    # File 'lib/roby/promise.rb', line 146

    def pretty_print(pp)
      description = self.description
      pp.text "Roby::Promise(#{description}"
      if current_element = self.current_element
        pp.text ", currently: #{current_element})"
      else
        pp.text ")"
      end

      pipeline.each do |element|
        pp.nest(2) do
          pp.text "."
          pp.breakable
          if element.run_in_engine
            pp.text "on_success(#{element.description})"
          else
            pp.text "then(#{element.description})"
          end
        end
      end
      error_pipeline.each do |element|
        pp.nest(2) do
          pp.text "."
          pp.breakable
          pp.text "on_error(#{element.description}, in_engine: #{element.run_in_engine})"
        end
      end
    end
#reason ⇒ Object
Returns the exception that caused the promise to be rejected
    # File 'lib/roby/promise.rb', line 271

    def reason
      if failure = promise.reason
        failure.actual_exception
      end
    end
#rejected? ⇒ Boolean
    # File 'lib/roby/promise.rb', line 244

    def rejected?
      promise.rejected?
    end
#run_one_pipeline_segment(pipeline, state, in_engine, propagate_state: true) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Helper method for #run_pipeline_elements, to run a sequence of elements in a pipeline that have the same run_in_engine?
    # File 'lib/roby/promise.rb', line 130

    def run_one_pipeline_segment(pipeline, state, in_engine, propagate_state: true)
      while (element = pipeline.first) && !(in_engine ^ element.run_in_engine)
        pipeline.shift
        @current_element.set(element.description)
        new_state = execution_engine.log_timepoint_group "#{element.description} in_engine=#{element.run_in_engine}" do
          element.callback.call(state)
        end
        state = new_state if propagate_state
      end
      state
    end
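The effect of propagate_state in the loop above can be shown in isolation. The following is a minimal sketch in plain Ruby; run_steps is a hypothetical helper written for this illustration and leaves out the engine, the logging and the in_engine check.

```ruby
# Hypothetical stand-in for the state-threading part of the loop.
def run_steps(steps, state, propagate_state: true)
  steps.each do |step|
    new_state = step.call(state)
    # Only overwrite the state when results are meant to flow onwards.
    state = new_state if propagate_state
  end
  state
end

double = ->(x) { x * 2 }
inc = ->(x) { x + 1 }

# Default: each step receives the result of the previous one.
chained = run_steps([double, inc], 5)

# propagate_state: false, every step receives the same initial state
# and the return values of the steps are dropped.
seen = []
record = ->(x) { seen << x; :ignored }
unchanged = run_steps([record, record], :err, propagate_state: false)
```

With propagation disabled, which is how #run_pipeline invokes the error pipeline, each callback is handed the same object (the exception, in that case).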
#run_pipeline(*state) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Internal implementation of the pipeline. This holds a thread until it is finished - there’s no point in giving the thread back between the steps in the pipeline, given how the promises are used in Roby (to avoid freezing due to blocking calls)
    # File 'lib/roby/promise.rb', line 75

    def run_pipeline(*state)
      Thread.current.name = "run_promises"

      execution_engine.log_timepoint_group "#{description}" do
        begin
          run_pipeline_elements(self.pipeline, state)
        rescue Exception => exception
          run_pipeline_elements(self.error_pipeline, exception, propagate_state: false)
          raise Failure.new(exception)
        end
      end
    ensure
      @current_element.set(nil)
    end
#run_pipeline_elements(pipeline, state, propagate_state: true) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Run one of #pipeline or #error_pipeline
    # File 'lib/roby/promise.rb', line 93

    def run_pipeline_elements(pipeline, state, propagate_state: true)
      pipeline = pipeline.dup
      while !pipeline.empty?
        state = run_one_pipeline_segment(pipeline, state, false, propagate_state: propagate_state)
        if !pipeline.empty?
          state = execution_engine.log_timepoint_group "#{description}:execute_in_engine" do
            execution_engine.execute(type: :propagation) do
              run_one_pipeline_segment(pipeline, state, true, propagate_state: propagate_state)
            end
          end
        end
      end
      state
    end
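The alternation between thread segments and engine segments can be sketched without Roby. Element, take_segment and split_into_segments are hypothetical names for this illustration. Instead of running callbacks, the sketch only records which consecutive elements would be grouped together and where they would run.

```ruby
# Hypothetical stand-in for a pipeline element.
Element = Struct.new(:description, :run_in_engine)

# Remove and return the leading elements whose flag matches in_engine.
def take_segment(pipeline, in_engine)
  segment = []
  # For booleans, !(a ^ b) is true exactly when a == b.
  while (element = pipeline.first) && !(in_engine ^ element.run_in_engine)
    segment << pipeline.shift
  end
  segment
end

# Alternate between a thread segment and an engine segment until the
# (copied) pipeline is exhausted.
def split_into_segments(pipeline)
  pipeline = pipeline.dup
  segments = []
  until pipeline.empty?
    threaded = take_segment(pipeline, false)
    segments << [:thread, threaded.map(&:description)] unless threaded.empty?
    in_engine = take_segment(pipeline, true)
    segments << [:engine, in_engine.map(&:description)] unless in_engine.empty?
  end
  segments
end

pipeline = [
  Element.new("a", true),
  Element.new("b", false),
  Element.new("c", false),
  Element.new("d", true)
]
segments = split_into_segments(pipeline)
```

Grouping consecutive elements with the same flag means, in this sketch, one hand-over to the engine per run of in-engine elements rather than one per element. Working on a copy leaves the original array untouched.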
#state ⇒ Object
The promise’s execution state
    # File 'lib/roby/promise.rb', line 278

    def state
      promise.state
    end
#then(description: "#{self.description}.then[#{pipeline.size}]", &block) ⇒ Object
Shortcut for #on_success with in_engine: false, so that the block is executed in a separate thread
    # File 'lib/roby/promise.rb', line 215

    def then(description: "#{self.description}.then[#{pipeline.size}]", &block)
      on_success(description: description, in_engine: false, &block)
    end
#to_s ⇒ Object
    # File 'lib/roby/promise.rb', line 142

    def to_s
      "#<Roby::Promise #{description}>"
    end
#unscheduled? ⇒ Boolean
    # File 'lib/roby/promise.rb', line 228

    def unscheduled?
      promise.unscheduled?
    end
#value(timeout = nil) ⇒ Object
    # File 'lib/roby/promise.rb', line 252

    def value(timeout = nil)
      if promise.complete?
        promise.value(timeout)
      else
        raise NotComplete, "cannot call #value on a non-complete promise"
      end
    end
#value!(timeout = nil) ⇒ Object
    # File 'lib/roby/promise.rb', line 260

    def value!(timeout = nil)
      if promise.complete?
        promise.value!(timeout)
      else
        raise NotComplete, "cannot call #value on a non-complete promise"
      end
    rescue Failure => e
      raise e.actual_exception
    end
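#run_pipeline wraps any exception in a Failure, and #value! unwraps it again before re-raising. The round trip can be sketched in plain Ruby. WrappedFailure, run_wrapped and unwrap are hypothetical names, and Roby's own Failure class may be defined differently; the only thing assumed from the listings is that it exposes actual_exception.

```ruby
# Hypothetical stand-in for the wrapper; Roby's Failure may differ.
class WrappedFailure < RuntimeError
  attr_reader :actual_exception

  def initialize(actual_exception)
    super(actual_exception.message)
    @actual_exception = actual_exception
  end
end

# Mimics the pipeline side: any exception is wrapped, then re-raised.
def run_wrapped
  yield
rescue Exception => e
  raise WrappedFailure.new(e)
end

# Mimics the caller side: the wrapper is removed before re-raising.
def unwrap
  yield
rescue WrappedFailure => e
  raise e.actual_exception
end

caught = nil
begin
  unwrap { run_wrapped { raise KeyError, "missing" } }
rescue KeyError => e
  caught = e
end
```

From the caller's point of view the original exception class and message are preserved, so a `rescue KeyError` clause around the unwrapping call still matches.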
#wait ⇒ Object
    # File 'lib/roby/promise.rb', line 248

    def wait
      promise.wait
    end