Class: Roby::Promise

Inherits:
Object show all
Defined in:
lib/roby/promise.rb

Overview

An extension to Concurrent::Promise that is aware of the mixed thread/event loop nature of Roby

Use ExecutionEngine#promise to create one

#on_success and #rescue gain an in_engine argument, which decides whether the given block should be executed by the underlying execution engine’s or not. It is true by default. Note that #then is not overriden

This promise implementation has no graph capabilities. The execution must be a pipeline, and a whole pipeline is represented by a single Promise. State predicates such as #fulfilled? or #rejected? are valid for the whole pipeline. There is no way to handle errors for only parts of the pipeline.

Defined Under Namespace

Classes: AlreadyHasErrorHandler, Failure, NotComplete, Null, PipelineElement

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(execution_engine, executor: execution_engine.thread_pool, description: "promise", &block) ⇒ Promise

Returns a new instance of Promise.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/roby/promise.rb', line 37

def initialize(
    execution_engine,
    executor: execution_engine.thread_pool, description: "promise", &block
)
    @execution_engine = execution_engine
    @description = description

    @pipeline = []
    @error_pipeline = []
    @promise = Concurrent::Promise.new(executor: executor, &method(:run_pipeline))
    @current_element = Concurrent::AtomicReference.new
    @error_handling_failure = nil
    self.then(&block) if block
end

Instance Attribute Details

#descriptionObject (readonly)

A description text for debugging purposes



27
28
29
# File 'lib/roby/promise.rb', line 27

def description
  @description
end

#error_handling_failureException? (readonly)

The exception raised by the error handlers when during processing

Returns:



216
217
218
# File 'lib/roby/promise.rb', line 216

def error_handling_failure
  @error_handling_failure
end

#error_pipelineArray<PipelineElement> (readonly)

The pipeline that will be executed if an error happens in #pipeline

Returns:



35
36
37
# File 'lib/roby/promise.rb', line 35

def error_pipeline
  @error_pipeline
end

#execution_engineObject (readonly)

The execution engine we execute on



22
23
24
# File 'lib/roby/promise.rb', line 22

def execution_engine
  @execution_engine
end

#pipelineArray<PipelineElement> (readonly)

The pipeline itself

Returns:



31
32
33
# File 'lib/roby/promise.rb', line 31

def pipeline
  @pipeline
end

#promiseObject (readonly)

The Promise object from concurrent-ruby that handles the nominal part of the execution



25
26
27
# File 'lib/roby/promise.rb', line 25

def promise
  @promise
end

Class Method Details

.null(value = nil) ⇒ Object



327
328
329
# File 'lib/roby/promise.rb', line 327

def self.null(value = nil)
    Null.new(value)
end

Instance Method Details

#add_observer {|time, result, reason| ... } ⇒ Object

Register a block that will be called on this promise’s termination

Yield Parameters:

  • time (Time)

    the termination time

  • result (Object, nil)

    the promise result if it finished execution successfully, or nil if an exception was raised

  • reason (Object, nil)

    the exception that terminated this promise if it failed, or nil if it finished successfully



323
324
325
# File 'lib/roby/promise.rb', line 323

def add_observer(&block)
    promise.add_observer(&block)
end

#before(description: "#{self.description}.before", in_engine: true, &block) ⇒ Object

Queue a block at the beginning of the pipeline



219
220
221
222
# File 'lib/roby/promise.rb', line 219

def before(description: "#{self.description}.before", in_engine: true, &block)
    pipeline.unshift PipelineElement.new(description, in_engine, block)
    self
end

#complete?Boolean

Returns:

  • (Boolean)


275
276
277
# File 'lib/roby/promise.rb', line 275

def complete?
    promise.complete?
end

#current_elementString?

The description element being currently executed

Returns:

  • (String, nil)


65
66
67
# File 'lib/roby/promise.rb', line 65

def current_element
    @current_element.get
end

#empty?Boolean

Whether this promise does have elements

Returns:

  • (Boolean)


58
59
60
# File 'lib/roby/promise.rb', line 58

def empty?
    @pipeline.empty?
end

#executeObject



261
262
263
264
265
# File 'lib/roby/promise.rb', line 261

def execute
    execution_engine.waiting_work << self
    promise.execute
    self
end

#fail(exception = StandardError) ⇒ Object



257
258
259
# File 'lib/roby/promise.rb', line 257

def fail(exception = StandardError)
    promise.fail(exception)
end

#fulfilled?Boolean

Returns:

  • (Boolean)


279
280
281
# File 'lib/roby/promise.rb', line 279

def fulfilled?
    promise.fulfilled?
end

#handled_error?Boolean

Whether the promise’s failure was successfully handled by error handlers

This makes only sense when #rejected? returns true

This will return true if (1) there are error handlers and (2) executing the handlers did not raise errors

Returns:

  • (Boolean)


209
210
211
# File 'lib/roby/promise.rb', line 209

def handled_error?
    has_error_handler? && !@error_handling_failure
end

#has_error_handler?Boolean

Whether self already has an error handler

Unlike Concurrent::Promise, Roby::Promise objects can only have one error handler

Returns:

  • (Boolean)


199
200
201
# File 'lib/roby/promise.rb', line 199

def has_error_handler?
    !error_pipeline.empty?
end

#null?Boolean

Whether this is a null promise

Returns:

  • (Boolean)


53
54
55
# File 'lib/roby/promise.rb', line 53

def null?
    false
end

#on_error(description: "#{self.description}.on_error", in_engine: true) {|reason| ... } ⇒ Object

Schedule execution of a block if self or one of its parents failed

Parameters:

  • description (String) (defaults to: "#{self.description}.on_error")

    a textual description useful for debugging

  • in_engine (Boolean) (defaults to: true)

    whether the block should be executed within the underlying ExecutionEngine, a.k.a. in the main thread, or scheduled in a separate thread.

Yield Parameters:

  • reason (Object)

    the exception that caused the failure, usually an exception that was raised by one of the promise blocks.



246
247
248
249
# File 'lib/roby/promise.rb', line 246

def on_error(description: "#{self.description}.on_error", in_engine: true, &block)
    error_pipeline << PipelineElement.new(description, in_engine, block)
    self
end

#on_success(description: "#{self.description}.on_success[#{pipeline.size}]", in_engine: true, &block) ⇒ Object

Schedule execution of a block on the success of self

Parameters:

  • description (String) (defaults to: "#{self.description}.on_success[#{pipeline.size}]")

    a textual description useful for debugging

  • in_engine (Boolean) (defaults to: true)

    whether the block should be executed within the underlying ExecutionEngine, a.k.a. in the main thread, or scheduled in a separate thread.



230
231
232
233
234
235
236
# File 'lib/roby/promise.rb', line 230

def on_success(
    description: "#{self.description}.on_success[#{pipeline.size}]",
    in_engine: true, &block
)
    pipeline << PipelineElement.new(description, in_engine, block)
    self
end

#pending?Boolean

Returns:

  • (Boolean)


271
272
273
# File 'lib/roby/promise.rb', line 271

def pending?
    promise.pending?
end

#pretty_print(pp) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/roby/promise.rb', line 165

def pretty_print(pp)
    description = self.description
    pp.text "Roby::Promise(#{description}"
    if (current_element = self.current_element)
        pp.text ", currently: #{current_element})"
    else
        pp.text ")"
    end

    pipeline.each do |element|
        pp.nest(2) do
            pp.text "."
            pp.breakable
            if element.run_in_engine
                pp.text "on_success(#{element.description})"
            else
                pp.text "then(#{element.description})"
            end
        end
    end
    error_pipeline.each do |element|
        pp.nest(2) do
            pp.text "."
            pp.breakable
            pp.text "on_error(#{element.description}, "\
                    "in_engine: #{element.run_in_engine})"
        end
    end
end

#reasonObject

Returns the exception that caused the promise to be rejected



306
307
308
309
# File 'lib/roby/promise.rb', line 306

def reason
    failure = promise.reason
    failure&.actual_exception
end

#rejected?Boolean

Returns:

  • (Boolean)


283
284
285
# File 'lib/roby/promise.rb', line 283

def rejected?
    promise.rejected?
end

#run_one_pipeline_segment(pipeline, state, in_engine, propagate_state: true) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Helper method for #run_pipeline_elements, to run a sequence of elements in a pipeline that have the same run_in_engine?



147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/roby/promise.rb', line 147

def run_one_pipeline_segment(pipeline, state, in_engine, propagate_state: true)
    while (element = pipeline.first) && !(in_engine ^ element.run_in_engine)
        pipeline.shift
        @current_element.set(element.description)
        new_state = execution_engine.log_timepoint_group(
            "#{element.description} in_engine=#{element.run_in_engine}"
        ) do
            element.callback.call(state)
        end
        state = new_state if propagate_state
    end
    state
end

#run_pipeline(*state) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal implementation of the pipeline. This holds a thread until it is finished - there’s no point in giving the thread back between the steps in the pipeline, given how the promises are used in Roby (to avoid freezing due to blocking calls)



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/roby/promise.rb', line 78

def run_pipeline(*state)
    Thread.current.name = "run_promises"

    execution_engine.log_timepoint_group description.to_s do
        begin
            run_pipeline_elements(pipeline, state)
        rescue ::Exception => error # rubocop:disable Naming/RescuedExceptionsVariableName
            begin
                run_pipeline_elements(error_pipeline, error,
                                      propagate_state: false)
            rescue ::Exception => error_handling_failure # rubocop:disable Naming/RescuedExceptionsVariableName
                @error_handling_failure = error_handling_failure
            end
            raise Failure.new(error)
        end
    end
ensure
    @current_element.set(nil)
end

#run_pipeline_elements(pipeline, state, propagate_state: true) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Run one of #pipeline or #error_pipeline



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/roby/promise.rb', line 101

def run_pipeline_elements(pipeline, state, propagate_state: true)
    pipeline = pipeline.dup
    until pipeline.empty?
        state = run_one_pipeline_segment(
            pipeline, state, false, propagate_state: propagate_state
        )
        break if pipeline.empty?

        state = execution_engine.log_timepoint_group(
            "#{description}:execute_in_engine"
        ) do
            execution_engine.execute(type: :propagation) do
                run_one_pipeline_segment(
                    pipeline, state, true, propagate_state: propagate_state
                )
            end
        end
    end
    state
end

#stateObject

The promise’s execution state



312
313
314
# File 'lib/roby/promise.rb', line 312

def state
    promise.state
end

#then(description: "#{self.description}.then[#{pipeline.size}]", &block) ⇒ Object

Alias for #on_success, but defaulting to execution as a separate thread



253
254
255
# File 'lib/roby/promise.rb', line 253

def then(description: "#{self.description}.then[#{pipeline.size}]", &block)
    on_success(description: description, in_engine: false, &block)
end

#to_sObject



161
162
163
# File 'lib/roby/promise.rb', line 161

def to_s
    "#<Roby::Promise #{description}>"
end

#unscheduled?Boolean

Returns:

  • (Boolean)


267
268
269
# File 'lib/roby/promise.rb', line 267

def unscheduled?
    promise.unscheduled?
end

#value(timeout = nil) ⇒ Object

Raises:



291
292
293
294
295
# File 'lib/roby/promise.rb', line 291

def value(timeout = nil)
    return promise.value(timeout) if promise.complete?

    raise NotComplete, "cannot call #value on a non-complete promise"
end

#value!(timeout = nil) ⇒ Object



297
298
299
300
301
302
303
# File 'lib/roby/promise.rb', line 297

def value!(timeout = nil)
    return promise.value!(timeout) if promise.complete?

    raise NotComplete, "cannot call #value on a non-complete promise"
rescue Failure => e
    raise e.actual_exception
end

#waitObject



287
288
289
# File 'lib/roby/promise.rb', line 287

def wait
    promise.wait
end