Class: Roby::Promise

Inherits:
Object show all
Defined in:
lib/roby/promise.rb

Overview

An extension to Concurrent::Promise that is aware of the mixed thread/event loop nature of Roby

Use ExecutionEngine#promise to create one

#on_success and #rescue gain an in_engine argument, which decides whether the given block should be executed by the underlying execution engine’s or not. It is true by default. Note that #then is not overriden

This promise implementation has no graph capabilities. The execution must be a pipeline, and a whole pipeline is represented by a single Promise. State predicates such as #fulfilled? or #rejected? are valid for the whole pipeline. There is no way to handle errors for only parts of the pipeline.

Defined Under Namespace

Classes: AlreadyHasErrorHandler, Failure, NotComplete, Null, PipelineElement

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(execution_engine, executor: execution_engine.thread_pool, description: "promise", &block) ⇒ Promise



35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/roby/promise.rb', line 35

def initialize(execution_engine, executor: execution_engine.thread_pool, description: "promise", &block)
    @execution_engine = execution_engine
    execution_engine.waiting_work << self
    @description = description

    @pipeline = Array.new
    @error_pipeline = Array.new
    @promise = Concurrent::Promise.new(executor: executor, &method(:run_pipeline))
    @current_element = Concurrent::AtomicReference.new
    if block
        self.then(&block)
    end
end

Instance Attribute Details

#descriptionObject (readonly)

A description text for debugging purposes



25
26
27
# File 'lib/roby/promise.rb', line 25

def description
  @description
end

#error_pipelineArray<PipelineElement> (readonly)

The pipeline that will be executed if an error happens in #pipeline



33
34
35
# File 'lib/roby/promise.rb', line 33

def error_pipeline
  @error_pipeline
end

#execution_engineObject (readonly)

The execution engine we execute on



20
21
22
# File 'lib/roby/promise.rb', line 20

def execution_engine
  @execution_engine
end

#pipelineArray<PipelineElement> (readonly)

The pipeline itself



29
30
31
# File 'lib/roby/promise.rb', line 29

def pipeline
  @pipeline
end

#promiseObject (readonly)

The Promise object from concurrent-ruby that handles the nominal part of the execution



23
24
25
# File 'lib/roby/promise.rb', line 23

def promise
  @promise
end

Class Method Details

.null(value = nil) ⇒ Object



293
294
295
# File 'lib/roby/promise.rb', line 293

def self.null(value = nil)
    Null.new(value)
end

Instance Method Details

#add_observer {|time, result, reason| ... } ⇒ Object

Register a block that will be called on this promise’s termination

Yield Parameters:

  • time (Time)

    the termination time

  • result (Object, nil)

    the promise result if it finished execution successfully, or nil if an exception was raised

  • reason (Object, nil)

    the exception that terminated this promise if it failed, or nil if it finished successfully



289
290
291
# File 'lib/roby/promise.rb', line 289

def add_observer(&block)
    promise.add_observer(&block)
end

#before(description: "#{self.description}.before", in_engine: true, &block) ⇒ Object

Queue a block at the beginning of the pipeline



184
185
186
187
# File 'lib/roby/promise.rb', line 184

def before(description: "#{self.description}.before", in_engine: true, &block)
    pipeline.unshift PipelineElement.new(description, in_engine, block)
    self
end

#complete?Boolean



236
237
238
# File 'lib/roby/promise.rb', line 236

def complete?
    promise.complete?
end

#current_elementString?

The description element being currently executed



62
63
64
# File 'lib/roby/promise.rb', line 62

def current_element
    @current_element.get
end

#empty?Boolean

Whether this promise does have elements



55
56
57
# File 'lib/roby/promise.rb', line 55

def empty?
    @pipeline.empty?
end

#executeObject



223
224
225
226
# File 'lib/roby/promise.rb', line 223

def execute
    promise.execute
    self
end

#fail(exception = StandardError) ⇒ Object



219
220
221
# File 'lib/roby/promise.rb', line 219

def fail(exception = StandardError)
    promise.fail(exception)
end

#fulfilled?Boolean



240
241
242
# File 'lib/roby/promise.rb', line 240

def fulfilled?
    promise.fulfilled?
end

#has_error_handler?Boolean

Whether self already has an error handler

Unlike Concurrent::Promise, Roby::Promise objects can only have one error handler



179
180
181
# File 'lib/roby/promise.rb', line 179

def has_error_handler?
    !error_pipeline.empty?
end

#null?Boolean

Whether this is a null promise



50
51
52
# File 'lib/roby/promise.rb', line 50

def null?
    false
end

#on_error(description: "#{self.description}.on_error", in_engine: true) {|reason| ... } ⇒ Object

Schedule execution of a block if self or one of its parents failed

Yield Parameters:

  • reason (Object)

    the exception that caused the failure, usually an exception that was raised by one of the promise blocks.



208
209
210
211
# File 'lib/roby/promise.rb', line 208

def on_error(description: "#{self.description}.on_error", in_engine: true, &block)
    error_pipeline << PipelineElement.new(description, in_engine, block)
    self
end

#on_success(description: "#{self.description}.on_success[#{pipeline.size}]", in_engine: true, &block) ⇒ Object

Schedule execution of a block on the success of self



195
196
197
198
# File 'lib/roby/promise.rb', line 195

def on_success(description: "#{self.description}.on_success[#{pipeline.size}]", in_engine: true, &block)
    pipeline << PipelineElement.new(description, in_engine, block)
    self
end

#pending?Boolean



232
233
234
# File 'lib/roby/promise.rb', line 232

def pending?
    promise.pending?
end

#pretty_print(pp) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/roby/promise.rb', line 146

def pretty_print(pp)
    description = self.description
    pp.text "Roby::Promise(#{description}"
    if current_element = self.current_element
        pp.text ", currently: #{current_element})"
    else
        pp.text ")"
    end

    pipeline.each do |element|
        pp.nest(2) do
            pp.text "."
            pp.breakable
            if element.run_in_engine
                pp.text "on_success(#{element.description})"
            else
                pp.text "then(#{element.description})"
            end
        end
    end
    error_pipeline.each do |element|
        pp.nest(2) do
            pp.text "."
            pp.breakable
            pp.text "on_error(#{element.description}, in_engine: #{element.run_in_engine})"
        end
    end
end

#reasonObject

Returns the exception that caused the promise to be rejected



271
272
273
274
275
# File 'lib/roby/promise.rb', line 271

def reason
    if failure = promise.reason
        failure.actual_exception
    end
end

#rejected?Boolean



244
245
246
# File 'lib/roby/promise.rb', line 244

def rejected?
    promise.rejected?
end

#run_one_pipeline_segment(pipeline, state, in_engine, propagate_state: true) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Helper method for #run_pipeline_elements, to run a sequence of elements in a pipeline that have the same run_in_engine?



130
131
132
133
134
135
136
137
138
139
140
# File 'lib/roby/promise.rb', line 130

def run_one_pipeline_segment(pipeline, state, in_engine, propagate_state: true)
    while (element = pipeline.first) && !(in_engine ^ element.run_in_engine)
        pipeline.shift
        @current_element.set(element.description)
        new_state = execution_engine.log_timepoint_group "#{element.description} in_engine=#{element.run_in_engine}" do
            element.callback.call(state)
        end
        state = new_state if propagate_state
    end
    state
end

#run_pipeline(*state) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal implementation of the pipeline. This holds a thread until it is finished - there’s no point in giving the thread back between the steps in the pipeline, given how the promises are used in Roby (to avoid freezing due to blocking calls)



75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/roby/promise.rb', line 75

def run_pipeline(*state)
    Thread.current.name = "run_promises"

    execution_engine.log_timepoint_group "#{description}" do
        begin
            run_pipeline_elements(self.pipeline, state)
        rescue Exception => exception
            run_pipeline_elements(self.error_pipeline, exception, propagate_state: false)
            raise Failure.new(exception)
        end
    end
ensure
    @current_element.set(nil)
end

#run_pipeline_elements(pipeline, state, propagate_state: true) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Run one of #pipeline or #error_pipeline



93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/roby/promise.rb', line 93

def run_pipeline_elements(pipeline, state, propagate_state: true)
    pipeline = pipeline.dup
    while !pipeline.empty?
        state = run_one_pipeline_segment(pipeline, state, false, propagate_state: propagate_state)
        if !pipeline.empty?
            state = execution_engine.log_timepoint_group "#{description}:execute_in_engine" do
                execution_engine.execute(type: :propagation) do
                    run_one_pipeline_segment(pipeline, state, true, propagate_state: propagate_state)
                end
            end
        end
    end
    state
end

#stateObject

The promise’s execution state



278
279
280
# File 'lib/roby/promise.rb', line 278

def state
    promise.state
end

#then(description: "#{self.description}.then[#{pipeline.size}]", &block) ⇒ Object

Alias for #on_success, but defaulting to execution as a separate thread



215
216
217
# File 'lib/roby/promise.rb', line 215

def then(description: "#{self.description}.then[#{pipeline.size}]", &block)
    on_success(description: description, in_engine: false, &block)
end

#to_sObject



142
143
144
# File 'lib/roby/promise.rb', line 142

def to_s
    "#<Roby::Promise #{description}>"
end

#unscheduled?Boolean



228
229
230
# File 'lib/roby/promise.rb', line 228

def unscheduled?
    promise.unscheduled?
end

#value(timeout = nil) ⇒ Object



252
253
254
255
256
257
258
# File 'lib/roby/promise.rb', line 252

def value(timeout = nil)
    if promise.complete?
        promise.value(timeout)
    else
        raise NotComplete, "cannot call #value on a non-complete promise"
    end
end

#value!(timeout = nil) ⇒ Object



260
261
262
263
264
265
266
267
268
# File 'lib/roby/promise.rb', line 260

def value!(timeout = nil)
    if promise.complete?
        promise.value!(timeout)
    else
        raise NotComplete, "cannot call #value on a non-complete promise"
    end
rescue Failure => e
    raise e.actual_exception
end

#waitObject



248
249
250
# File 'lib/roby/promise.rb', line 248

def wait
    promise.wait
end