Class: Roby::ExecutionEngine Private

Inherits:
Object
  • Object
Extended by:
Logger::Hierarchy, PropagationHandlerMethods
Includes:
Logger::Hierarchy, DRoby::EventLogging, PropagationHandlerMethods
Defined in:
lib/roby/execution_engine.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

The core execution algorithm

It is in charge of handling event and exception propagation, as well as running cleanup processes (e.g. garbage collection).

The main method is #process_events. When executing a Roby application, it is called periodically by #event_loop.

Defined Under Namespace

Modules: PropagationHandlerMethods
Classes: AlreadyRunning, EventLoopExitState, ExceptionPropagationVisitor, JoinAllWaitingWorkTimeout, NotPropagationContext, PollBlockDefinition, PropagationInfo, RecursivePropagationContext

Constant Summary

PENDING_PROPAGATION_FORWARD =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

1
PENDING_PROPAGATION_SIGNAL =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

2
SLEEP_MIN_TIME =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Do not sleep or call Thread#pass if there is less than this much time left in the cycle

0.01
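
A minimal sketch of how such a threshold could be applied at the end of a cycle. The helper name remaining_sleep is hypothetical and not part of Roby, and the constant is redefined locally so that the snippet runs on its own:

```ruby
# Local stand-in for ExecutionEngine::SLEEP_MIN_TIME
SLEEP_MIN_TIME = 0.01

# Hypothetical helper: returns how long to sleep before the next cycle,
# or nil when too little time remains for sleeping to be worthwhile.
def remaining_sleep(cycle_start, cycle_length, now)
  remaining = cycle_length - (now - cycle_start)
  remaining if remaining >= SLEEP_MIN_TIME
end

start = Time.at(0)
remaining_sleep(start, 0.1, start + 0.05)  # half a cycle left: returns a Float
remaining_sleep(start, 0.1, start + 0.095) # under the threshold: returns nil
```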
INTERRUPT_FORCE_EXIT_DEAD_ZONE =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

How many seconds between two Interrupt exceptions before the execution engine’s loop can forcefully quit

10
EXCEPTION_NONFATAL =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Exception kind passed to #on_exception handlers for non-fatal, unhandled exceptions

:nonfatal
EXCEPTION_FATAL =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Exception kind passed to #on_exception handlers for fatal, unhandled exceptions

:fatal
EXCEPTION_HANDLED =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Exception kind passed to #on_exception handlers for handled exceptions

:handled
EXCEPTION_FREE_EVENT =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Exception kind passed to #on_exception handlers for free event exceptions

:free_event
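
As an illustration of how a listener might branch on these kinds, here is a small sketch modelled on the level selection done in #display_exceptions=. The method log_level_for is hypothetical, and the constants are redefined locally so that the snippet does not need Roby:

```ruby
# Local stand-ins for the ExecutionEngine::EXCEPTION_* constants
EXCEPTION_NONFATAL = :nonfatal
EXCEPTION_FATAL = :fatal
EXCEPTION_HANDLED = :handled
EXCEPTION_FREE_EVENT = :free_event

# Hypothetical listener helper: picks a log level from the exception kind.
# Handled exceptions are the quiet case, everything else is a warning.
def log_level_for(kind)
  case kind
  when EXCEPTION_HANDLED then :debug
  when EXCEPTION_NONFATAL, EXCEPTION_FATAL, EXCEPTION_FREE_EVENT then :warn
  else raise ArgumentError, "unknown exception kind #{kind.inspect}"
  end
end

log_level_for(EXCEPTION_HANDLED) # => :debug
log_level_for(EXCEPTION_FATAL)   # => :warn
```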

Instance Attribute Summary

Attributes included from PropagationHandlerMethods

#external_events_handlers, #propagation_handlers, #side_work_handlers

Class Method Summary

Instance Method Summary

Methods included from PropagationHandlerMethods

add_propagation_handler, add_side_work_handler, at_cycle_begin, create_propagation_handler, each_cycle, remove_propagation_handler, remove_side_work_handler

Methods included from DRoby::EventLogging

#log, #log_flush_cycle, #log_queue_size, #log_timepoint, #log_timepoint_group, #log_timepoint_group_end, #log_timepoint_group_start

Constructor Details

#initialize(plan, control: Roby::DecisionControl.new, event_logger: plan.event_logger) ⇒ ExecutionEngine

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Create an execution engine acting on plan, using control as the decision control object



# File 'lib/roby/execution_engine.rb', line 79

def initialize(plan, control: Roby::DecisionControl.new, event_logger: plan.event_logger)
    @plan = plan
    @event_logger = event_logger

    @use_oob_gc = ExecutionEngine.use_oob_gc?

    @control = control
    @scheduler = Schedulers::Null.new(plan)
    reset_thread_pool
    @thread = Thread.current

    @propagation = nil
    @propagation_id = 0
    @propagation_exceptions = nil
    @application_exceptions = nil
    @delayed_events = []
    @event_ordering = []
    @event_priorities = {}
    @propagation_handlers = []
    @external_events_handlers = []
    @side_work_handlers = []
    @at_cycle_end_handlers = []
    @process_every = []
    @waiting_work = Concurrent::Array.new
    @emitted_events = []
    @exception_listeners = []

    @worker_threads_mtx = Mutex.new
    @worker_threads = []
    @once_blocks = Queue.new

    @pending_exceptions = {}

    each_cycle(&ExecutionEngine.method(:call_every))

    @quit = 0
    @allow_propagation = true
    @cycle_index = 0
    @cycle_start = Time.now
    @cycle_length = 0.1
    @last_stop_count = 0
    @finalizers = []
    @gc_warning = true

    refresh_relations

    @exception_display_handler = Roby.null_disposable
    self.display_exceptions = true
end

Instance Attribute Details

#additional_errors ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Used during exception propagation to inject new errors in the process

It shall not be accessed directly. Instead, Plan#add_error should be called



# File 'lib/roby/execution_engine.rb', line 1471

def additional_errors
  @additional_errors
end

#application_exceptions ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of errors which have been generated outside of the plan’s control. For now, those errors cause the whole controller to shut down.



# File 'lib/roby/execution_engine.rb', line 1456

def application_exceptions
  @application_exceptions
end

#at_cycle_end_handlers ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A set of blocks that are called at each cycle end



# File 'lib/roby/execution_engine.rb', line 2143

def at_cycle_end_handlers
  @at_cycle_end_handlers
end

#control ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The DecisionControl object associated with this engine



# File 'lib/roby/execution_engine.rb', line 180

def control
  @control
end

#cycle_index ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The number of this cycle since the beginning



# File 'lib/roby/execution_engine.rb', line 2202

def cycle_index
  @cycle_index
end

#cycle_length ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The cycle length in seconds



# File 'lib/roby/execution_engine.rb', line 2196

def cycle_length
  @cycle_length
end

#cycle_start ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The starting Time of this cycle



# File 'lib/roby/execution_engine.rb', line 2199

def cycle_start
  @cycle_start
end

#delayed_events ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of pending delayed events. This is an array of the form

[[time, is_forward, source, target, context], ...]

See #add_event_delay for more information



# File 'lib/roby/execution_engine.rb', line 545

def delayed_events
  @delayed_events
end
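
To make the entry layout concrete, the following sketch builds two entries by hand and separates the ones that are due. Symbols stand in for real event generators, and the strict comparison follows the wording of #add_event_delay ("when the current time is greater than time"), which is an assumption about the actual check:

```ruby
# Entries follow the documented layout:
#   [time, is_forward, source, target, context]
# Symbols stand in for real event generators here.
now = Time.at(100)
delayed_events = [
  [Time.at(90),  true,  :start, :ready,   nil],
  [Time.at(150), false, :stop,  :cleanup, [42]]
]

# Split entries into those whose time has passed and those still pending.
due, pending = delayed_events.partition { |time, *| now > time }

due.map { |_, is_forward, _, target, _| [is_forward ? :forward : :signal, target] }
# => [[:forward, :ready]]
pending.size # => 1
```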

#dependency_graph ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Cached graph object for TaskStructure::Dependency

This is here for performance reasons, to avoid resolving the same graph over and over



# File 'lib/roby/execution_engine.rb', line 169

def dependency_graph
  @dependency_graph
end

#emitted_events ⇒ Array<Event> (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of events that have been emitted within the last call to #process_events (i.e. the last execution of the event loop)



# File 'lib/roby/execution_engine.rb', line 187

def emitted_events
  @emitted_events
end

#event_logger ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The underlying DRoby::EventLogger

It is usually the same as the #plan’s. Pass a DRoby::NullEventLogger at construction time to disable logging of execution events.



# File 'lib/roby/execution_engine.rb', line 178

def event_logger
  @event_logger
end

#event_ordering ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The topological ordering of events w.r.t. the Precedence relation. This gets updated on-demand when the event relations change.



# File 'lib/roby/execution_engine.rb', line 1004

def event_ordering
  @event_ordering
end

#event_priorities ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The event => index hash which gives the propagation priority for each event



# File 'lib/roby/execution_engine.rb', line 1007

def event_priorities
  @event_priorities
end

#exception_listeners ⇒ Array<#call> (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The blocks that are currently listening to exceptions



# File 'lib/roby/execution_engine.rb', line 190

def exception_listeners
  @exception_listeners
end

#finalizers ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A set of proc objects which are to be called when the execution engine quits.



# File 'lib/roby/execution_engine.rb', line 2517

def finalizers
  @finalizers
end

#forward_graph ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Cached graph object for EventStructure::Forward

This is here for performance reasons, to avoid resolving the same graph over and over



# File 'lib/roby/execution_engine.rb', line 163

def forward_graph
  @forward_graph
end

#last_stop_count ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

:nodoc:



# File 'lib/roby/execution_engine.rb', line 2288

def last_stop_count
  @last_stop_count
end

#once_blocks ⇒ Queue (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Thread-safe queue to push work to the execution engine

Do not access directly, use #once instead



# File 'lib/roby/execution_engine.rb', line 198

def once_blocks
  @once_blocks
end

#plan ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The Plan this engine is acting on



# File 'lib/roby/execution_engine.rb', line 172

def plan
  @plan
end

#precedence_graph ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Cached graph object for Roby::EventStructure::Precedence

This is here for performance reasons, to avoid resolving the same graph over and over



# File 'lib/roby/execution_engine.rb', line 151

def precedence_graph
  @precedence_graph
end

#process_every ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A set of blocks which are called every cycle



# File 'lib/roby/execution_engine.rb', line 2165

def process_every
  @process_every
end

#propagation_id ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A numeric ID giving the count of the current propagation cycle



# File 'lib/roby/execution_engine.rb', line 182

def propagation_id
  @propagation_id
end

#propagation_sources ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of source events for the current propagation action. This is a mix of EventGenerator and Event objects.



# File 'lib/roby/execution_engine.rb', line 514

def propagation_sources
  @propagation_sources
end

#scheduler ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The scheduler is the object which handles non-generic parts of the propagation cycle. For now, its #initial_events method is called at the beginning of each propagation cycle and can call or emit a set of events.

See Schedulers::Basic



# File 'lib/roby/execution_engine.rb', line 485

def scheduler
  @scheduler
end

#signal_graph ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Cached graph object for EventStructure::Signal

This is here for performance reasons, to avoid resolving the same graph over and over



# File 'lib/roby/execution_engine.rb', line 157

def signal_graph
  @signal_graph
end

#thread ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The execution thread if there is one running



# File 'lib/roby/execution_engine.rb', line 2190

def thread
  @thread
end

#thread_pool ⇒ Concurrent::CachedThreadPool (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A thread pool on which async work should be executed

See Also:



# File 'lib/roby/execution_engine.rb', line 145

def thread_pool
  @thread_pool
end

#waiting_work ⇒ Array<#fail,#complete?> (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A list of threaded objects waiting for the control thread

Objects registered here will be notified, by calling #fail on them, when the control thread quits. In addition, #join_all_waiting_work will wait for all pending jobs to finish.

Note that all Concurrent::Obligation subclasses fit the bill



# File 'lib/roby/execution_engine.rb', line 2140

def waiting_work
  @waiting_work
end

Class Method Details

.call_every(plan) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Calls the periodic blocks which should be called



# File 'lib/roby/execution_engine.rb', line 2108

def self.call_every(plan) # :nodoc:
    engine = plan.execution_engine
    now = engine.cycle_start
    length = engine.cycle_length
    engine.process_every.map! do |handler, last_call, duration|
        next if handler.disposed?

        # Check if the nearest timepoint is the beginning of
        # this cycle or of the next cycle
        if !last_call || (duration - (now - last_call)) < length / 2
            unless handler.call(engine, engine.plan)
                next
            end

            next if handler.once?
            next if handler.disposed?

            last_call = now
        end
        [handler, last_call, duration]
    end.compact!
end
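
The timing test in the middle of .call_every can be isolated as follows. This is a sketch only: due? is a hypothetical name, and plain Floats replace the Time objects used by the engine to keep the arithmetic easy to follow:

```ruby
# Mirrors the condition used in .call_every: a handler fires when it has
# never been called, or when its next deadline is less than half a cycle
# away, i.e. closer to the start of this cycle than to the next one.
def due?(now, last_call, duration, cycle_length)
  !last_call || (duration - (now - last_call)) < cycle_length / 2
end

cycle_length = 0.1
due?(1.0, nil, 0.5, cycle_length)  # => true, never called before
due?(1.0, 0.52, 0.5, cycle_length) # => true, deadline 1.02 is 0.02 away
due?(1.0, 0.6, 0.5, cycle_length)  # => false, deadline 1.1 is a full cycle away
```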

.make_delay(timeref, source, target, timespec) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a Time object which represents the absolute point in time referenced by timespec in the context of delaying a propagation between source and target.

See validate_timespec for more information



# File 'lib/roby/execution_engine.rb', line 994

def self.make_delay(timeref, source, target, timespec)
    if delay = timespec[:delay] then timeref + delay
    elsif at = timespec[:at] then at
    else
        raise ArgumentError, "invalid timespec #{timespec}"
    end
end
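
The two accepted forms can be tried out with a standalone copy of this logic. The name resolve_delay is hypothetical, and the source and target arguments are dropped because the body shown above does not use them:

```ruby
# Standalone copy of the logic in .make_delay, without the unused
# source and target arguments.
def resolve_delay(timeref, timespec)
  if (delay = timespec[:delay])
    timeref + delay # relative form: offset from the reference time
  elsif (at = timespec[:at])
    at              # absolute form: used as-is
  else
    raise ArgumentError, "invalid timespec #{timespec}"
  end
end

ref = Time.at(1_000)
resolve_delay(ref, { delay: 5 }) == Time.at(1_005)         # => true
resolve_delay(ref, { at: Time.at(2_000) }) == Time.at(2_000) # => true
```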

.validate_timespec(timespec) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Validates timespec as a delay specification. A valid delay specification is either nil or a hash, in which case two forms are possible:

at: absolute_time
delay: number


# File 'lib/roby/execution_engine.rb', line 983

def self.validate_timespec(timespec)
    if timespec
        timespec = validate_options timespec, %i[delay at]
    end
end
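
A rough standalone equivalent is sketched below. It assumes that validate_options rejects keys outside the allowed list, which is not shown on this page; check_timespec and VALID_TIMESPEC_KEYS are hypothetical names:

```ruby
VALID_TIMESPEC_KEYS = %i[delay at].freeze

# Hypothetical stand-in for .validate_timespec: nil is accepted as
# "no delay", otherwise only the :delay and :at keys are allowed.
def check_timespec(timespec)
  return nil if timespec.nil?

  unknown = timespec.keys - VALID_TIMESPEC_KEYS
  raise ArgumentError, "unknown timespec keys: #{unknown.inspect}" unless unknown.empty?

  timespec
end

check_timespec(nil)          # => nil
check_timespec({ delay: 5 }) # accepted, returned unchanged
begin
  check_timespec({ after: 5 })
rescue ArgumentError => e
  e.message # => "unknown timespec keys: [:after]"
end
```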

Instance Method Details

#add_error(e, propagate_through: nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Register a LocalizedError for future propagation

This method must be called in an error-gathering context (i.e. #gather_errors).

Raises:



# File 'lib/roby/execution_engine.rb', line 638

def add_error(e, propagate_through: nil)
    plan_exception = e.to_execution_exception
    if @propagation_exceptions
        @propagation_exceptions << [plan_exception, propagate_through]
    else
        Roby.log_exception_with_backtrace(e, self, :fatal)
        raise NotPropagationContext,
              "#add_error called outside an error-gathering "\
              "context (#add_error)"
    end
end

#add_event_delay(time, is_forward, source, target, context) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Adds a propagation step to be performed when the current time is greater than time. The propagation step is a signal if is_forward is false and a forward otherwise.

This method should not be called directly. Use #add_event_propagation with the appropriate timespec argument.

See also #delayed_events and #execute_delayed_events



# File 'lib/roby/execution_engine.rb', line 555

def add_event_delay(time, is_forward, source, target, context)
    delayed_events << [time, is_forward, source, target, context]
end

#add_event_propagation(is_forward, sources, target, context, timespec) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Adds a propagation to the next propagation step: it registers a propagation step to be performed between source and target with the given context. If is_forward is true, the propagation will be a forwarding, otherwise it is a signal.

If timespec is not nil, it defines a delay to be applied before calling the target event.

See #gather_propagation



# File 'lib/roby/execution_engine.rb', line 764

def add_event_propagation(is_forward, sources, target, context, timespec)
    if target.plan != plan
        raise Roby::EventNotExecutable.new(target), "#{target} not in executed plan"
    end

    target.pending(sources.find_all { |ev| ev.kind_of?(Event) })

    @propagation_step_id += 1
    target_info = (@propagation[target] ||= [@propagation_step_id, [], []])
    step = target_info[is_forward ? PENDING_PROPAGATION_FORWARD : PENDING_PROPAGATION_SIGNAL]
    if sources.empty?
        step << nil << context << timespec
    else
        sources.each do |ev|
            step << ev << context << timespec
        end
    end
end
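
The record built by this method is easier to see in isolation. The sketch below reproduces only the queuing part with a lambda and plain symbols in place of event generators; the plan check and the call to target.pending are left out, and the variable names are illustrative:

```ruby
PENDING_PROPAGATION_FORWARD = 1
PENDING_PROPAGATION_SIGNAL = 2

# target => [step_id, forward_steps, signal_steps]
propagation = {}
step_id = 0

# Free-standing version of the queuing logic: each source contributes a
# flat (source, context, timespec) triple to the forward or signal list.
queue = lambda do |is_forward, sources, target, context, timespec|
  step_id += 1
  info = (propagation[target] ||= [step_id, [], []])
  step = info[is_forward ? PENDING_PROPAGATION_FORWARD : PENDING_PROPAGATION_SIGNAL]
  (sources.empty? ? [nil] : sources).each do |source|
    step << source << context << timespec
  end
end

queue.call(true, %i[a b], :target, [1], nil)
queue.call(false, [], :target, nil, { delay: 2 })

propagation[:target]
# evaluates to [1, [:a, [1], nil, :b, [1], nil], [nil, nil, { delay: 2 }]]
```

Note that the step ID stored in the record is the one from the first registration for that target; later registrations only append to the lists.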

#add_exceptions_for_inhibition(fatal_errors) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Register a set of fatal exceptions to ensure that they will be inhibited in the next exception propagation cycles



# File 'lib/roby/execution_engine.rb', line 1938

def add_exceptions_for_inhibition(fatal_errors)
    fatal_errors.each do |exception, involved_tasks|
        involved_tasks.each do |t|
            (@pending_exceptions[t] ||= Set.new) <<
                [exception.exception.class, exception.origin]
        end
    end
end

#add_framework_error(error, source) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers the given error and a description of its source in the list of application/framework errors

It must be called within an exception-gathering context, that is either within #process_events, or within #gather_framework_errors

These errors will terminate the event loop



# File 'lib/roby/execution_engine.rb', line 717

def add_framework_error(error, source)
    if @application_exceptions
        @application_exceptions << [error, source]
    else
        Roby.log_exception_with_backtrace(error, self, :fatal)
        raise NotPropagationContext, "#add_framework_error called outside an exception-gathering context"
    end
end
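
The "gathering context" requirement can be illustrated with a toy collector. ErrorSink and its methods are hypothetical and only mimic the pattern: a list that exists while a block runs, and a registration method that refuses to work outside of it:

```ruby
# Hypothetical collector illustrating the "gathering context" pattern.
class ErrorSink
  class NotGathering < RuntimeError; end

  # Opens a gathering context, runs the block, and returns what was
  # registered. The context is closed even if the block raises.
  def gather
    @errors = []
    yield
    @errors
  ensure
    @errors = nil
  end

  # Only legal inside #gather, in the same way that #add_framework_error
  # is only legal inside an exception-gathering context.
  def add(error, source)
    raise NotGathering, "add called outside a gathering context" unless @errors

    @errors << [error, source]
  end
end

sink = ErrorSink.new
collected = sink.gather { sink.add(RuntimeError.new("boom"), "poll handler") }
collected.map { |error, source| [error.message, source] }
# => [["boom", "poll handler"]]

begin
  sink.add(RuntimeError.new("late"), "nowhere")
rescue ErrorSink::NotGathering => e
  e.message # => "add called outside a gathering context"
end
```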

#at_cycle_end(description: "at_cycle_end", **options) {|plan| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Adds a block to be called at the end of each execution cycle

Yield Parameters:

  • plan (Plan)

    the plan on which this engine runs



# File 'lib/roby/execution_engine.rb', line 2151

def at_cycle_end(description: "at_cycle_end", **options, &block)
    handler = PollBlockDefinition.new(description, block, **options)
    at_cycle_end_handlers << handler
    handler
end

#call_poll_blocks(blocks, late = false) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Helper that calls the propagation handlers in blocks (which are expected to be instances of PollBlockDefinition) and handles the errors according to each handler’s policy



# File 'lib/roby/execution_engine.rb', line 804

def call_poll_blocks(blocks, late = false)
    blocks.delete_if do |handler|
        if handler.disabled? || (handler.late? ^ late)
            next(handler.disposed?)
        end

        log_timepoint_group handler.description do
            unless handler.call(self, plan)
                handler.disabled = true
            end
        end
        handler.once? || handler.disposed?
    end
end
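
The phase filter (handler.late? ^ late) and the removal of one-shot handlers can be sketched with a simplified handler. The Handler struct and run_phase are hypothetical and omit the disabled and disposed states as well as logging:

```ruby
# Simplified stand-in for PollBlockDefinition, keeping only the two
# predicates this sketch needs.
Handler = Struct.new(:name, :late, :once) do
  def late?
    late
  end

  def once?
    once
  end
end

# Runs the handlers matching the requested phase and drops one-shot
# handlers after they ran, like the delete_if block in #call_poll_blocks.
def run_phase(handlers, late, log)
  handlers.delete_if do |handler|
    next false if handler.late? ^ late # wrong phase: keep, do not run

    log << handler.name
    handler.once?
  end
end

handlers = [
  Handler.new(:poll, false, false),
  Handler.new(:setup, false, true),
  Handler.new(:finalize, true, false)
]
log = []
run_phase(handlers, false, log) # runs :poll and :setup, drops :setup
run_phase(handlers, true, log)  # runs :finalize
log                  # => [:poll, :setup, :finalize]
handlers.map(&:name) # => [:poll, :finalize]
```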

#call_propagation_handlers ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 840

def call_propagation_handlers
    process_once_blocks
    if scheduler.enabled?
        gather_framework_errors("scheduler") do
            scheduler.initial_events
            log_timepoint "scheduler"
        end
    end
    call_poll_blocks(self.class.propagation_handlers, false)
    call_poll_blocks(self.propagation_handlers, false)

    unless has_queued_events?
        call_poll_blocks(self.class.propagation_handlers, true)
        call_poll_blocks(self.propagation_handlers, true)
    end
end

#clear ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Sets up the plan for clearing: it discards all missions and undefines all permanent tasks and events.

Returns nil if the plan is cleared, and the set of remaining tasks otherwise. Note that quarantined tasks are not counted as remaining, as it is not possible for the execution engine to stop them.



# File 'lib/roby/execution_engine.rb', line 2296

def clear
    plan.mission_tasks.dup.each { |t| plan.unmark_mission_task(t) }
    plan.permanent_tasks.dup.each { |t| plan.unmark_permanent_task(t) }
    plan.permanent_events.dup.each { |t| plan.unmark_permanent_event(t) }
    plan.force_gc.merge(plan.tasks)

    quaranteened_subplan = plan.compute_useful_tasks(plan.quarantined_tasks)
    remaining = plan.tasks - quaranteened_subplan

    @pending_exceptions.clear

    if remaining.empty?
        # Have to call #garbage_collect one more to make
        # sure that unneeded events are removed as well
        garbage_collect
        # Done cleaning the tasks, clear the remains
        plan.transactions.each do |trsc|
            trsc.discard_transaction if trsc.self_owned?
        end
        plan.clear
        emitted_events.clear
        return
    end
    remaining
end

#clear_application_exceptions ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 1458

def clear_application_exceptions
    unless @application_exceptions
        raise RecursivePropagationContext, "unbalanced call to #clear_application_exceptions"
    end

    result, @application_exceptions = @application_exceptions, nil
    result
end

#compute_errors(events_errors) ⇒ PropagationInfo

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Compute the set of fatal errors in the current execution state



# File 'lib/roby/execution_engine.rb', line 1478

def compute_errors(events_errors)
    # Generate exceptions from task structure
    structure_errors = plan.check_structure
    log_timepoint "structure_check"

    # Propagate the errors. Note that the plan repairs are taken into
    # account in ExecutionEngine.propagate_exceptions directly.  We keep
    # event and structure errors separate since in the first case there
    # is not two-stage handling (all errors that have not been handled
    # are fatal), and in the second case we call #check_structure
    # again to errors that are remaining after the call to the exception
    # handlers
    events_errors, free_events_errors, events_handled =
        propagate_exceptions(events_errors)
    _, structure_handled = propagate_exceptions(structure_errors)
    log_timepoint "exception_propagation"

    # Get the remaining problems in the plan structure, and act on it
    structure_errors, structure_inhibited =
        remove_inhibited_exceptions(plan.check_structure)

    # Partition them by fatal/nonfatal
    fatal_errors, nonfatal_errors = [], []
    (structure_errors + events_errors).each do |e, involved_tasks|
        if e.fatal?
            fatal_errors << [e, involved_tasks]
        else
            nonfatal_errors << [e, involved_tasks]
        end
    end
    kill_tasks =
        compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors).to_set
    handled_errors = structure_handled + events_handled

    debug "#{fatal_errors.size} fatal errors found and "\
          "#{free_events_errors.size} errors involving free events"
    debug "the fatal errors involve #{kill_tasks.size} non-finalized tasks"
    PropagationInfo.new(
        Set.new, Set.new, kill_tasks, fatal_errors, nonfatal_errors,
        free_events_errors, handled_errors, structure_inhibited
    )
end

#compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Compute the set of unhandled fatal exceptions



# File 'lib/roby/execution_engine.rb', line 919

def compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors)
    kill_tasks = fatal_errors.inject(Set.new) do |tasks, (_exception, affected_tasks)|
        tasks.merge(affected_tasks)
    end
    # Tasks might have been finalized during exception handling, filter
    # those out
    kill_tasks.find_all(&:plan)
end
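
The same merge-then-filter step can be run on toy data. In this sketch the Task struct is a hypothetical stand-in in which a nil plan marks a finalized task, and each_with_object is used instead of inject purely as a stylistic choice:

```ruby
require "set"

# Hypothetical task stand-in: a finalized task has no plan.
Task = Struct.new(:name, :plan)

fatal_errors = [
  [:error_a, [Task.new("t1", :plan), Task.new("t2", nil)]],
  [:error_b, [Task.new("t3", :plan), Task.new("t1", :plan)]]
]

# Union of all affected tasks, minus those finalized in the meantime.
kill_tasks = fatal_errors
  .each_with_object(Set.new) { |(_error, tasks), set| set.merge(tasks) }
  .find_all(&:plan)

kill_tasks.map(&:name) # => ["t1", "t3"]
```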

#cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called at each cycle end



# File 'lib/roby/execution_engine.rb', line 2549

def cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?)
    gather_framework_errors("#cycle_end", raise_caught_exceptions: raise_framework_errors) do
        call_poll_blocks(at_cycle_end_handlers)
    end
end

#delayed(delay, description: "delayed block", **options, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Schedules block to be called once after delay seconds have passed, in the propagation context



# File 'lib/roby/execution_engine.rb', line 1445

def delayed(delay, description: "delayed block", **options, &block)
    handler = PollBlockDefinition.new(description, block, once: true, **options)
    once do
        process_every << [handler, cycle_start, delay]
    end
    handler
end

#display_exceptions=(flag) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Controls whether this engine should indiscriminately display all fatal exceptions

This is on by default



# File 'lib/roby/execution_engine.rb', line 2706

def display_exceptions=(flag)
    unless flag
        @exception_display_handler.dispose
        return
    end
    return unless @exception_display_handler.disposed?

    @exception_display_handler = on_exception do |kind, error, tasks|
        level = if kind == EXCEPTION_HANDLED then :debug
                else
                    :warn
                end

        send(level) do
            send(level, "encountered a #{kind} exception")
            Roby.log_exception_with_backtrace(error.exception, self, level)
            if kind == EXCEPTION_HANDLED
                send(level, "the exception was handled by")
            else
                send(level, "the exception involved")
            end
            tasks.each do |t|
                send(level, "  #{t}")
            end
            break
        end
    end
end

#display_exceptions? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether this engine should indiscriminately display all fatal exceptions



# File 'lib/roby/execution_engine.rb', line 2737

def display_exceptions?
    !@exception_display_handler.disposed?
end

#error_handling_phase(events_errors) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Compute errors in plan and handle the results



# File 'lib/roby/execution_engine.rb', line 904

def error_handling_phase(events_errors)
    # Do the exception handling phase
    errors = compute_errors(events_errors)
    notify_about_error_handling_results(errors)

    # nonfatal errors are only notified. Fatal errors (kill_tasks) are
    # handled in the propagation loop during garbage collection. Only
    # the free events errors have to be handled here.
    errors.free_events_errors.each do |exception, generators|
        generators.each { |g| g.unreachable!(exception.exception) }
    end
    errors
end

#event_loop ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The main event loop. It returns when the execution engine is asked to quit. In general, this does not need to be called directly: use #run to start the event loop in a separate thread.



# File 'lib/roby/execution_engine.rb', line 2351

def event_loop
    @cycle_start = Time.now
    @cycle_index = 0

    exit_state = EventLoopExitState.new(0, Time.now, nil)
    @interrupted = false
    loop do
        GC::Profiler.enable if profile_gc?

        if @interrupted
            @interrupted = false
            event_loop_handle_interrupt(exit_state)
        end

        if quitting?
            return if forced_exit?
            return if event_loop_teardown(exit_state)
        end

        log_timepoint_group "cycle" do
            execute_one_cycle
        end

        GC::Profiler.disable if profile_gc?
    rescue Exception => e
        if quitting?
            fatal "Execution thread FORCEFULLY quitting "\
                  "because of unhandled exception"
            Roby.log_exception_with_backtrace(e, self, :fatal)
            raise
        else
            quit

            fatal "Execution thread quitting because of unhandled exception"
            Roby.log_exception_with_backtrace(e, self, :fatal)
        end
    end
ensure
    unless plan.tasks.empty?
        warn "the following tasks are still present in the plan:"
        plan.tasks.each do |t|
            warn "  #{t}"
        end
    end
end

#event_loop_handle_interrupt(exit_state) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Handle a received Interrupt for #event_loop



# File 'lib/roby/execution_engine.rb', line 2426

def event_loop_handle_interrupt(exit_state)
    if quitting?
        exit_state.force_exit_deadline ||=
            Time.now + INTERRUPT_FORCE_EXIT_DEAD_ZONE
        time_until_deadline = exit_state.force_exit_deadline - Time.now
        if time_until_deadline < 0
            fatal "Quitting without cleaning up"
            force_quit
        else
            fatal "Still #{time_until_deadline.ceil}s before "\
                  "interruption will quit without cleaning up"
        end
    else
        fatal "Received interruption request"
        fatal "Interrupt again in #{INTERRUPT_FORCE_EXIT_DEAD_ZONE}s "\
              "to quit without cleaning up"
        quit
        exit_state.force_exit_deadline =
            Time.now + INTERRUPT_FORCE_EXIT_DEAD_ZONE
    end
end
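
The two-stage interrupt handling above can be sketched outside of Roby. This is a minimal illustration, not the engine's code: ExitState, DEAD_ZONE and handle_interrupt are hypothetical names, the clock is passed in explicitly, and the method returns a symbol instead of calling #quit or #force_quit.

```ruby
# Hypothetical stand-ins: DEAD_ZONE mirrors INTERRUPT_FORCE_EXIT_DEAD_ZONE,
# ExitState keeps only the field this sketch needs.
DEAD_ZONE = 10 # seconds
ExitState = Struct.new(:force_exit_deadline)

# Returns :quit on the first interrupt, :wait while inside the dead zone,
# and :force_quit once the deadline has passed.
def handle_interrupt(state, quitting, now)
  unless quitting
    state.force_exit_deadline = now + DEAD_ZONE
    return :quit
  end

  state.force_exit_deadline ||= now + DEAD_ZONE
  now > state.force_exit_deadline ? :force_quit : :wait
end

state = ExitState.new(nil)
p handle_interrupt(state, false, 100) # first interrupt
p handle_interrupt(state, true, 105)  # second interrupt, too early
p handle_interrupt(state, true, 111)  # interrupt after the dead zone
```

Passing the time in as an argument keeps the sketch deterministic; the method shown above reads Time.now directly.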

#event_loop_teardown(exit_state) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Handle the teardown logic for #event_loop



# File 'lib/roby/execution_engine.rb', line 2402

def event_loop_teardown(exit_state)
    return true unless (remaining = clear)

    display_warning = (exit_state.last_stop_count != remaining.size) ||
                      (Time.now - exit_state.last_quit_warning) > 10
    return unless display_warning

    if display_warning
        Robot.info "Roby quitting ..." if exit_state.last_stop_count == 0

        issue_quit_progression_warning(remaining)
        exit_state.last_quit_warning = Time.now
        exit_state.last_stop_count = remaining.size
    end
    false
rescue Exception => e
    Robot.warn "Execution thread failed to clean up"
    Roby.log_exception_with_backtrace(e, Robot, :warn, filter: false)
    true
end

#event_propagation_phase(initial_events, propagation_info) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Calls its block in a #gather_propagation context and propagates the events that have been called and/or emitted by the block

If a block is given, it is called with the initial set of events: the events we should consider as already emitted in the following propagation. seeds is a list of procs which should be called to initiate the propagation (i.e. build an initial set of events)



# File 'lib/roby/execution_engine.rb', line 889

def event_propagation_phase(initial_events, propagation_info)
    @propagation_id += 1

    gather_errors do
        next_steps = initial_events
        until next_steps.empty?
            until next_steps.empty?
                next_steps = event_propagation_step(next_steps, propagation_info)
            end
            next_steps = gather_propagation { call_propagation_handlers }
        end
    end
end
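
The nested loop in the listing above drains the pending set step by step, then asks the propagation handlers for new seeds, and stops only when both are empty. A minimal sketch of that control flow, assuming pending work is a plain Array and using hypothetical step and handlers callables in place of #event_propagation_step and #call_propagation_handlers:

```ruby
# Hypothetical helper: `step` consumes a pending set and returns the next one,
# `handlers` returns a fresh pending set (possibly empty).
def propagate(initial, step:, handlers:)
  pending = initial
  until pending.empty?
    # Inner loop: run steps until nothing is pending
    pending = step.call(pending) until pending.empty?
    # Outer loop: give handlers a chance to queue more work
    pending = handlers.call
  end
end

log = []
seeds = [[3]] # the handlers produce work once, then nothing
step = lambda do |pending|
  n, *rest = pending
  log << n
  n > 1 ? rest + [n - 1] : rest
end
propagate([2], step: step, handlers: -> { seeds.shift || [] })
p log
```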

#event_propagation_step(current_step, propagation_info) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Propagate one step

current_step describes all pending emissions and calls.

This method calls ExecutionEngine.next_event to get the description of the next event to call. If there are signals going to this event, they are processed and the forwardings will be treated in the next step.

The method returns the next set of pending emissions and calls, adding the forwardings and signals that the propagation of the considered event has added.



# File 'lib/roby/execution_engine.rb', line 1116

def event_propagation_step(current_step, propagation_info)
    signalled, _step_id, forward_info, call_info = next_event(current_step)

    next_step = nil
    if !call_info.empty?
        source_events, source_generators, context =
            prepare_propagation(signalled, false, call_info)
        if source_events
            log(:generator_propagate_events, false, source_events, signalled)

            if signalled.self_owned?
                next_step = gather_propagation(current_step) do
                    propagation_context(source_events | source_generators) do
                        begin
                            propagation_info.add_generator_call(signalled)
                            signalled.call_without_propagation(context)
                        rescue Roby::LocalizedError => e
                            if signalled.command_emitted?
                                add_error(e)
                            else
                                signalled.emit_failed(e)
                            end
                        rescue Exception => e
                            if signalled.command_emitted?
                                add_error(Roby::CommandFailed.new(e, signalled))
                            else
                                signalled.emit_failed(Roby::CommandFailed.new(e, signalled))
                            end
                        end
                    end
                end
            end
        end

        if forward_info
            next_step ||= {}
            target_info = (next_step[signalled] ||= [@propagation_step_id += 1, [], []])
            target_info[PENDING_PROPAGATION_FORWARD].concat(forward_info)
        end

    elsif !forward_info.empty?
        source_events, source_generators, context =
            prepare_propagation(signalled, true, forward_info)
        if source_events
            log(:generator_propagate_events, true, source_events, signalled)

            # If the destination event is not owned by us, but none of its
            # owning peers is connected, the event is our responsibility now.
            if signalled.self_owned? || signalled.owners.none? { |peer| peer != plan.local_owner && peer.connected? }
                next_step = gather_propagation(current_step) do
                    propagation_context(source_events | source_generators) do
                        begin
                            if event = signalled.emit_without_propagation(context)
                                propagation_info.add_event_emission(event)
                                emitted_events << event
                            end
                        rescue Roby::LocalizedError => e
                            Roby.warn(
                                "Internal Error: #emit_without_propagation "\
                                "emitted a LocalizedError exception. This is "\
                                "unsupported and will become a fatal error in "\
                                "the future. You should usually replace raise "\
                                "with engine.add_error"
                            )
                            Roby.display_exception(
                                Roby.logger.io(:warn), e, false
                            )
                            add_error(e)
                        rescue Exception => e
                            Roby.warn(
                                "Internal Error: #emit_without_propagation "\
                                "emitted an exception. This is unsupported "\
                                "and will become a fatal error in the future. "\
                                "You should create a proper localized error "\
                                "and replace raise with engine.add_error"
                            )
                            Roby.display_exception(
                                Roby.logger.io(:warn), e, false
                            )
                            add_error(Roby::EmissionFailed.new(e, signalled))
                        end
                    end
                end
            end
        end
    end

    current_step.merge!(next_step) if next_step
    current_step
end

#every(duration, description: "periodic handler", **options, &block) ⇒ #dispose

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Call block every duration seconds. Note that duration is rounded up to the cycle length (the time between calls is *at least* duration)



# File 'lib/roby/execution_engine.rb', line 2172

def every(duration, description: "periodic handler", **options, &block)
    handler = PollBlockDefinition.new(description, block, **options)
    once do
        if handler.call(self, plan)
            process_every << [handler, cycle_start, duration]
        end
    end
    handler
end
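
Why the period is rounded up to the cycle length can be seen in a standalone sketch. Everything here is an assumption made for illustration: PeriodicTable and run_cycle are hypothetical names, times are integer milliseconds, and the immediate first call made by #every is left out. The listing above only shows that the engine stores [handler, cycle_start, duration] triples in process_every, not how it consumes them.

```ruby
# Hypothetical stand-in, not Roby's implementation.
class PeriodicTable
  def initialize
    @entries = [] # each entry: [callable, last_call_ms, period_ms]
  end

  def every(period_ms, now_ms, &block)
    @entries << [block, now_ms, period_ms]
  end

  # Meant to be called once per cycle, with the cycle start time
  def run_cycle(now_ms)
    @entries.each do |entry|
      block, last_call, period = entry
      next if now_ms - last_call < period

      block.call(now_ms)
      entry[1] = now_ms
    end
  end
end

table = PeriodicTable.new
calls = []
table.every(250, 0) { |t| calls << t }
# Ten 100 ms cycles: the handler can only fire on a cycle boundary
(1..10).each { |i| table.run_cycle(i * 100) }
p calls
```

With a 100 ms cycle, a 250 ms period fires at 300, 600 and 900 ms in this sketch, so the effective period is 300 ms.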

#execute(catch: [], type: :external_events, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Block until the given block is executed by the execution thread, at the beginning of the event loop, in propagation context. If the block raises, the exception is raised back in the calling thread.

Raises:

  • (ArgumentError)


# File 'lib/roby/execution_engine.rb', line 2558

def execute(catch: [], type: :external_events, &block)
    raise ArgumentError, "a block is required" unless block_given?
    if inside_control?
        return yield
    end

    capture_catch = lambda do |symbol, *other|
        caught = catch(symbol) do
            if other.empty?
                return [:ret, yield]
            else
                return capture_catch(block, *other)
            end
        end
        [:throw, [symbol, caught]]
    end

    ivar = Concurrent::IVar.new
    once(sync: ivar, type: type) do
        begin
            if !catch.empty?
                result = capture_catch.call(*catch, &block)
                ivar.set(result)
            else
                ivar.set([:ret, yield])
            end
        rescue ::Exception => e # rubocop:disable Lint/RescueException
            ivar.set([:raise, e])
        end
    end

    mode, value = ivar.value!
    case mode
    when :ret
        value
    when :throw
        throw(*value)
    else
        raise value
    end
end
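
The core idea of #execute, running a block on another thread and reporting its return value or exception back to the caller, can be sketched with the standard library alone. This is not the engine's code: TinyLoop is a hypothetical class, a Queue replaces Concurrent::IVar, and the catch/throw forwarding is omitted.

```ruby
# Hypothetical miniature of a single-threaded loop that accepts work
# from other threads.
class TinyLoop
  def initialize
    @jobs = Queue.new
    @thread = Thread.new do
      while (job = @jobs.pop) # a nil job stops the loop
        job.call
      end
    end
  end

  # Blocks until the block has run on the loop thread
  def execute(&block)
    raise ArgumentError, "a block is required" unless block
    return block.call if Thread.current == @thread

    reply = Queue.new
    @jobs << lambda do
      begin
        reply << [:ret, block.call]
      rescue Exception => e # rubocop:disable Lint/RescueException
        reply << [:raise, e]
      end
    end

    mode, value = reply.pop
    mode == :ret ? value : raise(value)
  end

  def stop
    @jobs << nil
    @thread.join
  end
end

engine = TinyLoop.new
puts engine.execute { 6 * 7 }
engine.stop
```

Tagging the reply as [:ret, value] or [:raise, exception] follows the same convention as the listing above and lets the calling thread decide whether to return or re-raise.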

#execute_delayed_events ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Adds the events in delayed_events whose time has passed into the propagation. This must be called in propagation context.

See #add_event_delay and #delayed_events



# File 'lib/roby/execution_engine.rb', line 563

def execute_delayed_events
    reftime = Time.now
    delayed_events.delete_if do |time, forward, source, signalled, context|
        if time <= reftime
            add_event_propagation(forward, [source], signalled, context, nil)
            true
        end
    end
end

#execute_one_cycle(time = Time.now) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 2448

def execute_one_cycle(time = Time.now)
    last_process_times = Process.times
    last_dump_time = plan.event_logger.dump_time

    while time > cycle_start + cycle_length
        @cycle_start += cycle_length
        @cycle_index += 1
    end
    stats = {}
    stats[:start] = [cycle_start.tv_sec, cycle_start.tv_usec]
    stats[:actual_start] = time - cycle_start
    stats[:cycle_index] = cycle_index

    log_timepoint_group "process_events" do
        process_events
    end

    execute_side_work
    log_timepoint "side-work"

    if use_oob_gc?
        stats[:pre_oob_gc] = GC.stat
        GC::OOB.run
    end

    # Sleep if there is enough time for it
    remaining_cycle_time = cycle_length - (Time.now - cycle_start)
    if remaining_cycle_time > SLEEP_MIN_TIME
        sleep(remaining_cycle_time)
    end
    log_timepoint "sleep"

    # Log cycle statistics
    process_times = Process.times
    dump_time = plan.event_logger.dump_time
    stats[:log_queue_size]   = plan.log_queue_size
    stats[:plan_task_count]  = plan.num_tasks
    stats[:plan_event_count] = plan.num_free_events
    stats[:gc] = GC.stat
    stats[:utime] = process_times.utime - last_process_times.utime
    stats[:stime] = process_times.stime - last_process_times.stime
    stats[:dump_time] = dump_time - last_dump_time
    stats[:state] = Roby::State
    stats[:end] = Time.now - cycle_start
    if profile_gc?
        stats[:gc_profile_data] = GC::Profiler.raw_data
        stats[:gc_total_time] = GC::Profiler.total_time
    else
        stats[:gc_profile_data] = nil
        stats[:gc_total_time] = 0
    end

    cycle_end(stats)
    log_flush_cycle :cycle_end, stats

    @cycle_start += cycle_length
    @cycle_index += 1
end
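
The timing arithmetic in the listing above, catching up on missed cycles and then sleeping only if enough time remains, can be isolated into two pure functions. This is a sketch with hypothetical names (catch_up_cycle, sleep_time) and integer milliseconds instead of Time objects.

```ruby
# Advance the cycle start (and index) until `now` falls inside the
# current cycle, as the while loop at the top of the method does.
def catch_up_cycle(cycle_start, cycle_index, cycle_length, now)
  while now > cycle_start + cycle_length
    cycle_start += cycle_length
    cycle_index += 1
  end
  [cycle_start, cycle_index]
end

# Time to sleep at the end of the cycle; 0 when less than `min_sleep`
# remains (the role played by SLEEP_MIN_TIME).
def sleep_time(cycle_start, cycle_length, now, min_sleep)
  remaining = cycle_length - (now - cycle_start)
  remaining > min_sleep ? remaining : 0
end

start, index = catch_up_cycle(0, 0, 100, 350)
p [start, index]
p sleep_time(start, 100, 350, 10)
p sleep_time(start, 100, 395, 10)
```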

#execute_side_work ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Execute the work registered with Roby::ExecutionEngine::PropagationHandlerMethods#add_side_work_handler



# File 'lib/roby/execution_engine.rb', line 429

def execute_side_work
    call_poll_blocks(self.side_work_handlers, false)
    call_poll_blocks(self.class.side_work_handlers, false)
end

#finalized_event(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by #plan when an event has been finalized



# File 'lib/roby/execution_engine.rb', line 584

def finalized_event(event)
    @propagation&.delete(event)
    event.unreachable!("finalized", plan)
    # since the event is already finalized,
end

#finalized_task(task) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by #plan when a task has been finalized



# File 'lib/roby/execution_engine.rb', line 579

def finalized_task(task)
    @pending_exceptions.delete(task)
end

#force_quit ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Force quitting, without cleaning up



# File 'lib/roby/execution_engine.rb', line 2539

def force_quit
    @quit = 2
end

#forced_exit? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True if the control thread has been asked to quit forcefully, without cleaning up (see #force_quit)



# File 'lib/roby/execution_engine.rb', line 2525

def forced_exit?
    @quit > 1
end

#garbage_collect(force_on = nil) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Kills and removes unneeded tasks for which there are no dependencies



# File 'lib/roby/execution_engine.rb', line 1975

def garbage_collect(force_on = nil)
    if force_on && !force_on.empty?
        info "GC: adding #{force_on.size} tasks in the force_gc set"
        valid_plan, mismatching_plan = force_on.partition do |t|
            t.plan == self.plan
        end
        plan.force_gc.merge(valid_plan)

        unless mismatching_plan.empty?
            mismatches_s = mismatching_plan.map { |t| "#{t}(plan=#{t.plan})" }
                                           .join(", ")
            raise ArgumentError,
                  "#{mismatches_s} have been given to #{self}.garbage_collect, "\
                  "but they are not tasks in #{plan}"
        end
    end

    unmark_finished_missions_and_permanent_tasks

    # The set of tasks for which we will queue stop! at this cycle
    # #finishing? is false until the next event propagation cycle
    finishing = Set.new

    # Loop until no tasks have been removed
    loop do
        tasks = plan.unneeded_tasks | plan.force_gc
        local_tasks = plan.local_tasks & tasks
        remote_tasks = tasks - local_tasks

        # Remote tasks are simply removed, regardless of other concerns
        for t in remote_tasks
            debug { "GC: removing the remote task #{t}" }
            plan.garbage_task(t)
        end

        break if local_tasks.empty?

        debug do
            debug "#{local_tasks.size} tasks are unneeded in this plan"
            local_tasks.each do |t|
                debug "  #{t} mission=#{plan.mission_task?(t)} "\
                      "permanent=#{plan.permanent_task?(t)}"
            end
            break
        end

        # Find the roots, that is the tasks we should be trying to
        # stop. They are the tasks which have no running parents.
        roots = local_tasks.dup - finishing
        plan.default_useful_task_graphs.each do |g|
            roots.delete_if do |t|
                g.each_in_neighbour(t).any? { |p| !p.finished? }
            end

            break if roots.empty?
        end

        new_finishing_tasks =
            roots.find_all { |local_task| garbage_collect_stop_task(local_task) }
        finishing.merge(new_finishing_tasks)

        break unless roots.any?(&:finalized?)
    end

    finishing.each(&:stop!)

    plan.unneeded_events.each do |event|
        plan.garbage_event(event)
    end

    !finishing.empty?
end
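
The root selection inside the loop above keeps only the unneeded tasks that have no unfinished parent. A minimal sketch of that filter, assuming a single relation stored as a plain Hash from task to parents; gc_roots and the symbol task names are hypothetical, whereas the real method walks every graph in plan.default_useful_task_graphs.

```ruby
# Keep the candidates whose parents are all finished.
# `parents` maps a task to the list of its parent tasks.
def gc_roots(candidates, parents, finished)
  candidates.reject do |task|
    parents.fetch(task, []).any? { |parent| !finished.include?(parent) }
  end
end

parents = { b: [:a], c: [:b] } # a -> b -> c
p gc_roots(%i[a b c], parents, [])
p gc_roots(%i[b c], parents, [:a])
```

Stopping the roots first means children are only considered once their parents have finished, which is why the method loops until no further task is removed.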

#garbage_collect_stop_task(local_task) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Handle a single root task in the #garbage_collect process



# File 'lib/roby/execution_engine.rb', line 2054

def garbage_collect_stop_task(local_task)
    if local_task.pending?
        info "GC: removing pending task #{local_task}"
        plan.garbage_task(local_task)
    elsif local_task.failed_to_start?
        info "GC: removing task that failed to start #{local_task}"
        plan.garbage_task(local_task)
    elsif local_task.starting?
        # wait for task to be started before killing it
        debug "GC: #{local_task} is starting"
    elsif local_task.finished?
        debug "GC: #{local_task} is not running, removed"
        plan.garbage_task(local_task)
    elsif local_task.finishing?
        debug do
            debug "GC: waiting for #{local_task} to finish"
            local_task.history.each do |ev|
                debug "GC:   #{ev}"
            end
            break
        end
    elsif local_task.quarantined?
        warn "GC: #{local_task} is running but in quarantine"
    elsif local_task.event(:stop).controlable?
        debug { "GC: attempting to stop #{local_task}" }
        if !local_task.respond_to?(:stop!)
            warn "something fishy: #{local_task}/stop is controlable but "\
                 "there is no #stop! method, putting in quarantine"
            plan.quarantine_task(local_task)
        else
            return true
        end
    else
        warn "GC: #{local_task} cannot be stopped, putting in quarantine"
        plan.quarantine_task(local_task)
    end
    false
end

#gather_errors ⇒ Array<ExecutionException>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Executes the given block while gathering errors, and returns the errors that have been declared with #add_error



# File 'lib/roby/execution_engine.rb', line 865

def gather_errors
    if @propagation_exceptions
        raise InternalError, "recursive call to #gather_errors"
    end

    # The ensure clause must NOT apply to the recursive check above.
    # Otherwise, we end up resetting @propagation_exceptions to nil,
    # which wreaks havoc
    begin
        @propagation_exceptions = []
        yield
        @propagation_exceptions
    ensure
        @propagation_exceptions = nil
    end
end
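
The comment in the listing above is about where the recursion check sits relative to the ensure clause. A standalone sketch of the pattern, with a hypothetical ErrorCollector class; raising from add_error when no gathering is in progress is an assumption of this sketch, not something the listing shows.

```ruby
# Hypothetical collector reproducing the guard-then-ensure layout.
class ErrorCollector
  def initialize
    @errors = nil
  end

  def add_error(error)
    raise "not gathering errors" unless @errors

    @errors << error
  end

  def gather_errors
    # Checked BEFORE the begin/ensure, so a rejected nested call does not
    # reset the state of the outer call.
    raise "recursive call to gather_errors" if @errors

    begin
      @errors = []
      yield
      @errors
    ensure
      @errors = nil
    end
  end
end

collector = ErrorCollector.new
p collector.gather_errors { collector.add_error(:a); collector.add_error(:b) }
```

If the check were inside the begin block, the nested call's ensure would set the collection to nil while the outer block is still running.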

#gather_external_events ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Gather the events that come out of this plan manager



# File 'lib/roby/execution_engine.rb', line 833

def gather_external_events
    process_once_blocks
    gather_framework_errors("delayed events") { execute_delayed_events }
    call_poll_blocks(self.class.external_events_handlers)
    call_poll_blocks(self.external_events_handlers)
end

#gather_framework_errors(source, raise_caught_exceptions: true) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Yields to the block and registers any raised exception using #add_framework_error

If the method is called within an exception-gathering context (either #process_events or #gather_framework_errors itself), nothing else is done. Otherwise, #process_pending_application_exceptions is called to re-raise any caught exception



# File 'lib/roby/execution_engine.rb', line 657

def gather_framework_errors(source, raise_caught_exceptions: true)
    if @application_exceptions
        recursive_error_gathering_context = true
    else
        @application_exceptions = []
    end

    yield

    if !recursive_error_gathering_context && !raise_caught_exceptions
        clear_application_exceptions
    end
rescue Exception => e
    add_framework_error(e, source)
    if !recursive_error_gathering_context && !raise_caught_exceptions
        clear_application_exceptions
    end
ensure
    if !recursive_error_gathering_context && raise_caught_exceptions
        process_pending_application_exceptions
    end
end

#gather_propagation(initial_set = {}, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Sets up a propagation context, yielding the block in it. During this propagation stage, all calls to #emit and #call are stored in an internal hash of the form:

target => [forward_sources, signal_sources]

where the two _sources are arrays of the form

[[source, context], ...]

The method returns the resulting hash. Use #in_propagation_context? to know if the current engine is in a propagation context, and #add_event_propagation to add a new entry to this set.



# File 'lib/roby/execution_engine.rb', line 606

def gather_propagation(initial_set = {}, &block)
    if in_propagation_context?
        raise InternalError, "nested call to #gather_propagation"
    end

    old_allow_propagation, @allow_propagation = @allow_propagation, true

    # The ensure clause must NOT apply to the recursive check above.
    # Otherwise, we end up resetting @propagation to nil,
    # which wreaks havoc
    begin
        @propagation = initial_set
        @propagation_sources = nil
        @propagation_step_id = 0

        propagation_context([], &block)

        result, @propagation = @propagation, nil
        result
    ensure
        @propagation = nil
        @allow_propagation = old_allow_propagation
    end
end
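
Going by the code shown for #event_propagation_step and #has_pending_forward?, each entry of the pending set appears to be stored as [step_id, forwards, signals], with forwards and signals kept as flat runs of three values. The sketch below builds such a hash by hand; add_pending, FORWARD and SIGNAL are hypothetical names (the two constants mirror PENDING_PROPAGATION_FORWARD and PENDING_PROPAGATION_SIGNAL), and the layout is an inference from those listings rather than a documented contract.

```ruby
FORWARD = 1 # index of the forward list in an entry
SIGNAL = 2  # index of the signal list in an entry

# Record one pending propagation towards `target`.
def add_pending(set, step_id, target, kind, source, context, timespec = nil)
  entry = (set[target] ||= [step_id, [], []])
  entry[kind].push(source, context, timespec)
  set
end

set = {}
add_pending(set, 1, :stop, FORWARD, :failed, "reason")
add_pending(set, 2, :stop, SIGNAL, :timeout, nil)
add_pending(set, 3, :start, SIGNAL, :ready, 42)
p set
```

An entry keeps the step id it was created with, which is why the second call above does not change the id stored for :stop.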

#gathering? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 498

def gathering?
    Roby.warn_deprecated "#gathering? is deprecated, use "\
                         "#in_propagation_context? instead"
    in_propagation_context?
end

#gathering_errors? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 857

def gathering_errors?
    !!@propagation_exceptions
end

#has_pending_exception_matching?(e, object) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Tests whether there is an exception registered by #add_fatal_exceptions_for_inhibition for a given error and object



# File 'lib/roby/execution_engine.rb', line 1932

def has_pending_exception_matching?(e, object)
    @pending_exceptions[object]&.include?([e.exception.class, e.origin])
end

#has_pending_forward?(from, to, expected_context) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether a forward matching this signature is currently pending



# File 'lib/roby/execution_engine.rb', line 784

def has_pending_forward?(from, to, expected_context)
    if pending = @propagation[to]
        pending[PENDING_PROPAGATION_FORWARD].each_slice(3).any? do |event, context, timespec|
            (from === event.generator) && (expected_context === context)
        end
    end
end

#has_pending_signal?(from, to, expected_context) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether a signal matching this signature is currently pending



# File 'lib/roby/execution_engine.rb', line 793

def has_pending_signal?(from, to, expected_context)
    if pending = @propagation[to]
        pending[PENDING_PROPAGATION_SIGNAL].each_slice(3).any? do |event, context, timespec|
            (from === event.generator) && (expected_context === context)
        end
    end
end
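
Both #has_pending_forward? and #has_pending_signal? match with ===, so from and expected_context can be exact values, classes, regular expressions or any other object implementing case equality. A standalone sketch of the same test over a flat list of triples; PendingEvent and pending_match? are hypothetical names.

```ruby
# Hypothetical event carrying only the generator it came from.
PendingEvent = Struct.new(:generator)

# `flat` is a flat array of (event, context, timespec) triples.
def pending_match?(flat, from, expected_context)
  flat.each_slice(3).any? do |event, context, _timespec|
    (from === event.generator) && (expected_context === context)
  end
end

pending = [
  PendingEvent.new(:start), 42, nil,
  PendingEvent.new(:stop), "done", nil
]
p pending_match?(pending, :start, Integer) # class as a matcher
p pending_match?(pending, :stop, /don/)    # regexp as a matcher
p pending_match?(pending, :stop, Integer)  # context does not match
```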

#has_propagation_for?(target) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 738

def has_propagation_for?(target)
    @propagation&.has_key?(target)
end

#has_queued_events? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns true if some events are queued



# File 'lib/roby/execution_engine.rb', line 591

def has_queued_events?
    !@propagation.empty?
end

#has_waiting_work? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether this execution engine has asynchronous work waiting to be processed



# File 'lib/roby/execution_engine.rb', line 1522

def has_waiting_work?
    # Filter out unscheduled promises (promises on which #execute was
    # not called). If they are unscheduled, we're not waiting on them
    waiting_work.any? { |w| !w.unscheduled? }
end

#in_propagation_context? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True if we are within a propagation context (i.e. within event processing)



# File 'lib/roby/execution_engine.rb', line 506

def in_propagation_context?
    !!@propagation
end

#inhibited_exception?(exception) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Query whether the given exception is inhibited in this plan



# File 'lib/roby/execution_engine.rb', line 1425

def inhibited_exception?(exception)
    unhandled, = remove_inhibited_exceptions([exception.to_execution_exception])
    unhandled.empty?
end

#inside_control? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True if the current thread is the execution thread of this engine

See #outside_control? for a discussion of the use of #inside_control? and #outside_control? when testing the threading context



# File 'lib/roby/execution_engine.rb', line 2208

def inside_control?
    t = thread
    !t || t == Thread.current
end

#interrupt ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 2529

def interrupt
    @interrupted = true
end

#issue_quit_progression_warning(remaining) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 2325

def issue_quit_progression_warning(remaining)
    Robot.info(
        "Waiting for #{remaining.size} tasks to finish "\
        "(#{plan.num_tasks} tasks still in plan) and "\
        "#{waiting_work.size} async work jobs"
    )
    remaining.each do |task|
        Robot.info "  #{task}"
    end
    quarantined = remaining.find_all(&:quarantined?)
    unless quarantined.empty?
        Robot.info "#{quarantined.size} tasks in quarantine"
    end
end

#join_all_waiting_work(timeout: nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Waits for all obligations in #waiting_work to finish



# File 'lib/roby/execution_engine.rb', line 435

def join_all_waiting_work(timeout: nil)
    return [], PropagationInfo.new if waiting_work.empty?

    deadline = if timeout
                   Time.now + timeout
               end

    finished = []
    propagation_info = PropagationInfo.new
    loop do
        framework_errors = gather_framework_errors(
            "#join_all_waiting_work",
            raise_caught_exceptions: false
        ) do
            next_steps = nil
            event_errors = gather_errors do
                next_steps = gather_propagation do
                    finished.concat(process_waiting_work)
                    blocks = []
                    until once_blocks.empty?
                        blocks << once_blocks.pop.last
                    end
                    call_poll_blocks(blocks)
                end
            end

            this_propagation = propagate_events_and_errors(
                next_steps, event_errors, garbage_collect_pass: false
            )
            propagation_info.merge(this_propagation)
        end
        propagation_info.add_framework_errors(framework_errors)

        Thread.pass
        has_scheduled_promises = has_waiting_work?
        if deadline && (Time.now > deadline) && has_scheduled_promises
            raise JoinAllWaitingWorkTimeout.new(waiting_work)
        end

        break unless has_waiting_work?
    end
    [finished, propagation_info]
end
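
The loop above combines polling, `Thread.pass` and an optional deadline. The same control flow can be sketched in isolation as follows. `JoinTimeout`, `Job` and `join_all` are hypothetical names, and the sketch leaves out the propagation and error gathering that the real method performs.

```ruby
# Hypothetical error class standing in for JoinAllWaitingWorkTimeout.
class JoinTimeout < RuntimeError; end

# Hypothetical unit of work: complete once its thread has terminated.
Job = Struct.new(:thread) do
  def complete?
    !thread.alive?
  end
end

# Polls `work` until every job is complete. Raises JoinTimeout if the
# optional deadline passes while jobs are still pending.
def join_all(work, timeout: nil)
  deadline = Time.now + timeout if timeout
  finished = []
  loop do
    done, work = work.partition(&:complete?)
    finished.concat(done)
    Thread.pass
    if deadline && Time.now > deadline && !work.empty?
      raise JoinTimeout, "#{work.size} jobs still pending"
    end
    break if work.empty?
  end
  finished
end

fast = Job.new(Thread.new { :done })
finished = join_all([fast], timeout: 5)

stuck = Job.new(Thread.new { sleep })
timed_out = begin
  join_all([stuck], timeout: 0.05)
  false
rescue JoinTimeout
  true
ensure
  stuck.thread.kill
end
```

Checking the deadline after each polling pass, rather than before it, guarantees that at least one pass runs even with a very short timeout.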

#killall ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Kill all tasks that are currently running in the plan



# File 'lib/roby/execution_engine.rb', line 2640

def killall
    scheduler_enabled = scheduler.enabled?

    plan.permanent_tasks.clear
    plan.permanent_events.clear
    plan.mission_tasks.clear

    scheduler.enabled = false
    quit

    start_new_cycle
    process_events
    cycle_end({})

    plan.transactions.each(&:discard_transaction!)

    start_new_cycle
    Thread.pass
    process_events
    cycle_end({})
ensure
    scheduler.enabled = scheduler_enabled
end

#next_event(pending) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

call-seq:

next_event(pending) => event, propagation_info

Determines which event in pending should be signalled now. Removes it from the set and returns the event and the associated propagation information.

See #gather_propagation for the format of the returned propagation_info



# File 'lib/roby/execution_engine.rb', line 1017

def next_event(pending)
    # this variable is 2 if selected_event is being forwarded, 1 if it
    # is both forwarded and signalled and 0 if it is only signalled
    priority, step_id, selected_event = nil
    for propagation_step in pending
        target_event = propagation_step[0]
        target_step_id, forwards, signals = *propagation_step[1]
        target_priority = if forwards.empty? && signals.empty? then 2
                          elsif forwards.empty? then 0
                          else
                              1
                          end

        do_select = if selected_event
                        if precedence_graph.reachable?(selected_event, target_event)
                            false
                        elsif precedence_graph.reachable?(target_event, selected_event)
                            true
                        elsif priority < target_priority
                            true
                        elsif priority == target_priority
                            # If they are of the same priority, handle
                            # earlier events first
                            step_id > target_step_id
                        else
                            false
                        end
                    else
                        true
                    end

        if do_select
            selected_event = target_event
            priority       = target_priority
            step_id        = target_step_id
        end
    end
    [selected_event, *pending.delete(selected_event)]
end
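
The priority and step-id rules in the listing can be tried out without a plan. The sketch below is a simplification under one explicit assumption: it ignores the precedence graph entirely, so it only reproduces the tie-breaking part of the selection. `priority_of` and `pick_next` are hypothetical helper names.

```ruby
# Priority values as computed in the listing:
# 2 when there are neither forwards nor signals, 0 when there are only
# signals, 1 as soon as there is at least one forward.
def priority_of(forwards, signals)
  if forwards.empty? && signals.empty? then 2
  elsif forwards.empty? then 0
  else 1
  end
end

# Simplified selection (assumption: no precedence graph). The highest
# priority wins; among equal priorities the smallest step id wins.
# `pending` maps event => [step_id, forwards, signals].
def pick_next(pending)
  selected, = pending.max_by do |_event, (step_id, forwards, signals)|
    [priority_of(forwards, signals), -step_id]
  end
  [selected, *pending.delete(selected)]
end

pending = {
  a: [3, [:fwd], []],     # forwarded only, priority 1
  b: [1, [], [:sig]],     # signalled only, priority 0
  c: [2, [:fwd], [:sig]]  # forwarded and signalled, priority 1
}
first = pick_next(pending)
```

Here `:c` is selected before `:a` because both have priority 1 and `:c` was queued at an earlier step.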

#notify_about_error_handling_results(errors) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Issue the warning message and log notifications related to tasks being killed because of unhandled fatal exceptions



# File 'lib/roby/execution_engine.rb', line 930

def notify_about_error_handling_results(errors)
    kill_tasks, fatal_errors, nonfatal_errors, free_events_errors, handled_errors =
        errors.kill_tasks, errors.fatal_errors, errors.nonfatal_errors, errors.free_events_errors, errors.handled_errors

    unless nonfatal_errors.empty?
        if display_exceptions?
            warn "#{nonfatal_errors.size} unhandled non-fatal exceptions"
        end
        nonfatal_errors.each do |exception, tasks|
            notify_exception(EXCEPTION_NONFATAL, exception, tasks)
        end
    end

    unless handled_errors.empty?
        if display_exceptions?
            warn "#{handled_errors.size} handled errors"
        end
        handled_errors.each do |exception, tasks|
            notify_exception(EXCEPTION_HANDLED, exception, tasks)
        end
    end

    unless free_events_errors.empty?
        if display_exceptions?
            warn "#{free_events_errors.size} free event exceptions"
        end
        free_events_errors.each do |exception, events|
            notify_exception(EXCEPTION_FREE_EVENT, exception, events)
        end
    end

    unless fatal_errors.empty?
        if display_exceptions?
            warn "#{fatal_errors.size} unhandled fatal exceptions, involving #{kill_tasks.size} tasks that will be forcefully killed"
        end
        fatal_errors.each do |exception, tasks|
            notify_exception(EXCEPTION_FATAL, exception, tasks)
        end
        if display_exceptions?
            kill_tasks.each do |task|
                log_pp :warn, task
            end
        end
    end
end

#notify_exception(kind, error, involved_objects) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Notifies the listeners registered with #on_exception of the occurrence of an exception



# File 'lib/roby/execution_engine.rb', line 2743

def notify_exception(kind, error, involved_objects)
    log(:exception_notification, plan.droby_id, kind, error, involved_objects)
    exception_listeners.each do |listener|
        listener.call(self, kind, error, involved_objects)
    end
end

#on_exception(description: "exception listener", on_error: :disable) {|kind, error, tasks| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers a callback that will be called when exceptions are propagated in the plan

Yield Parameters:



# File 'lib/roby/execution_engine.rb', line 2688

def on_exception(description: "exception listener", on_error: :disable, &block)
    handler = PollBlockDefinition.new(description, block, on_error: on_error)
    exception_listeners << handler
    Roby.disposable { exception_listeners.delete(handler) }
end
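
Registration returns a handle whose disposal removes the listener again. A minimal, self-contained sketch of that register-then-dispose pattern follows. `ListenerRegistry` and `Handle` are hypothetical; in particular, `Handle` only stands in for whatever `Roby.disposable` returns, and the block here receives the three documented yield parameters directly.

```ruby
# Hypothetical registry mirroring the register-and-return-a-handle pattern.
class ListenerRegistry
  # Hypothetical handle: calling #dispose runs the stored cleanup lambda.
  Handle = Struct.new(:on_dispose) do
    def dispose
      on_dispose.call
    end
  end

  def initialize
    @listeners = []
  end

  # Registers the block and returns a handle that unregisters it.
  def on_exception(&block)
    @listeners << block
    Handle.new(-> { @listeners.delete(block) })
  end

  def notify_exception(kind, error, involved_objects)
    @listeners.each { |l| l.call(kind, error, involved_objects) }
  end
end

registry = ListenerRegistry.new
seen = []
handle = registry.on_exception { |kind, error, _tasks| seen << [kind, error.message] }

registry.notify_exception(:fatal, RuntimeError.new("boom"), [])
handle.dispose
registry.notify_exception(:fatal, RuntimeError.new("ignored"), [])
```

Returning the handle keeps the caller from having to hold on to the block itself in order to unregister it later.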

#once(sync: nil, description: "once block", type: :external_events, **options, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Schedules block to be called at the beginning of the next execution cycle, in propagation context.



# File 'lib/roby/execution_engine.rb', line 1438

def once(sync: nil, description: "once block", type: :external_events, **options, &block)
    waiting_work << sync if sync
    once_blocks << create_propagation_handler(description: description, type: type, once: true, **options, &block)
end

#outside_control? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True if the current thread is not the execution thread of this engine, or if there is no control thread. When you check the current thread context, always use a negated form. Do not do

if Roby.inside_control?
  ERROR
end

Do instead

if !Roby.outside_control?
  ERROR
end

The first form will fail if there is no control thread, while the second form will work. Use the first form only if you require that a control thread actually exists.



# File 'lib/roby/execution_engine.rb', line 2230

def outside_control?
    t = thread
    !t || t != Thread.current
end
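
As the two listings show, #inside_control? and #outside_control? are not strict negations of each other: both short-circuit on a missing thread. The sketch below copies the two predicate bodies into a hypothetical `ThreadContextSketch` class (not part of Roby) to make the three cases visible.

```ruby
# Hypothetical minimal object exposing only the two predicates.
class ThreadContextSketch
  attr_accessor :thread

  def inside_control?
    t = thread
    !t || t == Thread.current
  end

  def outside_control?
    t = thread
    !t || t != Thread.current
  end
end

engine = ThreadContextSketch.new

# Case 1: no control thread at all.
no_thread = [engine.inside_control?, engine.outside_control?]

# Case 2: the control thread is the current thread.
engine.thread = Thread.current
same_thread = [engine.inside_control?, engine.outside_control?]

# Case 3: the control thread is some other thread.
other = Thread.new { sleep }
engine.thread = other
other_thread = [engine.inside_control?, engine.outside_control?]
other.kill
```

With these bodies, `inside_control?` and `!outside_control?` agree whenever a control thread exists and differ only when there is none.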

#prepare_propagation(target, is_forward, info) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

call-seq:

prepare_propagation(target, is_forward, info) => source_events, source_generators, context
prepare_propagation(target, is_forward, info) => nil

Parses the propagation information info in the context of a forwarding if is_forward is true and of a signalling otherwise. target is the target event.

The method adds the appropriate delayed events using #add_event_delay, and returns either nil if no propagation is to be performed, or the propagation source events, generators and context.

The format of info is the same as the hash values described in #gather_propagation.



# File 'lib/roby/execution_engine.rb', line 1071

def prepare_propagation(target, is_forward, info)
    timeref = Time.now

    source_events, source_generators, context = Set.new, Set.new, []

    delayed = true
    info.each_slice(3) do |src, ctxt, time|
        if time && (delay = ExecutionEngine.make_delay(timeref, src, target, time))
            add_event_delay(delay, is_forward, src, target, ctxt)
            next
        end

        delayed = false

        # Merge identical signals. Needed because two different event handlers
        # can both call #emit, and two signals are set up
        if src
            if src.respond_to?(:generator)
                source_events << src
                source_generators << src.generator
            else
                source_generators << src
            end
        end
        if ctxt
            context.concat ctxt
        end
    end

    unless delayed
        [source_events, source_generators, context]
    end
end
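
The merging step in the listing collapses duplicate sources into sets while concatenating contexts. The sketch below isolates that step under a stated assumption: delays are ignored, so every triple is treated as immediately propagated. `SourceEvent` and `merge_sources` are hypothetical names.

```ruby
require "set"

# Hypothetical stand-in for an event object that knows its generator.
SourceEvent = Struct.new(:generator)

# `info` is a flat array of [source, context, time] triples.
# Assumption of this sketch: the time element is ignored (no delays).
def merge_sources(info)
  events, generators, context = Set.new, Set.new, []
  info.each_slice(3) do |src, ctxt, _time|
    if src.respond_to?(:generator)
      # An event: record both the event and its generator.
      events << src
      generators << src.generator
    elsif src
      # A bare generator.
      generators << src
    end
    context.concat(ctxt) if ctxt
  end
  [events, generators, context]
end

ev = SourceEvent.new(:gen_a)
info = [ev, [1], nil, ev, [2], nil, :gen_b, nil, nil]
events, generators, context = merge_sources(info)
```

The duplicate `ev` entry is stored once in both sets, while its two contexts are both kept in order.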

#process_events(raise_framework_errors: Roby.app.abort_on_application_exception?, garbage_collect_pass: true, &caller_block) ⇒ PropagationInfo

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The inside part of the event loop

It gathers the initial events and errors and propagates them

Raises:

  • RecursivePropagationContext if called recursively



# File 'lib/roby/execution_engine.rb', line 1745

def process_events(
    raise_framework_errors: Roby.app.abort_on_application_exception?,
    garbage_collect_pass: true, &caller_block
)
    if @application_exceptions
        raise RecursivePropagationContext, "recursive call to process_events"
    end

    # to avoid having a almost-method-global ensure block
    passed_recursive_check = true
    @application_exceptions = []
    @emitted_events = []

    @thread_pool.send :synchronize do
        @thread_pool.send(:ns_prune_pool)
    end

    # Gather new events and propagate them
    events_errors = nil
    next_steps = gather_propagation do
        events_errors = gather_errors do
            if caller_block
                yield
                caller_block = nil
            end

            if !quitting? || !garbage_collect([])
                process_waiting_work
                log_timepoint "workers"
                gather_external_events
                log_timepoint "external_events"
                call_propagation_handlers
                log_timepoint "propagation_handlers"
            end
        end
    end

    propagation_info = propagate_events_and_errors(
        next_steps, events_errors, garbage_collect_pass: garbage_collect_pass
    )
    if Roby.app.abort_on_exception? && !all_errors.fatal_errors.empty?
        raise Aborting.new(propagation_info.each_fatal_error.map(&:exception))
    end

    propagation_info.framework_errors.concat(@application_exceptions)
    propagation_info
ensure
    if passed_recursive_check
        process_pending_application_exceptions(raise_framework_errors: raise_framework_errors)
    end
end
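
The listing guards against re-entrancy with an instance variable and uses a local flag so that the ensure block only cleans up state owned by the current invocation. A minimal sketch of that idiom follows. `GuardedLoop` and `RecursiveCall` are hypothetical, and resetting the variable directly in the ensure block is an assumption of the sketch (the listing delegates cleanup to #process_pending_application_exceptions).

```ruby
# Hypothetical error class standing in for RecursivePropagationContext.
class RecursiveCall < RuntimeError; end

class GuardedLoop
  def process
    raise RecursiveCall, "recursive call to process" if @errors

    # Set only once the guard has been passed, so that the ensure block
    # can tell a rejected nested call from the owning call.
    passed_check = true
    @errors = []
    yield self if block_given?
    :done
  ensure
    # A rejected nested call must not clear the outer call's state.
    @errors = nil if passed_check
  end
end

engine = GuardedLoop.new
nested = begin
  engine.process { |e| e.process }
rescue RecursiveCall => e
  e.message
end
again = engine.process
```

The flag avoids wrapping most of the method body in a second begin/ensure block just to skip cleanup when the guard raises.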

#process_events_synchronous(seeds = {}, initial_errors = [], enable_scheduler: false, raise_errors: true, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Tests use a special propagation mode in which everything, including error handling, is resolved as soon as #emit or #call is called. This method implements that mode. As the listing shows, it is deprecated in favour of the expect_execution harness.

When errors occur in this mode, the exceptions are raised directly. This is useful in tests because it ensures that exceptions are not overlooked.

If multiple errors are raised in a single call (this is possible due to Roby’s error handling mechanisms), the method will raise SynchronousEventProcessingMultipleErrors to wrap all the exceptions into one.



# File 'lib/roby/execution_engine.rb', line 1809

def process_events_synchronous(seeds = {}, initial_errors = [], enable_scheduler: false, raise_errors: true, &block)
    Roby.warn_deprecated "#process_events_synchronous is deprecated, use the expect_execution harness instead"

    if @application_exceptions
        raise RecursivePropagationContext, "recursive call to process_events"
    end

    passed_recursive_check = true # to avoid having a almost-method-global ensure block
    @application_exceptions = []

    # Save early for the benefit of the 'ensure' block
    current_scheduler_enabled = scheduler.enabled?

    if block_given?
        if seeds.empty? && initial_errors.empty?
            seeds = gather_propagation do
                initial_errors = gather_errors(&block)
            end
        else
            raise ArgumentError,
                  "cannot provide both seeds/inital errors and a block"
        end
    end

    scheduler.enabled = enable_scheduler

    propagation_info = propagate_events_and_errors(
        seeds, initial_errors, garbage_collect_pass: false
    )
    unless propagation_info.kill_tasks.empty?
        gc_initial_errors = nil
        gc_seeds = gather_propagation do
            gc_initial_errors = gather_errors do
                garbage_collect(propagation_info.kill_tasks)
            end
        end
        gc_errors = propagate_events_and_errors(
            gc_seeds, gc_initial_errors, garbage_collect_pass: false
        )
        propagation_info.merge(gc_errors)
    end

    if raise_errors
        propagation_info = propagation_info.exceptions
        if propagation_info.size == 1
            raise propagation_info.first
        elsif !propagation_info.empty?
            raise SynchronousEventProcessingMultipleErrors.new(propagation_info.map(&:exception))
        end
    else
        propagation_info
    end
rescue SynchronousEventProcessingMultipleErrors => e
    raise SynchronousEventProcessingMultipleErrors.new(e.errors + clear_application_exceptions)
rescue Exception => e
    if passed_recursive_check
        application_exceptions = clear_application_exceptions
        if !application_exceptions.empty?
            raise SynchronousEventProcessingMultipleErrors.new(application_exceptions.map(&:first) + [e])
        else
            raise e
        end
    else
        raise e
    end
ensure
    if passed_recursive_check && @application_exceptions
        process_pending_application_exceptions
    end
    scheduler.enabled = current_scheduler_enabled
end

#process_once_blocks ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Dispatch #once_blocks to the other handler sets for further processing



# File 'lib/roby/execution_engine.rb', line 821

def process_once_blocks
    until once_blocks.empty?
        type, block = once_blocks.pop
        if type == :external_events
            external_events_handlers << block
        else
            propagation_handlers << block
        end
    end
end

#process_pending_application_exceptions(application_errors = clear_application_exceptions, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 680

def process_pending_application_exceptions(
    application_errors = clear_application_exceptions,
    raise_framework_errors: Roby.app.abort_on_application_exception?
)
    # We don't aggregate exceptions, so report them all and raise one
    if display_exceptions?
        application_errors.each do |error, source|
            unless error.kind_of?(Interrupt)
                fatal "Application error in #{source}"
                Roby.log_exception_with_backtrace(error, self, :fatal)
            end
        end
    end

    error, source = application_errors.find do |error, _|
        raise_framework_errors || error.kind_of?(SignalException)
    end
    if error
        if Roby.app.display_all_threads_state_on_abort?
            fatal "State of all running threads:"
            Roby.log_all_threads_backtraces(self, :fatal)
        end

        raise error, "in #{source}: #{error.message}", error.backtrace
    end
end
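
The final `raise error, message, backtrace` in the listing is the standard three-argument form of `Kernel#raise`: given an exception instance, it raises an exception of the same class with the new message and the supplied backtrace. The sketch below demonstrates this in isolation; `reraise_with_source` is a hypothetical helper name.

```ruby
# Re-raises `error` with its source prepended to the message while
# keeping the original backtrace.
def reraise_with_source(error, source)
  raise error, "in #{source}: #{error.message}", error.backtrace
end

# Capture an exception together with its backtrace.
original = begin
  raise ArgumentError, "bad value"
rescue ArgumentError => e
  e
end

wrapped = begin
  reraise_with_source(original, "poll handler")
rescue ArgumentError => e
  e
end
```

The re-raised exception therefore still points at the place where the error first occurred, not at the re-raise site.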

#process_waiting_work ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Process asynchronous work registered in #waiting_work to clear completed work and/or handle errors that were not handled by the async object itself (e.g. a Promise without a Promise#on_error handler)



# File 'lib/roby/execution_engine.rb', line 1531

def process_waiting_work
    finished, not_finished = waiting_work.partition(&:complete?)

    unhandled_errors = finished.find_all do |work|
        work.rejected? &&
            (work.respond_to?(:handled_error?) && !work.handled_error?)
    end

    unhandled_errors.each do |work|
        e = work.reason
        e.set_backtrace(e.backtrace)
        add_framework_error(e, work.to_s)

        if work.respond_to?(:error_handling_failure) &&
           (e = work.error_handling_failure)
            e.set_backtrace(e.backtrace)
            add_framework_error(e, work.to_s)
        end
    end

    @waiting_work = not_finished
    finished
end
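
The method splits the waiting work into finished and unfinished items, then singles out rejected items whose error nobody handled. A small sketch of that classification, using a hypothetical `Work` struct in place of real promises:

```ruby
# Hypothetical work item. `state` is :pending, :fulfilled or :rejected;
# `handled` tells whether an error handler took care of a rejection.
Work = Struct.new(:name, :state, :handled) do
  def complete?
    state != :pending
  end

  def rejected?
    state == :rejected
  end

  def handled_error?
    handled
  end
end

# Returns [finished, not_finished, unhandled_errors].
def split_waiting_work(waiting)
  finished, not_finished = waiting.partition(&:complete?)
  unhandled = finished.select { |w| w.rejected? && !w.handled_error? }
  [finished, not_finished, unhandled]
end

waiting = [
  Work.new("a", :fulfilled, false),
  Work.new("b", :pending, false),
  Work.new("c", :rejected, false),
  Work.new("d", :rejected, true)
]
finished, not_finished, unhandled = split_waiting_work(waiting)
```

Only `"c"` ends up in the unhandled list: `"d"` was rejected too, but its error was already handled.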

#promise(description: nil, executor: thread_pool, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Create a promise to execute the given block in a separate thread

Note that the returned value is a Promise. This means that callbacks added with #on_success or #rescue will be executed in the execution engine thread by default.



# File 'lib/roby/execution_engine.rb', line 2755

def promise(description: nil, executor: thread_pool, &block)
    Promise.new(self, executor: executor, description: description, &block)
end

#propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true) ⇒ PropagationInfo

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Propagate an initial set of event propagations and errors



# File 'lib/roby/execution_engine.rb', line 1892

def propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true)
    propagation_info = PropagationInfo.new
    events_errors = initial_errors.dup
    loop do
        log_timepoint_group "event_propagation_phase" do
            events_errors.concat(event_propagation_phase(next_steps, propagation_info))
        end

        next_steps = gather_propagation do
            exception_propagation_errors, error_phase_results = nil
            log_timepoint_group "error_handling_phase" do
                exception_propagation_errors = gather_errors do
                    error_phase_results = error_handling_phase(events_errors)
                end
            end

            add_exceptions_for_inhibition(error_phase_results.each_fatal_error)
            propagation_info.merge(error_phase_results)
            garbage_collection_errors = gather_errors do
                plan.generate_induced_errors(error_phase_results)
                if garbage_collect_pass
                    garbage_collect(error_phase_results.kill_tasks)
                else
                    []
                end
            end
            events_errors = (exception_propagation_errors + garbage_collection_errors)
            log_timepoint "garbage_collect"
        end

        break if next_steps.empty? && events_errors.empty?
    end
    propagation_info
end
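
Structurally, the method is a fixed-point loop: each round may produce new propagation steps and new errors, and the loop stops once a round produces neither. The sketch below shows only that loop shape. `run_to_fixed_point` is a hypothetical helper, and the block stands in for the event propagation, error handling and garbage collection phases.

```ruby
# Calls the block with (steps, errors) until it returns two empty
# collections. Returns the number of rounds that were executed.
def run_to_fixed_point(steps, errors)
  rounds = 0
  loop do
    steps, errors = yield(steps, errors)
    rounds += 1
    break if steps.empty? && errors.empty?
  end
  rounds
end

# Toy phase: a step n > 1 generates a follow-up step n - 1.
rounds = run_to_fixed_point([3], []) do |steps, _errors|
  n = steps.first
  n > 1 ? [[n - 1], []] : [[], []]
end
```

The termination test sits at the end of the loop body, so one full round always runs even when the initial inputs are empty.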

#propagate_exception_in_plan(exceptions) {|exception, handling_object| ... } ⇒ Array<(ExecutionException,Array<Task>)>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The core exception propagation algorithm

Yield Parameters:

  • exception (ExecutionException)

    the exception that is being propagated

  • handling_object (Task, Plan)

    the object we want to test whether it handles the exception or not

Yield Returns:

  • (Boolean)

    true if the exception is handled, false otherwise



# File 'lib/roby/execution_engine.rb', line 1259

def propagate_exception_in_plan(exceptions, &handler)
    propagation_graph = dependency_graph.reverse

    # Propagate the exceptions in the hierarchy
    handled_unhandled = []
    exceptions.each do |exception, parents|
        origin = exception.origin
        if parents
            filtered_parents = parents.find_all { |t| t.depends_on?(origin) }
            if filtered_parents != parents
                warn "some parents specified for #{exception.exception}"\
                     "(#{exception.exception.class}) are actually not parents "\
                     "of #{origin}, they got filtered out"
                (parents - filtered_parents).each do |task|
                    warn "  #{task}"
                end

                if filtered_parents.empty?
                    parents = propagation_graph.out_neighbours(origin)
                else
                    parents = filtered_parents
                end
            end
        else
            parents = propagation_graph.out_neighbours(origin)
        end

        debug do
            debug "propagating exception "
            log_pp :debug, exception
            unless parents.empty?
                debug "  constrained to parents"
                log_nest(2) do
                    parents.each do |p|
                        log_pp :debug, p
                    end
                end
            end
            break
        end

        visitor = ExceptionPropagationVisitor.new(
            propagation_graph, exception, origin, parents, handler
        )
        visitor.visit

        unhandled = visitor.unhandled_exceptions.inject { |a, b| a.merge(b) }
        handled   = visitor.handled_exceptions.inject { |a, b| a.merge(b) }
        handled_unhandled << [handled, unhandled]
    end

    exceptions_handled_by = []
    unhandled_exceptions  = []
    handled_unhandled.each do |handled, e|
        if e
            if e.handled = yield(e, plan)
                if handled
                    handled_by = (handled.propagation_leafs.to_set << plan)
                    exceptions_handled_by << [handled.merge(e), handled_by]
                else
                    handled = e
                    exceptions_handled_by << [e, [plan].to_set]
                end
            else
                affected_tasks = e.trace.vertices.to_set
                if handled
                    affected_tasks -= handled.trace.vertices
                    exceptions_handled_by << [handled, handled.propagation_leafs.to_set]
                end
                unhandled_exceptions << [e, affected_tasks]
            end
        else
            exceptions_handled_by << [handled, handled.propagation_leafs.to_set]
        end
    end

    debug do
        debug "#{unhandled_exceptions.size} unhandled exceptions remain"
        log_nest(2) do
            unhandled_exceptions.each do |e, affected_tasks|
                log_pp :debug, e
                debug "Affects #{affected_tasks.size} tasks"
                log_nest(2) do
                    affected_tasks.each do |t|
                        log_pp :debug, t
                    end
                end
            end
        end
        break
    end
    [unhandled_exceptions, exceptions_handled_by]
end

#propagate_exceptions(exceptions) ⇒ Array<(ExecutionException,Array<Task>)>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Exception propagation phase, which checks whether tasks and/or the main plan handle the exceptions



# File 'lib/roby/execution_engine.rb', line 1360

def propagate_exceptions(exceptions)
    if exceptions.empty?
        return [], [], []
    end

    # Remove all exception that are not associated with a task
    exceptions, free_events_exceptions = exceptions.partition do |e, _|
        e.origin
    end
    # Normalize the free events exceptions
    free_events_exceptions = free_events_exceptions.map do |e, _|
        if e.exception.failed_generator.plan
            [e, Set[e.exception.failed_generator]]
        end
    end.compact

    debug "Filtering inhibited exceptions"
    exceptions = log_nest(2) do
        non_inhibited, = remove_inhibited_exceptions(exceptions)
        # Reset the trace for the real propagation
        non_inhibited.map do |e, _|
            _, propagate_through = exceptions.find { |original_e, _| original_e.exception == e.exception }
            e.reset_trace
            [e, propagate_through]
        end
    end

    debug "Propagating #{exceptions.size} non-inhibited exceptions"
    log_nest(2) do
        # Note that the first half of the method filtered the free
        # events exceptions out of 'exceptions'
        unhandled, handled = propagate_exception_in_plan(exceptions) do |e, object|
            object.handle_exception(e)
        end

        return unhandled, free_events_exceptions, handled
    end
end
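
The first step of the listing above, separating task exceptions from free-event exceptions and normalizing the latter, can be sketched in isolation. This is only an illustration: FakeError and split_exceptions are hypothetical stand-ins, not part of Roby, and the in_plan flag replaces the e.exception.failed_generator.plan check.

```ruby
require "set"

# Hypothetical stand-in for an execution exception (not a Roby class):
# `origin` is the task the error comes from (nil for free events),
# `generator` names the failed generator, `in_plan` tells whether that
# generator still belongs to a plan.
FakeError = Struct.new(:origin, :generator, :in_plan)

def split_exceptions(pairs)
  # Pairs whose exception has an origin task go to the first array,
  # all others to the second one
  with_task, free = pairs.partition { |e, _| e.origin }
  # Keep only the free-event errors whose generator is still in a plan
  # and pair each of them with the set of involved generators
  free = free.map { |e, _| [e, Set[e.generator]] if e.in_plan }.compact
  [with_task, free]
end

task_error = FakeError.new(:task_a, :stop_event, true)
free_error = FakeError.new(nil, :free_event, true)
gone_error = FakeError.new(nil, :gone_event, false)

with_task, free = split_exceptions(
  [[task_error, nil], [free_error, nil], [gone_error, nil]]
)
puts with_task.size
puts free.size
```

The map followed by compact mirrors the listing: entries for which the block returns nil are dropped rather than propagated.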

#propagation_context(sources) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

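Sets the propagation sources to sources for the duration of the given block, and restores the previous value afterwards. sources is the from argument of #add_event_propagation. Raises InternalError when called outside a propagation context.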



# File 'lib/roby/execution_engine.rb', line 728

def propagation_context(sources)
    current_sources = @propagation_sources
    raise InternalError, "not in a gathering context in #propagation_context" unless in_propagation_context?

    @propagation_sources = sources
    yield
ensure
    @propagation_sources = current_sources
end
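
The save, set, yield, restore structure of the listing above can be tried outside of Roby. The sketch below is an assumption-laden miniature: SourceTracker and with_sources are hypothetical names, and the propagation-context check is left out.

```ruby
# Hypothetical miniature of the save/restore pattern (not a Roby class)
class SourceTracker
  attr_reader :sources

  def initialize
    @sources = []
  end

  # Replace the current sources while the block runs, then restore the
  # previous value even if the block raises
  def with_sources(sources)
    previous = @sources
    @sources = sources
    yield
  ensure
    @sources = previous
  end
end

tracker = SourceTracker.new
seen = []
tracker.with_sources([:a]) do
  seen << tracker.sources
  tracker.with_sources([:b]) { seen << tracker.sources }
  seen << tracker.sources
end
p seen
p tracker.sources
```

Because the restore happens in an ensure clause, nested calls unwind in order and an exception raised inside the block does not leak the inner sources.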

#propagation_source_events ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of events extracted from #sources



# File 'lib/roby/execution_engine.rb', line 517

def propagation_source_events
    result = Set.new
    for ev in @propagation_sources
        if ev.respond_to?(:generator)
            result << ev
        end
    end
    result
end

#propagation_source_generators ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of generators extracted from #sources



# File 'lib/roby/execution_engine.rb', line 528

def propagation_source_generators
    result = Set.new
    for ev in @propagation_sources
        result << if ev.respond_to?(:generator)
                      ev.generator
                  else
                      ev
                  end
    end
    result
end
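
Both accessors above rely on the same duck-typing test: a source that responds to generator is treated as an event, anything else as a generator. A minimal sketch, assuming hypothetical SourceGenerator and SourceEvent structs in place of the real Roby objects:

```ruby
require "set"

# Hypothetical stand-ins: an event knows its generator, a generator does not
# respond to #generator
SourceGenerator = Struct.new(:name)
SourceEvent     = Struct.new(:generator)

# Only the sources that are events
def source_events(sources)
  sources.select { |s| s.respond_to?(:generator) }.to_set
end

# Every source mapped to a generator: events contribute their generator,
# generators contribute themselves
def source_generators(sources)
  sources.map { |s| s.respond_to?(:generator) ? s.generator : s }.to_set
end

start   = SourceGenerator.new(:start)
stop    = SourceGenerator.new(:stop)
started = SourceEvent.new(start)

sources = [started, stop]
puts source_events(sources).size
puts source_generators(sources).size
```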

#queue_forward(sources, target, context, timespec) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Queue a forwarding to be propagated



# File 'lib/roby/execution_engine.rb', line 748

def queue_forward(sources, target, context, timespec)
    add_event_propagation(true, sources, target, context, timespec)
end

#queue_signal(sources, target, context, timespec) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Queue a signal to be propagated



# File 'lib/roby/execution_engine.rb', line 743

def queue_signal(sources, target, context, timespec)
    add_event_propagation(false, sources, target, context, timespec)
end

#quit ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Request that the execution engine quit cleanly



# File 'lib/roby/execution_engine.rb', line 2534

def quit
    @quit = 1
end

#quitting? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True if the control thread is currently quitting



# File 'lib/roby/execution_engine.rb', line 2520

def quitting?
    @quit > 0
end

#refresh_relations ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Refresh the value of cached relations

Some often-used relations, such as #dependency_graph and #precedence_graph, are cached in #initialize. Call this method when the actual graph objects have changed on the plan.



# File 'lib/roby/execution_engine.rb', line 134

def refresh_relations
    @dependency_graph = plan.task_relation_graph_for(TaskStructure::Dependency)
    @precedence_graph = plan.event_relation_graph_for(EventStructure::Precedence)
    @signal_graph     = plan.event_relation_graph_for(EventStructure::Signal)
    @forward_graph    = plan.event_relation_graph_for(EventStructure::Forwarding)
end

#remove_at_cycle_end(handler_id) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Removes a handler added by #at_cycle_end



# File 'lib/roby/execution_engine.rb', line 2160

def remove_at_cycle_end(handler_id)
    handler_id.dispose
end

#remove_exception_listener(handler) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Removes an exception listener registered with #on_exception



# File 'lib/roby/execution_engine.rb', line 2698

def remove_exception_listener(handler)
    handler.dispose if handler.respond_to?(:dispose)
end

#remove_inhibited_exceptions(exceptions) ⇒ Array<ExecutionException>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Process the given exceptions to remove the ones that are currently filtered by the plan repairs

The returned exceptions are propagated, i.e. their #trace method contains all the tasks that are affected by the absence of a handling mechanism



# File 'lib/roby/execution_engine.rb', line 1410

def remove_inhibited_exceptions(exceptions)
    exceptions = exceptions.find_all do |execution_exception, _|
        execution_exception.origin.plan
    end

    propagate_exception_in_plan(exceptions) do |e, object|
        if has_pending_exception_matching?(e, object)
            true
        elsif object.respond_to?(:handles_error?)
            object.handles_error?(e)
        end
    end
end

#remove_periodic_handler(id) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Removes a periodic handler defined by #every. id is the value returned by #every.



# File 'lib/roby/execution_engine.rb', line 2184

def remove_periodic_handler(id)
    id.dispose
    nil
end

#reset ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Make an execution engine that has quit ready for reuse



# File 'lib/roby/execution_engine.rb', line 2544

def reset
    @quit = 0
end
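
Taken together, the #quit, #quitting? and #reset listings describe a small integer flag. The sketch below reproduces only that interplay; QuitFlag is a hypothetical class, and it makes no assumption about why the engine stores an integer rather than a boolean.

```ruby
# Hypothetical miniature of the quit flag (not the Roby class)
class QuitFlag
  def initialize
    @quit = 0
  end

  # Request a quit
  def quit
    @quit = 1
  end

  # True once a quit has been requested and until #reset is called
  def quitting?
    @quit > 0
  end

  # Clear the request so the object can be used again
  def reset
    @quit = 0
  end
end

flag = QuitFlag.new
puts flag.quitting?
flag.quit
puts flag.quitting?
flag.reset
puts flag.quitting?
```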

#reset_thread_pool ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 2634

def reset_thread_pool
    @thread_pool&.shutdown
    @thread_pool = Concurrent::CachedThreadPool.new(idletime: 10)
end

#run(cycle: 0.1) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Main event loop. Valid options are:

cycle: the cycle duration in seconds (default: 0.1)



# File 'lib/roby/execution_engine.rb', line 2239

def run(cycle: 0.1)
    if running?
        raise AlreadyRunning, "#run has already been called"
    end

    self.running = true

    @allow_propagation = false
    @waiting_work = Concurrent::Array.new

    @thread = Thread.current
    @thread.name = "MAIN"

    @cycle_length = cycle
    trap("INT") do
        interrupt
    end
    event_loop
ensure
    self.running = false
    @thread = nil
    waiting_work.delete_if do |w|
        next(true) if w.complete?

        # rubocop:disable Lint/HandleExceptions
        begin
            w.fail ExecutionQuitError
            Roby.warn "forcefully terminated #{w} on quit"
        rescue Concurrent::MultipleAssignmentError
            # Race condition: something completed the promise while
            # we were trying to make it fail
        end
        # rubocop:enable Lint/HandleExceptions

        true
    end
    finalizers.each do |blk|
        begin
            blk.call
        rescue Exception => e
            Roby.warn "finalizer #{blk} failed"
            Roby.log_exception_with_backtrace(e, Roby, :warn)
        end
    end
    @quit = 0
    @allow_propagation = true
    trap("INT", "DEFAULT")
end
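
The ensure section of the listing above calls every finalizer and logs failures instead of letting them abort the cleanup. A reduced sketch of that loop follows; run_finalizers is a hypothetical helper, it collects messages in an array instead of calling Roby.warn, and it rescues StandardError where the listing rescues Exception.

```ruby
# Hypothetical helper: call each finalizer in order, record failures and
# keep going so that one broken finalizer does not skip the others
def run_finalizers(finalizers)
  failures = []
  finalizers.each do |blk|
    begin
      blk.call
    rescue StandardError => e
      failures << e.message
    end
  end
  failures
end

calls = []
failures = run_finalizers(
  [
    -> { calls << :first },
    -> { raise "boom" },
    -> { calls << :third }
  ]
)
p calls
p failures
```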

#shutdown ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 2629

def shutdown
    killall
    thread_pool.shutdown
end

#start_new_cycle(time = Time.now) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Set the cycle_start attribute and increment cycle_index

This is only used for testing purposes



# File 'lib/roby/execution_engine.rb', line 2510

def start_new_cycle(time = Time.now)
    @cycle_start = time
    @cycle_index += 1
end

#unmark_finished_missions_and_permanent_tasks ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 1947

def unmark_finished_missions_and_permanent_tasks
    to_unmark = plan.task_index.by_predicate[:finished?] |
                plan.task_index.by_predicate[:failed?]

    finished_missions = (plan.mission_tasks & to_unmark)
    # Remove all missions that are finished
    for finished_mission in finished_missions
        unless finished_mission.being_repaired?
            plan.unmark_mission_task(finished_mission)
        end
    end
    finished_permanent = (plan.permanent_tasks & to_unmark)
    for finished_permanent in (plan.permanent_tasks & to_unmark)
        unless finished_permanent.being_repaired?
            plan.unmark_permanent_task(finished_permanent)
        end
    end
end

#unreachable_event(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by EventGenerator when an event becomes unreachable



# File 'lib/roby/execution_engine.rb', line 574

def unreachable_event(event)
    delayed_events.delete_if { |_, _, _, signalled, _| signalled == event }
end

#wait_one_cycle ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Blocks until at least one execution cycle has completed



# File 'lib/roby/execution_engine.rb', line 2098

def wait_one_cycle
    current_cycle = execute { cycle_index }
    while current_cycle == execute { cycle_index }
        raise ExecutionQuitError unless running?

        sleep(cycle_length)
    end
end
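
The polling approach of the listing above (read the cycle index, then sleep until it changes or the loop stops) can be reproduced with a plain thread. This is a sketch under stated assumptions: TinyLoop and LoopStopped are hypothetical, a Mutex replaces the engine's execute call, and the loop body does nothing but count cycles.

```ruby
# Hypothetical error raised when waiting on a loop that is not running
class LoopStopped < StandardError; end

# Hypothetical miniature of a periodic loop with a cycle counter
class TinyLoop
  def initialize(period)
    @period = period
    @mutex = Mutex.new
    @cycle_index = 0
    @running = false
  end

  def cycle_index
    @mutex.synchronize { @cycle_index }
  end

  def running?
    @mutex.synchronize { @running }
  end

  def start
    @mutex.synchronize { @running = true }
    @thread = Thread.new do
      while running?
        @mutex.synchronize { @cycle_index += 1 }
        sleep(@period)
      end
    end
  end

  def stop
    @mutex.synchronize { @running = false }
    @thread&.join
  end

  # Block the caller until the counter has moved at least once
  def wait_one_cycle
    current = cycle_index
    while current == cycle_index
      raise LoopStopped, "loop is not running" unless running?

      sleep(@period)
    end
  end
end

engine = TinyLoop.new(0.01)
engine.start
engine.wait_one_cycle
engine.stop
```

As in the listing, the running check sits inside the polling loop, so a caller waiting on a stopped loop gets an error instead of sleeping forever.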

#wait_until(ev) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Blocks the current thread until the given event is emitted. If the event becomes unreachable, an UnreachableEvent exception is raised.



# File 'lib/roby/execution_engine.rb', line 2602

def wait_until(ev)
    if inside_control?
        raise ThreadMismatch, "cannot use #wait_until in execution threads"
    end

    ivar = Concurrent::IVar.new
    result = nil
    once(sync: ivar) do
        if ev.unreachable?
            ivar.fail(UnreachableEvent.new(ev, ev.unreachability_reason))
        else
            ev.if_unreachable(cancel_at_emission: true) do |reason, event|
                ivar.fail(UnreachableEvent.new(event, reason)) unless ivar.complete?
            end
            ev.once do |ev|
                ivar.set(result) unless ivar.complete?
            end
            begin
                result = yield if block_given?
            rescue Exception => e
                ivar.fail(e)
            end
        end
    end
    ivar.value!
end
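
The listing above registers one callback for emission and one for unreachability, runs the optional block, and then blocks on a synchronization object. The sketch below shows the same shape with the standard library only. It is a simplification, not the Roby implementation: WaitableEvent, UnreachableError and wait_for_event are hypothetical, a Queue replaces Concurrent::IVar, and the callbacks are registered in the calling thread rather than in the engine thread.

```ruby
# Hypothetical error used when the awaited event can no longer happen
class UnreachableError < StandardError; end

# Hypothetical event with emission and unreachability callbacks
class WaitableEvent
  def initialize
    @on_emit = []
    @on_unreachable = []
  end

  def once(&blk)
    @on_emit << blk
  end

  def if_unreachable(&blk)
    @on_unreachable << blk
  end

  def emit
    @on_emit.each(&:call)
  end

  def unreachable!(reason)
    @on_unreachable.each { |blk| blk.call(reason) }
  end
end

# Block until the event is emitted, or raise if it becomes unreachable.
# The optional block runs after both callbacks are registered, so whatever
# it triggers cannot be missed.
def wait_for_event(event)
  outcome = Queue.new
  event.once { outcome << [:emitted, nil] }
  event.if_unreachable { |reason| outcome << [:unreachable, reason] }
  yield if block_given?
  status, reason = outcome.pop
  raise UnreachableError, "event became unreachable: #{reason}" if status == :unreachable

  status
end

event = WaitableEvent.new
result = wait_for_event(event) { Thread.new { event.emit } }
p result
```

Running the block only after the callbacks are in place follows the order used in the listing, where the yield comes after if_unreachable and once.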