Class: Roby::ExecutionEngine Private

Inherits:
Object
Extended by:
Logger::Hierarchy, PropagationHandlerMethods
Includes:
Logger::Hierarchy, DRoby::EventLogging, PropagationHandlerMethods
Defined in:
lib/roby/execution_engine.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

The core execution algorithm

It is in charge of handling event and exception propagation, as well as running cleanup processes (e.g. garbage collection).

The main method is #process_events. When executing a Roby application, it is called periodically by #event_loop.

In addition, there is a special “synchronous” propagation mode that is used by Roby::EventGenerator#call and Roby::EventGenerator#emit. This mode is used when the event code is not executed within an engine, but from an imperative script, as in unit tests.

Defined Under Namespace

Modules: PropagationHandlerMethods

Classes: AlreadyRunning, ExceptionPropagationVisitor, JoinAllWaitingWorkTimeout, NotPropagationContext, PollBlockDefinition, PropagationInfo, RecursivePropagationContext

Constant Summary

PENDING_PROPAGATION_FORWARD =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

1
PENDING_PROPAGATION_SIGNAL =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

2
SLEEP_MIN_TIME =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Do not sleep or call Thread#pass if there is less than this much time left in the cycle

0.01
INTERRUPT_FORCE_EXIT_DEAD_ZONE =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Minimum number of seconds between two Interrupt exceptions before the execution engine’s loop can forcefully quit

10
EXCEPTION_NONFATAL =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Exception kind passed to #on_exception handlers for non-fatal, unhandled exceptions

:nonfatal
EXCEPTION_FATAL =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Exception kind passed to #on_exception handlers for fatal, unhandled exceptions

:fatal
EXCEPTION_HANDLED =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Exception kind passed to #on_exception handlers for handled exceptions

:handled
EXCEPTION_FREE_EVENT =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Exception kind passed to #on_exception handlers for free event exceptions

:free_event

Instance Attribute Summary

Attributes included from PropagationHandlerMethods

#external_events_handlers, #propagation_handlers

Class Method Summary

Instance Method Summary

Methods included from PropagationHandlerMethods

add_propagation_handler, at_cycle_begin, create_propagation_handler, each_cycle

Methods included from DRoby::EventLogging

#log, #log_flush_cycle, #log_queue_size, #log_timepoint, #log_timepoint_group, #log_timepoint_group_end, #log_timepoint_group_start

Constructor Details

#initialize(plan, control: Roby::DecisionControl.new, event_logger: plan.event_logger) ⇒ ExecutionEngine

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Create an execution engine acting on plan, using control as the decision control object

Parameters:

  • plan (ExecutablePlan)

    the plan on which this engine acts

  • control (DecisionControl) (defaults to: Roby::DecisionControl.new)

    the policy object, i.e. the object that embeds policies in cases where multiple reactions would be possible

  • event_logger (DRoby::EventLogger) (defaults to: plan.event_logger)

    the logger that should be used to trace execution events. By default, it is the same as the #plan's. Pass a DRoby::NullEventLogger instance to disable event logging for this engine.



# File 'lib/roby/execution_engine.rb', line 80

def initialize(plan, control: Roby::DecisionControl.new, event_logger: plan.event_logger)
    @plan = plan
    @event_logger = event_logger

    @use_oob_gc = ExecutionEngine.use_oob_gc?

    @control = control
    @scheduler = Schedulers::Null.new(plan)
    reset_thread_pool
    @thread = Thread.current

    @propagation = nil
    @propagation_id = 0
    @propagation_exceptions = nil
    @application_exceptions = nil
    @delayed_events = []
    @event_ordering = Array.new
    @event_priorities = Hash.new
    @propagation_handlers = []
    @external_events_handlers = []
    @at_cycle_end_handlers = Array.new
    @process_every   = Array.new
    @waiting_work = Concurrent::Array.new
    @emitted_events  = Array.new
    @disabled_handlers = Set.new
    @exception_listeners = Array.new

    @worker_threads_mtx = Mutex.new
    @worker_threads = Array.new
    @once_blocks = Queue.new

    @pending_exceptions = Hash.new

    each_cycle(&ExecutionEngine.method(:call_every))

    @quit        = 0
    @allow_propagation = true
    @cycle_index = 0
    @cycle_start = Time.now
    @cycle_length = 0.1
    @last_stop_count = 0
    @finalizers = []
    @gc_warning = true

    refresh_relations

    self.display_exceptions = true
end

Instance Attribute Details

#additional_errorsObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Used during exception propagation to inject new errors in the process

It shall not be accessed directly. Instead, Plan#add_error should be called



# File 'lib/roby/execution_engine.rb', line 1391

def additional_errors
  @additional_errors
end

#application_exceptionsObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of errors which have been generated outside of the plan’s control. For now, those errors cause the whole controller to shut down.



# File 'lib/roby/execution_engine.rb', line 1377

def application_exceptions
  @application_exceptions
end

#at_cycle_end_handlersObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A set of blocks that are called at each cycle end



# File 'lib/roby/execution_engine.rb', line 2030

def at_cycle_end_handlers
  @at_cycle_end_handlers
end

#controlObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The DecisionControl object associated with this engine



# File 'lib/roby/execution_engine.rb', line 180

def control
  @control
end

#cycle_indexObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The index of the current cycle, counted from the beginning of execution



# File 'lib/roby/execution_engine.rb', line 2090

def cycle_index
  @cycle_index
end

#cycle_lengthObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The cycle length in seconds



# File 'lib/roby/execution_engine.rb', line 2084

def cycle_length
  @cycle_length
end

#cycle_startObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The starting Time of this cycle



# File 'lib/roby/execution_engine.rb', line 2087

def cycle_start
  @cycle_start
end

#delayed_eventsObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of pending delayed events. This is an array of the form

[[time, is_forward, source, target, context], ...]

See #add_event_delay for more information



# File 'lib/roby/execution_engine.rb', line 485

def delayed_events
  @delayed_events
end

#dependency_graphObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Cached graph object for TaskStructure::Dependency

This is here for performance reasons, to avoid resolving the same graph over and over



# File 'lib/roby/execution_engine.rb', line 169

def dependency_graph
  @dependency_graph
end

#disabled_handlersArray<PollBlockDefinition> (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Poll blocks that have been disabled because they raised an exception

Returns:

  • (Array<PollBlockDefinition>)



# File 'lib/roby/execution_engine.rb', line 360

def disabled_handlers
  @disabled_handlers
end

#emitted_eventsArray<Event> (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of events that have been emitted within the last call to #process_events (i.e. the last execution of the event loop)

Returns:

  • (Array<Event>)



# File 'lib/roby/execution_engine.rb', line 187

def emitted_events
  @emitted_events
end

#event_loggerObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The underlying DRoby::EventLogger

It is usually the same as the #plan's. Pass a DRoby::NullEventLogger at construction time to disable logging of execution events.



# File 'lib/roby/execution_engine.rb', line 178

def event_logger
  @event_logger
end

#event_orderingObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The topological ordering of events w.r.t. the Precedence relation. This gets updated on-demand when the event relations change.



# File 'lib/roby/execution_engine.rb', line 941

def event_ordering
  @event_ordering
end

#event_prioritiesObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The event => index hash which gives the propagation priority for each event



# File 'lib/roby/execution_engine.rb', line 944

def event_priorities
  @event_priorities
end

#exception_listenersArray<#call> (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The blocks that are currently listening to exceptions

Returns:

  • (Array<#call>)


# File 'lib/roby/execution_engine.rb', line 190

def exception_listeners
  @exception_listeners
end

#finalizersObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A set of proc objects which are to be called when the execution engine quits.



# File 'lib/roby/execution_engine.rb', line 2379

def finalizers
  @finalizers
end

#forward_graphObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Cached graph object for EventStructure::Forward

This is here for performance reasons, to avoid resolving the same graph over and over



# File 'lib/roby/execution_engine.rb', line 163

def forward_graph
  @forward_graph
end

#last_stop_countObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

:nodoc:



# File 'lib/roby/execution_engine.rb', line 2172

def last_stop_count
  @last_stop_count
end

#once_blocksQueue (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Thread-safe queue to push work to the execution engine

Do not access directly, use #once instead

Returns:

  • (Queue)

    blocks that should be executed at the beginning of the next execution cycle. It is the only thread safe way to queue work to be executed by the engine



# File 'lib/roby/execution_engine.rb', line 198

def once_blocks
  @once_blocks
end
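
The thread-safety claim above rests on Ruby's standard Queue class. The following is a minimal sketch of the pattern, not Roby code: the local variables stand in for the engine's state, and the draining loop is an assumption about how queued blocks could be consumed at the start of a cycle.

```ruby
# Stand-in for the engine's @once_blocks queue (Queue is thread-safe).
once_blocks = Queue.new
results = []

# Worker threads only push blocks; they never touch engine state directly.
producers = 3.times.map do |i|
  Thread.new { once_blocks.push(-> { results << i }) }
end
producers.each(&:join)

# "Engine" side: drain the queue from a single thread, here after all
# producers are done, so the non-blocking pop cannot raise.
once_blocks.pop(true).call until once_blocks.empty?

p results.sort
```

Because the blocks run on the consuming thread, they can modify plan-related state without extra locking, which is the reason for funnelling work through the queue.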

#planObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The Plan this engine is acting on



# File 'lib/roby/execution_engine.rb', line 172

def plan
  @plan
end

#precedence_graphObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Cached graph object for EventStructure::Precedence

This is here for performance reasons, to avoid resolving the same graph over and over



# File 'lib/roby/execution_engine.rb', line 151

def precedence_graph
  @precedence_graph
end

#process_everyObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A set of blocks which are called every cycle



# File 'lib/roby/execution_engine.rb', line 2052

def process_every
  @process_every
end

#propagation_idObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A numeric ID giving the count of the current propagation cycle



# File 'lib/roby/execution_engine.rb', line 182

def propagation_id
  @propagation_id
end

#propagation_sourcesObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of source events for the current propagation action. This is a mix of EventGenerator and Event objects.



# File 'lib/roby/execution_engine.rb', line 455

def propagation_sources
  @propagation_sources
end

#schedulerObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The scheduler is the object which handles non-generic parts of the propagation cycle. For now, its #initial_events method is called at the beginning of each propagation cycle and can call or emit a set of events.

See Schedulers::Basic



# File 'lib/roby/execution_engine.rb', line 431

def scheduler
  @scheduler
end

#signal_graphObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Cached graph object for EventStructure::Signal

This is here for performance reasons, to avoid resolving the same graph over and over



# File 'lib/roby/execution_engine.rb', line 157

def signal_graph
  @signal_graph
end

#threadObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The execution thread if there is one running



# File 'lib/roby/execution_engine.rb', line 2079

def thread
  @thread
end

#thread_poolConcurrent::CachedThreadPool (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A thread pool on which async work should be executed

Returns:

  • (Concurrent::CachedThreadPool)

See Also:

  • #promise


# File 'lib/roby/execution_engine.rb', line 145

def thread_pool
  @thread_pool
end

#waiting_workArray<#fail,#complete?> (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A list of threaded objects waiting for the control thread

Objects registered here will be notified by a call to #fail when the engine quits. In addition, #join_all_waiting_work will wait for all pending jobs to finish.

Note that all Concurrent::Obligation subclasses fit the bill

Returns:

  • (Array<#fail,#complete?>)


# File 'lib/roby/execution_engine.rb', line 2027

def waiting_work
  @waiting_work
end

Class Method Details

.call_every(plan) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Calls the periodic blocks that are due in the current cycle



# File 'lib/roby/execution_engine.rb', line 2000

def self.call_every(plan) # :nodoc:
    engine = plan.execution_engine
    now        = engine.cycle_start
    length     = engine.cycle_length
    engine.process_every.map! do |block, last_call, duration|
        # Check if the nearest timepoint is the beginning of
        # this cycle or of the next cycle
        if !last_call || (duration - (now - last_call)) < length / 2
            if !block.call(engine, engine.plan)
                next
            end

            last_call = now
        end
        [block, last_call, duration]
    end.compact!
end
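
The timing test used in the listing above can be isolated as a small predicate. This is a minimal sketch, not part of Roby: periodic_block_due? is a hypothetical helper name, and plain floats stand in for the Time objects the engine uses.

```ruby
# Hypothetical helper mirroring the condition used in call_every.
# A block is due when it was never called, or when the time remaining
# until its next deadline is less than half a cycle.
def periodic_block_due?(last_call, now, duration, cycle_length)
  return true unless last_call

  (duration - (now - last_call)) < cycle_length / 2.0
end

# With a 0.1 s cycle and a 1 s period:
p periodic_block_due?(nil, 10.0, 1.0, 0.1)  # never called before
p periodic_block_due?(9.5, 10.0, 1.0, 0.1)  # 0.5 s remaining
p periodic_block_due?(9.04, 10.0, 1.0, 0.1) # about 0.04 s remaining
```

Comparing against half a cycle picks whichever cycle boundary is nearest to the deadline, so a block is called at most half a cycle early instead of up to a full cycle late.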

.make_delay(timeref, source, target, timespec) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a Time object which represents the absolute point in time referenced by timespec in the context of delaying a propagation between source and target.

See validate_timespec for more information



# File 'lib/roby/execution_engine.rb', line 931

def self.make_delay(timeref, source, target, timespec)
    if delay = timespec[:delay] then timeref + delay
    elsif at = timespec[:at] then at
    else
        raise ArgumentError, "invalid timespec #{timespec}"
    end
end
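
The two timespec forms can be tried outside of Roby with a standalone restatement of the same logic. This is a sketch for illustration: resolve_timespec is a hypothetical name, and it drops the source and target arguments, which the listing above does not use.

```ruby
# Standalone restatement of the resolution logic: a :delay is relative
# to the reference time, an :at is already absolute.
def resolve_timespec(timeref, timespec)
  if (delay = timespec[:delay])
    timeref + delay
  elsif (at = timespec[:at])
    at
  else
    raise ArgumentError, "invalid timespec #{timespec}"
  end
end

ref = Time.at(1_000)
relative = resolve_timespec(ref, { delay: 5 })
absolute = resolve_timespec(ref, { at: Time.at(2_000) })

puts relative.to_i
puts absolute.to_i
```

Note that :delay takes precedence when both keys are present, since it is tested first.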

.validate_timespec(timespec) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Validates timespec as a delay specification. A valid delay specification is either nil or a hash, in which case two forms are possible:

at: absolute_time
delay: number


# File 'lib/roby/execution_engine.rb', line 920

def self.validate_timespec(timespec)
    if timespec
        timespec = validate_options timespec, [:delay, :at]
    end
end

Instance Method Details

#add_error(e, propagate_through: nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Register a LocalizedError for future propagation

This method must be called in an error-gathering context (i.e. #gather_error).

Parameters:

  • e (#to_execution_exception)

    the exception

Raises:

  • (NotPropagationContext)



# File 'lib/roby/execution_engine.rb', line 582

def add_error(e, propagate_through: nil)
    plan_exception = e.to_execution_exception
    if @propagation_exceptions
        @propagation_exceptions << [plan_exception, propagate_through]
    else
        Roby.log_exception_with_backtrace(e, self, :fatal)
        raise NotPropagationContext, "#add_error called outside an error-gathering context (#add_error)"
    end
end

#add_event_delay(time, is_forward, source, target, context) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Adds a propagation step to be performed when the current time is greater than time. The propagation step is a signal if is_forward is false and a forward otherwise.

This method should not be called directly. Use #add_event_propagation with the appropriate timespec argument.

See also #delayed_events and #execute_delayed_events



# File 'lib/roby/execution_engine.rb', line 495

def add_event_delay(time, is_forward, source, target, context)
    delayed_events << [time, is_forward, source, target, context]
end

#add_event_propagation(is_forward, sources, target, context, timespec) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Adds a propagation to the next propagation step: it registers a propagation step to be performed between source and target with the given context. If is_forward is true, the propagation will be a forwarding, otherwise it is a signal.

If timespec is not nil, it defines a delay to be applied before calling the target event.

See #gather_propagation



# File 'lib/roby/execution_engine.rb', line 700

def add_event_propagation(is_forward, sources, target, context, timespec)
    if target.plan != plan
        raise Roby::EventNotExecutable.new(target), "#{target} not in executed plan"
    end

    target.pending(sources.find_all { |ev| ev.kind_of?(Event) })

    @propagation_step_id += 1
    target_info = (@propagation[target] ||= [@propagation_step_id, [], []])
    step = target_info[is_forward ? PENDING_PROPAGATION_FORWARD : PENDING_PROPAGATION_SIGNAL]
    if sources.empty?
        step << nil << context << timespec
    else
        sources.each do |ev|
            step << ev << context << timespec
        end
    end
end
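
The layout of the pending-propagation structure built above is easier to see on plain data. The following sketch reproduces it with symbols instead of event objects; the local variable names and the queue lambda are hypothetical, and the target.pending bookkeeping and plan check are left out.

```ruby
forward_index = 1 # same value as PENDING_PROPAGATION_FORWARD
signal_index  = 2 # same value as PENDING_PROPAGATION_SIGNAL

propagation = {} # stand-in for the engine's @propagation hash
step_id = 0

queue = lambda do |is_forward, sources, target, context, timespec|
  step_id += 1
  # One entry per target: [id of the first step, forwards, signals]
  info = (propagation[target] ||= [step_id, [], []])
  step = info[is_forward ? forward_index : signal_index]
  if sources.empty?
    step << nil << context << timespec
  else
    # Flat list of (source, context, timespec) triples
    sources.each { |ev| step << ev << context << timespec }
  end
end

queue.call(true, %i[a b], :target, [42], nil)
queue.call(false, [], :target, nil, { delay: 1 })

p propagation[:target]
```

Each target therefore accumulates two flat arrays, one for forwardings and one for signals, that a consumer would have to read three elements at a time.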

#add_exceptions_for_inhibition(fatal_errors) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Register a set of fatal exceptions to ensure that they will be inhibited in the next exception propagation cycles



# File 'lib/roby/execution_engine.rb', line 1827

def add_exceptions_for_inhibition(fatal_errors)
    fatal_errors.each do |exception, involved_tasks|
        involved_tasks.each do |t|
            (@pending_exceptions[t] ||= Set.new) <<
                [exception.exception.class, exception.origin]
        end
    end
end

#add_framework_error(error, source) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers the given error and a description of its source in the list of application/framework errors

It must be called within an exception-gathering context, that is either within #process_events, or within #gather_framework_errors

These errors will terminate the event loop

Parameters:

  • error

    the error to register

  • source

    a description of the error's source



# File 'lib/roby/execution_engine.rb', line 653

def add_framework_error(error, source)
    if @application_exceptions
        @application_exceptions << [error, source]
    else
        Roby.log_exception_with_backtrace(error, self, :fatal)
        raise NotPropagationContext, "#add_framework_error called outside an exception-gathering context"
    end
end

#at_cycle_end(description: 'at_cycle_end', **options) {|plan| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Adds a block to be called at the end of each execution cycle

Yield Parameters:

  • plan (Plan)

    the plan on which this engine runs

Returns:

  • (Object)

    the object_id of the registered handler



# File 'lib/roby/execution_engine.rb', line 2038

def at_cycle_end(description: 'at_cycle_end', **options, &block)
    handler = PollBlockDefinition.new(description, block, **options)
    at_cycle_end_handlers << handler
    handler.object_id
end

#call_poll_blocks(blocks, late = false) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Helper that calls the propagation handlers in propagation_handlers (which are expected to be instances of PollBlockDefinition) and handles the errors according to each handler’s policy



# File 'lib/roby/execution_engine.rb', line 740

def call_poll_blocks(blocks, late = false)
    blocks.delete_if do |handler|
        if handler.disabled? || (handler.late? ^ late)
            next
        end

        log_timepoint_group handler.description do
            if !handler.call(self, plan)
                handler.disabled = true
            end
        end
        handler.once?
    end
end
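
The filtering done by the delete_if block above can be exercised with a simplified stand-in for PollBlockDefinition. This is a sketch under assumptions: the struct and its fields are hypothetical, a handler signals failure by returning false, and timepoint logging is omitted.

```ruby
# Hypothetical, simplified stand-in for PollBlockDefinition.
handler_class = Struct.new(:name, :late, :once, :disabled, :block)

calls = []
handlers = [
  handler_class.new("early", false, false, false, -> { calls << "early"; true }),
  handler_class.new("late", true, false, false, -> { calls << "late"; true }),
  handler_class.new("once", false, true, false, -> { calls << "once"; true }),
  handler_class.new("failing", false, false, false, -> { calls << "failing"; false })
]

run = lambda do |blocks, late|
  blocks.delete_if do |h|
    # Skip disabled handlers and handlers of the other (early/late) kind.
    # A bare next yields nil, so skipped handlers are kept in the list.
    next if h.disabled || (h.late ^ late)

    h.disabled = true unless h.block.call
    h.once # one-shot handlers are removed once they have run
  end
end

run.call(handlers, false)
run.call(handlers, false)

p calls
p handlers.map(&:name)
```

In this run the failing handler stays in the list but is disabled after its first call, and the one-shot handler is removed after it has run once.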

#call_propagation_handlersObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 776

def call_propagation_handlers
    process_once_blocks
    if scheduler.enabled?
        gather_framework_errors('scheduler') do
            scheduler.initial_events
            log_timepoint 'scheduler'
        end
    end
    call_poll_blocks(self.class.propagation_handlers, false)
    call_poll_blocks(self.propagation_handlers, false)

    if !has_queued_events?
        call_poll_blocks(self.class.propagation_handlers, true)
        call_poll_blocks(self.propagation_handlers, true)
    end
end

#clearObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Sets up the plan for clearing: it discards all missions and undefines all permanent tasks and events.

Returns nil if the plan is cleared, and the set of remaining tasks otherwise. Note that quarantined tasks are not counted as remaining, as it is not possible for the execution engine to stop them.



# File 'lib/roby/execution_engine.rb', line 2180

def clear
    plan.mission_tasks.dup.each { |t| plan.unmark_mission_task(t) }
    plan.permanent_tasks.dup.each { |t| plan.unmark_permanent_task(t) }
    plan.permanent_events.dup.each { |t| plan.unmark_permanent_event(t) }
    plan.force_gc.merge( plan.tasks )

    quaranteened_subplan = plan.compute_useful_tasks(plan.quarantined_tasks)
    remaining = plan.tasks - quaranteened_subplan

    @pending_exceptions.clear

    if remaining.empty?
        # Have to call #garbage_collect one more to make
        # sure that unneeded events are removed as well
        garbage_collect
        # Done cleaning the tasks, clear the remains
        plan.transactions.each do |trsc|
            trsc.discard_transaction if trsc.self_owned?
        end
        plan.clear
        emitted_events.clear
        return
    end
    remaining
end

#clear_application_exceptionsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 1378

def clear_application_exceptions
    if !@application_exceptions
        raise RecursivePropagationContext, "unbalanced call to #clear_application_exceptions"
    end

    result, @application_exceptions = @application_exceptions, nil
    result
end
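
Together with #add_framework_error, this method treats a nil @application_exceptions as "no gathering context is open". The sketch below illustrates that convention with local variables and lambdas. It is not Roby code: open_context is a hypothetical helper, since this page does not show how the engine opens the context.

```ruby
# Stand-in for @application_exceptions; nil means "no context open".
application_exceptions = nil

# Hypothetical helper: the real engine opens the context elsewhere.
open_context = lambda do
  raise "already gathering" if application_exceptions

  application_exceptions = []
end

add_framework_error = lambda do |error, source|
  raise "called outside an exception-gathering context" unless application_exceptions

  application_exceptions << [error, source]
end

clear_application_exceptions = lambda do
  raise "unbalanced call" unless application_exceptions

  # Return what was gathered and close the context in one step.
  result, application_exceptions = application_exceptions, nil
  result
end

open_context.call
add_framework_error.call(RuntimeError.new("boom"), "scheduler")
gathered = clear_application_exceptions.call

puts gathered.map { |error, source| "#{source}: #{error.message}" }
```

Resetting the variable to nil on clear is what lets both methods detect unbalanced or out-of-context calls.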

#compute_errors(events_errors) ⇒ PropagationInfo

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Compute the set of fatal errors in the current execution state

Parameters:

  • events_errors (Array)

    the set of errors gathered during event propagation

Returns:

  • (PropagationInfo)



# File 'lib/roby/execution_engine.rb', line 1398

def compute_errors(events_errors)
    # Generate exceptions from task structure
    structure_errors = plan.check_structure
    log_timepoint 'structure_check'

    # Propagate the errors. Note that the plan repairs are taken into
    # account in ExecutionEngine.propagate_exceptions directly.  We keep
    # event and structure errors separate since in the first case there
    # is not two-stage handling (all errors that have not been handled
    # are fatal), and in the second case we call #check_structure
    # again to errors that are remaining after the call to the exception
    # handlers
    events_errors, free_events_errors, events_handled = propagate_exceptions(events_errors)
    _, structure_handled = propagate_exceptions(structure_errors)
    log_timepoint 'exception_propagation'

    # Get the remaining problems in the plan structure, and act on it
    structure_errors, structure_inhibited = remove_inhibited_exceptions(plan.check_structure)

    # Partition them by fatal/nonfatal
    fatal_errors, nonfatal_errors = Array.new, Array.new
    (structure_errors + events_errors).each do |e, involved_tasks|
        if e.fatal?
            fatal_errors << [e, involved_tasks]
        else
            nonfatal_errors << [e, involved_tasks]
        end
    end
    kill_tasks = compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors).to_set
    handled_errors = structure_handled + events_handled

    debug "#{fatal_errors.size} fatal errors found and #{free_events_errors.size} errors involving free events"
    debug "the fatal errors involve #{kill_tasks.size} non-finalized tasks"
    return PropagationInfo.new(Set.new, Set.new, kill_tasks, fatal_errors, nonfatal_errors, free_events_errors, handled_errors, structure_inhibited)
end

#compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Compute the set of tasks that should be killed because of unhandled fatal exceptions



# File 'lib/roby/execution_engine.rb', line 856

def compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors)
    kill_tasks = fatal_errors.inject(Set.new) do |tasks, (exception, affected_tasks)|
        tasks.merge(affected_tasks)
    end
    # Tasks might have been finalized during exception handling, filter
    # those out
    kill_tasks.find_all(&:plan)
end
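
The merge-then-filter logic above can be tried on plain data. In this sketch the task struct is a hypothetical stand-in for Roby tasks, and a nil plan represents a task that was finalized during exception handling.

```ruby
require "set"

# Hypothetical stand-in for a task; plan is nil once the task is finalized.
task_class = Struct.new(:name, :plan)
t1 = task_class.new("t1", :plan)
t2 = task_class.new("t2", nil)
t3 = task_class.new("t3", :plan)

# Same shape as fatal_errors: [[exception, affected_tasks], ...]
fatal_errors = [[:error_a, [t1, t2]], [:error_b, [t1, t3]]]

# Merge all affected tasks into one set, which removes duplicates.
kill_tasks = fatal_errors.inject(Set.new) do |tasks, (_exception, affected)|
  tasks.merge(affected)
end

# Drop the tasks that are no longer part of a plan.
remaining = kill_tasks.find_all(&:plan)

p remaining.map(&:name)
```

Note that find_all returns an Array, which is why the caller in #compute_errors converts the result back with to_set.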

#cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called at each cycle end



# File 'lib/roby/execution_engine.rb', line 2396

def cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?)
    gather_framework_errors("#cycle_end", raise_caught_exceptions: raise_framework_errors) do
        call_poll_blocks(at_cycle_end_handlers)
    end
end

#delayed(delay, description: 'delayed block', **options, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Schedules block to be called once, after delay seconds have passed, in the propagation context



# File 'lib/roby/execution_engine.rb', line 1366

def delayed(delay, description: 'delayed block', **options, &block)
    handler = PollBlockDefinition.new(description, block, once: true, **options)
    once do
        process_every << [handler, cycle_start, delay]
    end
    handler.id
end

#display_exceptions=(flag) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Controls whether this engine should indiscriminately display all fatal exceptions

This is on by default



# File 'lib/roby/execution_engine.rb', line 2549

def display_exceptions=(flag)
    if flag
        @exception_display_handler ||= on_exception do |kind, error, tasks|
            level = if kind == EXCEPTION_HANDLED then :debug
                    else :warn
                    end

            send(level) do
                send(level, "encountered a #{kind} exception")
                Roby.log_exception_with_backtrace(error.exception, self, level)
                if kind == EXCEPTION_HANDLED
                    send(level, "the exception was handled by")
                else
                    send(level, "the exception involved")
                end
                tasks.each do |t|
                    send(level, "  #{t}")
                end
                break
            end
        end
    else
        remove_exception_listener(@exception_display_handler)
        @exception_display_handler = nil
    end
end

#display_exceptions? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether this engine should indiscriminately display all fatal exceptions

Returns:

  • (Boolean)


# File 'lib/roby/execution_engine.rb', line 2578

def display_exceptions?
    !!@exception_display_handler
end

#error_handling_phase(events_errors) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Compute errors in plan and handle the results



# File 'lib/roby/execution_engine.rb', line 841

def error_handling_phase(events_errors)
    # Do the exception handling phase
    errors = compute_errors(events_errors)
    notify_about_error_handling_results(errors)

    # nonfatal errors are only notified. Fatal errors (kill_tasks) are
    # handled in the propagation loop during garbage collection. Only
    # the free events errors have to be handled here.
    errors.free_events_errors.each do |exception, generators|
        generators.each { |g| g.unreachable!(exception.exception) }
    end
    return errors
end

#event_loop ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The main event loop. It returns when the execution engine is asked to quit. In general, this does not need to be called directly: use #run to start the event loop in a separate thread.



# File 'lib/roby/execution_engine.rb', line 2227

def event_loop
    last_stop_count = 0
    last_quit_warning = Time.now
    @cycle_start  = Time.now
    @cycle_index  = 0

    force_exit_deadline = nil
    last_process_times = Process.times
    last_dump_time = plan.event_logger.dump_time

    loop do
        begin
            if profile_gc?
                GC::Profiler.enable
            end

            if quitting?
                if forced_exit?
                    return
                end

                begin
                    remaining = clear
                    return if !remaining

                    if (last_stop_count != remaining.size) || (Time.now - last_quit_warning) > 10
                        if last_stop_count == 0
                            info "Roby quitting ..."
                        end

                        issue_quit_progression_warning(remaining)
                        last_quit_warning = Time.now
                        last_stop_count = remaining.size
                    end
                rescue Exception => e
                    warn "Execution thread failed to clean up"
                    Roby.log_exception_with_backtrace(e, self, :warn, filter: false)
                    return
                end
            end

            log_timepoint_group_start "cycle"

            while Time.now > cycle_start + cycle_length
                @cycle_start += cycle_length
                @cycle_index += 1
            end
            stats = Hash.new
            stats[:start] = [cycle_start.tv_sec, cycle_start.tv_usec]
            stats[:actual_start] = Time.now - cycle_start
            stats[:cycle_index] = cycle_index

            log_timepoint_group 'process_events' do
                process_events
            end

            remaining_cycle_time = cycle_length - (Time.now - cycle_start)

            if use_oob_gc?
                stats[:pre_oob_gc] = GC.stat
                GC::OOB.run
            end
            
            # Sleep if there is enough time for it
            if remaining_cycle_time > SLEEP_MIN_TIME
                sleep(remaining_cycle_time) 
            end
            log_timepoint 'sleep'

            cycle_end(stats)

            # Log cycle statistics
            process_times = Process.times
            dump_time = plan.event_logger.dump_time
            stats[:log_queue_size]   = plan.log_queue_size
            stats[:plan_task_count]  = plan.num_tasks
            stats[:plan_event_count] = plan.num_free_events
            stats[:gc] = GC.stat
            stats[:utime] = process_times.utime - last_process_times.utime
            stats[:stime] = process_times.stime - last_process_times.stime
            stats[:dump_time] = dump_time - last_dump_time
            stats[:state] = Roby::State
            stats[:end] = Time.now - cycle_start
            if profile_gc?
                stats[:gc_profile_data] = GC::Profiler.raw_data
                stats[:gc_total_time] = GC::Profiler.total_time
            else
                stats[:gc_profile_data] = nil
                stats[:gc_total_time] = 0
            end
            log_flush_cycle :cycle_end, stats

            last_dump_time = dump_time
            last_process_times = process_times
            stats = Hash.new

            @cycle_start += cycle_length
            @cycle_index += 1

            if profile_gc?
                GC::Profiler.disable
            end

        rescue Exception => e
            if e.kind_of?(Interrupt)
                if quitting?
                    if force_exit_deadline && (force_exit_deadline - Time.now) < 0
                        fatal "Quitting without cleaning up"
                        force_quit
                    else
                        fatal "Still #{Integer(force_exit_deadline - Time.now)}s before interruption will quit without cleaning up"
                    end
                else
                    fatal "Received interruption request"
                    fatal "Interrupt again in #{INTERRUPT_FORCE_EXIT_DEAD_ZONE}s to quit without cleaning up"
                    quit
                    force_exit_deadline = Time.now + INTERRUPT_FORCE_EXIT_DEAD_ZONE
                end
            elsif !quitting?
                quit

                fatal "Execution thread quitting because of unhandled exception"
                Roby.log_exception_with_backtrace(e, self, :fatal)
            else
                fatal "Execution thread FORCEFULLY quitting because of unhandled exception"
                Roby.log_exception_with_backtrace(e, self, :fatal)
                raise
            end
        ensure
            log_timepoint_group_end "cycle"
        end
    end

ensure
    if !plan.tasks.empty?
        warn "the following tasks are still present in the plan:"
        plan.tasks.each do |t|
            warn "  #{t}"
        end
    end
end
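
The cycle bookkeeping in the loop above can be isolated as two small pure functions. This is a sketch, not Roby code: realign_cycle and sleep_time are hypothetical helper names, and times are plain numbers rather than Time objects. SLEEP_MIN_TIME uses the 0.01 value given in the constant summary.

```ruby
SLEEP_MIN_TIME = 0.01

# Skip over cycles that were missed entirely, as the `while` loop at the
# start of each iteration does. Returns the realigned [start, index] pair.
def realign_cycle(cycle_start, cycle_index, cycle_length, now)
  while now > cycle_start + cycle_length
    cycle_start += cycle_length
    cycle_index += 1
  end
  [cycle_start, cycle_index]
end

# Time to sleep at the end of a cycle: the remaining cycle time, or zero
# when less than SLEEP_MIN_TIME is left.
def sleep_time(cycle_start, cycle_length, now)
  remaining = cycle_length - (now - cycle_start)
  remaining > SLEEP_MIN_TIME ? remaining : 0
end

# A 100 ms cycle observed 350 ms after the first cycle started
# (integer milliseconds keep the arithmetic exact)
realigned = realign_cycle(0, 0, 100, 350)
```

Keeping the cycle start on multiples of the cycle length means that a long cycle delays the following ones without shifting the period grid.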

#event_propagation_phase(initial_events, propagation_info) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Calls its block in a #gather_propagation context and propagates the events that have been called and/or emitted by the block

If a block is given, it is called with the initial set of events: the events we should consider as already emitted in the following propagation. seeds is a list of procs which should be called to initiate the propagation (i.e. build an initial set of events)



# File 'lib/roby/execution_engine.rb', line 826

def event_propagation_phase(initial_events, propagation_info)
    @propagation_id += 1

    gather_errors do
        next_steps = initial_events
        while !next_steps.empty?
            while !next_steps.empty?
                next_steps = event_propagation_step(next_steps, propagation_info)
            end        
            next_steps = gather_propagation { call_propagation_handlers }
        end
    end
end
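
The two nested loops in the listing above form a fixed-point iteration: pending steps are drained first, then the propagation handlers get a chance to queue more work, until nothing is left. A minimal sketch of that control flow follows, with hypothetical callables standing in for event_propagation_step and call_propagation_handlers. Pending work is modelled as a plain array here, which is an assumption made for brevity.

```ruby
# Hypothetical model of the loop structure only.
#   step:     consumes one pending item and returns what is still pending
#   handlers: returns new pending items, or an empty list when done
def propagate(initial, step:, handlers:)
  pending = initial
  until pending.empty?
    # Drain everything that is currently pending
    pending = step.call(pending) until pending.empty?
    # Let the handlers seed another round
    pending = handlers.call
  end
end

log   = []
extra = [[:c]] # work that the handlers will produce, one batch per call

step = lambda do |pending|
  log << pending.first
  pending.drop(1)
end
handlers = -> { extra.shift || [] }

propagate(%i[a b], step: step, handlers: handlers)
```

The handlers are consulted only once the current batch is exhausted, so work they add is always processed after the work that triggered them.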

#event_propagation_step(current_step, propagation_info) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Propagate one step

current_step describes all pending emissions and calls.

This method calls ExecutionEngine.next_event to get the description of the next event to call. If there are signals going to this event, they are processed and the forwardings will be treated in the next step.

The method returns the next set of pending emissions and calls, adding the forwardings and signals that the propagation of the considered event has added.



# File 'lib/roby/execution_engine.rb', line 1053

def event_propagation_step(current_step, propagation_info)
    signalled, step_id, forward_info, call_info = next_event(current_step)

    next_step = nil
    if !call_info.empty?
        source_events, source_generators, context =
            prepare_propagation(signalled, false, call_info)
        if source_events
            log(:generator_propagate_events, false, source_events, signalled)

            if signalled.self_owned?
                next_step = gather_propagation(current_step) do
                    propagation_context(source_events | source_generators) do
                        begin
                            propagation_info.add_generator_call(signalled)
                            signalled.call_without_propagation(context) 
                        rescue Roby::LocalizedError => e
                            if signalled.command_emitted?
                                add_error(e)
                            else
                                signalled.emit_failed(e)
                            end
                        rescue Exception => e
                            if signalled.command_emitted?
                                add_error(Roby::CommandFailed.new(e, signalled))
                            else
                                signalled.emit_failed(Roby::CommandFailed.new(e, signalled))
                            end
                        end
                    end
                end
            end
        end

        if forward_info
            next_step ||= Hash.new
            target_info = (next_step[signalled] ||= [@propagation_step_id += 1, [], []])
            target_info[PENDING_PROPAGATION_FORWARD].concat(forward_info)
        end

    elsif !forward_info.empty?
        source_events, source_generators, context =
            prepare_propagation(signalled, true, forward_info)
        if source_events
            log(:generator_propagate_events, true, source_events, signalled)

            # If the destination event is not owned, but if the peer is not
            # connected, the event is our responsibility now.
            if signalled.self_owned? || !signalled.owners.any? { |peer| peer != plan.local_owner && peer.connected? }
                next_step = gather_propagation(current_step) do
                    propagation_context(source_events | source_generators) do
                        begin
                            if event = signalled.emit_without_propagation(context)
                                propagation_info.add_event_emission(event)
                                emitted_events << event
                            end
                        rescue Roby::LocalizedError => e
                            Roby.warn "Internal Error: #emit_without_propagation emitted a LocalizedError exception. This is unsupported and will become a fatal error in the future. You should usually replace raise with engine.add_error"
                            Roby.display_exception(Roby.logger.io(:warn), e, false)
                            add_error(e)
                        rescue Exception => e
                            Roby.warn "Internal Error: #emit_without_propagation emitted an exception. This is unsupported and will become a fatal error in the future. You should create a proper localized error and replace raise with engine.add_error"
                            Roby.display_exception(Roby.logger.io(:warn), e, false)
                            add_error(Roby::EmissionFailed.new(e, signalled))
                        end
                    end
                end
            end
        end
    end

    current_step.merge!(next_step) if next_step
    current_step
end

#every(duration, description: 'periodic handler', **options, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Call block every duration seconds. Note that duration is rounded up to the cycle size (time between calls is *at least* duration)

The returned value is the periodic handler ID. It can be passed to #remove_periodic_handler to undefine it.



# File 'lib/roby/execution_engine.rb', line 2059

def every(duration, description: 'periodic handler', **options, &block)
    handler = PollBlockDefinition.new(description, block, **options)

    once do
        if handler.call(self, plan)
            process_every << [handler, cycle_start, duration]
        end
    end
    handler.id
end
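
To illustrate why the time between calls is at least duration, here is a small simulation. It assumes that periodic handlers are checked once per cycle and called when at least duration has elapsed since their last call; the code that actually consumes process_every is not shown in this section, so this is a model of the documented behavior rather than Roby's implementation. call_times_ms is a hypothetical name, and times are integer milliseconds to keep the arithmetic exact.

```ruby
# Returns the times (in ms) at which a handler with the given period
# would be called, when it is only checked at cycle boundaries.
def call_times_ms(duration_ms:, cycle_ms:, cycles:)
  last_call = 0
  times = []
  (1..cycles).each do |index|
    now = index * cycle_ms
    next unless now - last_call >= duration_ms

    times << now
    last_call = now
  end
  times
end

times = call_times_ms(duration_ms: 250, cycle_ms: 100, cycles: 10)
```

With a 100 ms cycle, a 250 ms period is effectively stretched to 300 ms in this model, the next multiple of the cycle length.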

#execute(catch: [], type: :external_events) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Block until the given block is executed by the execution thread, at the beginning of the event loop, in propagation context. If the block raises, the exception is raised back in the calling thread.



# File 'lib/roby/execution_engine.rb', line 2405

def execute(catch: [], type: :external_events)
    if inside_control?
        return yield
    end

    capture_catch = lambda do |symbol, *other|
        caught = catch(symbol) do
            if other.empty?
                return [:ret, yield]
            else
                # capture_catch is a lambda: recurse with #call on the remaining symbols
                return capture_catch.call(*other)
            end
        end
        [:throw, [symbol, caught]]
    end

    ivar = Concurrent::IVar.new
    once(sync: ivar, type: type) do
        begin
            if !catch.empty?
                result = capture_catch.call(*catch) { yield }
                ivar.set(result)
            else
                ivar.set([:ret, yield])
            end
        rescue ::Exception => e # rubocop:disable Lint/RescueException
            ivar.set([:raise, e])
        end
    end

    mode, value = ivar.value!
    case mode
    when :ret
        return value
    when :throw
        throw *value
    else
        raise value
    end
end
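
The core of this pattern is running a block on another thread and handing back either its value or its exception. The sketch below shows that with the standard library only. MiniEngine is a hypothetical class, and a Queue replaces the Concurrent::IVar used in the listing. The catch/throw forwarding is left out.

```ruby
# Hypothetical miniature of an engine with a single worker thread.
class MiniEngine
  def initialize
    @jobs = Queue.new
    @thread = Thread.new do
      # Run queued jobs until a nil sentinel is received
      while (job = @jobs.pop)
        job.call
      end
    end
  end

  def execute(&block)
    # Already on the engine thread: run inline
    return block.call if Thread.current == @thread

    reply = Queue.new
    @jobs << lambda do
      begin
        reply << [:ret, block.call]
      rescue Exception => e # rubocop:disable Lint/RescueException
        reply << [:raise, e]
      end
    end

    # Block the caller until the engine thread has answered
    mode, value = reply.pop
    raise value if mode == :raise

    value
  end

  def quit
    @jobs << nil
    @thread.join
  end
end

engine = MiniEngine.new
value = engine.execute { 6 * 7 }
error = begin
  engine.execute { raise ArgumentError, 'boom' }
rescue ArgumentError => e
  e
end
engine.quit
```

Tagging the reply with :ret or :raise lets the calling thread tell a returned exception object apart from a raised one.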

#execute_delayed_events ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Adds the events in delayed_events whose time has passed into the propagation. This must be called in propagation context.

See #add_event_delay and #delayed_events



# File 'lib/roby/execution_engine.rb', line 503

def execute_delayed_events
    reftime = Time.now
    delayed_events.delete_if do |time, forward, source, signalled, context|
        if time <= reftime
            add_event_propagation(forward, [source], signalled, context, nil)
            true
        end
    end
end
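
The listing above relies on Array#delete_if removing exactly the entries for which the block returns a truthy value: entries that are not yet due make the `if` evaluate to nil, so they stay queued. A minimal sketch with hypothetical data (integer timestamps and symbols in place of Time objects and events):

```ruby
# [due_time, event] pairs; the real entries carry more fields
delayed = [[10, :a], [20, :b], [30, :c]]
fired   = []
now     = 20

delayed.delete_if do |time, event|
  if time <= now
    fired << event
    true # remove the entry once it has been handed over
  end
end
```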

#finalized_event(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by #plan when an event has been finalized



# File 'lib/roby/execution_engine.rb', line 524

def finalized_event(event)
    if @propagation
        @propagation.delete(event)
    end
    event.unreachable!("finalized", plan)
    # since the event is already finalized, 
end

#finalized_task(task) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by #plan when a task has been finalized



# File 'lib/roby/execution_engine.rb', line 519

def finalized_task(task)
    @pending_exceptions.delete(task)
end

#force_quit ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Force quitting, without cleaning up



# File 'lib/roby/execution_engine.rb', line 2388

def force_quit; @quit = 2 end

#forced_exit? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True if the engine has been asked to quit forcefully, without cleaning up (see #force_quit)

Returns:

  • (Boolean)


# File 'lib/roby/execution_engine.rb', line 2384

def forced_exit?; @quit > 1 end

#garbage_collect(force_on = nil) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Kills and removes all unneeded tasks. force_on is a set of tasks that must be garbage-collected even though they are still useful to the system. This is used to properly kill tasks for which errors have been detected.

Returns:

  • (Boolean)

    true if events have been called (thus requiring some propagation) and false otherwise



# File 'lib/roby/execution_engine.rb', line 1861

def garbage_collect(force_on = nil)
    if force_on && !force_on.empty?
        info "GC: adding #{force_on.size} tasks in the force_gc set"
        mismatching_plan = force_on.find_all do |t|
            if t.plan == self.plan
                plan.force_gc << t
                false
            else
                true
            end
        end
        if !mismatching_plan.empty?
            raise ArgumentError, "#{mismatching_plan.map { |t| "#{t}(plan=#{t.plan})" }.join(", ")} have been given to #{self}.garbage_collect, but they are not tasks in #{plan}"
        end
    end

    unmark_finished_missions_and_permanent_tasks

    # The set of tasks for which we queued stop! at this cycle
    # #finishing? is false until the next event propagation cycle
    finishing = Set.new
    did_something = true
    while did_something
        did_something = false

        tasks = plan.unneeded_tasks | plan.force_gc
        local_tasks  = plan.local_tasks & tasks
        remote_tasks = tasks - local_tasks

        # Remote tasks are simply removed, regardless of other concerns
        for t in remote_tasks
            debug { "GC: removing the remote task #{t}" }
            plan.garbage_task(t)
        end

        break if local_tasks.empty?

        debug do
            debug "#{local_tasks.size} tasks are unneeded in this plan"
            local_tasks.each do |t|
                debug "  #{t} mission=#{plan.mission_task?(t)} permanent=#{plan.permanent_task?(t)}"
            end
            break
        end

        if local_tasks.all? { |t| t.pending? || t.finished? }
            local_tasks.each do |t|
                debug { "GC: #{t} is not running, removed" }
                if plan.garbage_task(t)
                    did_something = true
                end
            end
            break
        end

        # Mark all root local_tasks as garbage.
        roots = local_tasks.dup
        plan.each_task_relation_graph do |g|
            next if !g.root_relation? || g.weak?
            roots.delete_if do |t|
                g.each_in_neighbour(t).any? { |p| !p.finished? }
            end
            break if roots.empty?
        end

        (roots.to_set - finishing).each do |local_task|
            if local_task.pending?
                info "GC: removing pending task #{local_task}"

                if plan.garbage_task(local_task)
                    did_something = true
                end
            elsif local_task.failed_to_start?
                info "GC: removing task that failed to start #{local_task}"
                if plan.garbage_task(local_task)
                    did_something = true
                end
            elsif local_task.starting?
                # wait for task to be started before killing it
                debug { "GC: #{local_task} is starting" }
            elsif local_task.finished?
                debug { "GC: #{local_task} is not running, removed" }
                if plan.garbage_task(local_task)
                    did_something = true
                end
            elsif !local_task.finishing?
                if local_task.quarantined?
                    warn "GC: #{local_task} is running but in quarantine"
                elsif local_task.event(:stop).controlable?
                    debug { "GC: attempting to stop #{local_task}" }
                    if !local_task.respond_to?(:stop!)
                        warn "something fishy: #{local_task}/stop is controlable but there is no #stop! method, putting in quarantine"
                        plan.quarantine_task(local_task)
                    else
                        finishing << local_task
                    end
                else
                    warn "GC: #{local_task} cannot be stopped, putting in quarantine"
                    plan.quarantine_task(local_task)
                end
            elsif local_task.finishing?
                debug do
                    debug "GC: waiting for #{local_task} to finish"
                    local_task.history.each do |ev|
                        debug "GC:   #{ev}"
                    end
                    break
                end
            else
                warn "GC: ignored #{local_task}"
            end
        end
    end

    finishing.each do |task|
        task.stop!
    end

    plan.unneeded_events.each do |event|
        plan.garbage_event(event)
    end

    !finishing.empty?
end
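
The "roots" step in the listing above keeps only the candidate tasks that have no unfinished parent, so that tasks are stopped from the top of the dependency structure downwards. A minimal sketch of that filter follows, assuming a single relation graph given as a child => parents hash and symbols in place of tasks. gc_roots is a hypothetical helper name.

```ruby
require 'set'

# Keep the candidates that have no parent which is still unfinished.
def gc_roots(candidates, parents, finished)
  candidates.reject do |task|
    parents.fetch(task, []).any? { |parent| !finished.include?(parent) }
  end
end

candidates = %i[a b c]
parents    = { b: [:a], c: [:x] } # :a is the parent of :b, :x of :c
finished   = Set[:x]

roots = gc_roots(candidates, parents, finished)
```

Here :b is held back because its parent :a has not finished yet, while :c qualifies because its only parent has.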

#gather_errors ⇒ Array<ExecutionException>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Executes the given block while gathering errors, and returns the errors that have been declared with #add_error

Returns:

  • (Array<ExecutionException>)


# File 'lib/roby/execution_engine.rb', line 801

def gather_errors
    if @propagation_exceptions
        raise InternalError, "recursive call to #gather_errors"
    end

    # The ensure clause must NOT apply to the recursive check above.
    # Otherwise, we end up resetting @propagation_exceptions to nil,
    # which wreaks havoc
    begin
        @propagation_exceptions = []
        yield
        @propagation_exceptions

    ensure
        @propagation_exceptions = nil
    end
end
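
The placement of the recursion check outside the begin/ensure block is the key point of the listing above: a rejected nested call must not reset the state of the outer call. The sketch below reproduces that pattern in a standalone class. ErrorCollector is a hypothetical name, and RuntimeError stands in for Roby's InternalError.

```ruby
# Hypothetical, standalone version of the non-reentrant gathering pattern.
class ErrorCollector
  def initialize
    @errors = nil
  end

  def gathering?
    !!@errors
  end

  def add_error(error)
    raise 'not in a gathering context' unless @errors

    @errors << error
  end

  def gather
    # Checked outside of the begin/ensure below, so that a rejected
    # nested call leaves the outer context untouched
    raise 'recursive call to #gather' if @errors

    begin
      @errors = []
      yield
      @errors
    ensure
      @errors = nil
    end
  end
end

collector = ErrorCollector.new
still_gathering = nil
errors = collector.gather do
  collector.add_error(:first)
  begin
    collector.gather {}
  rescue RuntimeError
    still_gathering = collector.gathering?
  end
  collector.add_error(:second)
end
```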

#gather_external_events ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Gather the events that come out of this plan manager



# File 'lib/roby/execution_engine.rb', line 769

def gather_external_events
    process_once_blocks
    gather_framework_errors('delayed events')     { execute_delayed_events }
    call_poll_blocks(self.class.external_events_handlers)
    call_poll_blocks(self.external_events_handlers)
end

#gather_framework_errors(source, raise_caught_exceptions: true) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Yields to the block and registers any raised exception using #add_framework_error

If the method is called within an exception-gathering context (either #process_events or #gather_framework_errors itself), nothing else is done. Otherwise, #process_pending_application_exceptions is called to re-raise any caught exception



# File 'lib/roby/execution_engine.rb', line 599

def gather_framework_errors(source, raise_caught_exceptions: true)
    if @application_exceptions
        recursive_error_gathering_context = true
    else
        @application_exceptions = []
    end

    yield

    if !recursive_error_gathering_context && !raise_caught_exceptions
        clear_application_exceptions
    end
rescue Exception => e
    add_framework_error(e, source)
    if !recursive_error_gathering_context && !raise_caught_exceptions
        clear_application_exceptions
    end
ensure
    if !recursive_error_gathering_context && raise_caught_exceptions
        process_pending_application_exceptions
    end
end

#gather_propagation(initial_set = Hash.new) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Sets up a propagation context, yielding the block in it. During this propagation stage, all calls to #emit and #call are stored in an internal hash of the form:

target => [forward_sources, signal_sources]

where the two _sources are arrays of the form

[[source, context], ...]

The method returns the resulting hash. Use #in_propagation_context? to know if the current engine is in a propagation context, and #add_event_propagation to add a new entry to this set.

Raises:

  • (InternalError)

    if called while already inside a propagation context


# File 'lib/roby/execution_engine.rb', line 548

def gather_propagation(initial_set = Hash.new)
    raise InternalError, "nested call to #gather_propagation" if in_propagation_context?

    old_allow_propagation, @allow_propagation = @allow_propagation, true

    # The ensure clause must NOT apply to the recursive check above.
    # Otherwise, we end up resetting @propagation_exceptions to nil,
    # which wreaks havoc
    begin
        @propagation = initial_set
        @propagation_sources = nil
        @propagation_step_id = 0

        before = @propagation
        propagation_context([]) do
            yield
        end

        result, @propagation = @propagation, nil
        return result
    ensure
        @propagation = nil
        @allow_propagation = old_allow_propagation
    end
end

#gathering? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)


# File 'lib/roby/execution_engine.rb', line 440

def gathering?
    Roby.warn_deprecated "#gathering? is deprecated, use #in_propagation_context? instead"
    in_propagation_context?
end

#gathering_errors? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)


# File 'lib/roby/execution_engine.rb', line 793

def gathering_errors?
    !!@propagation_exceptions
end

#has_pending_exception_matching?(e, object) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Tests whether there is an exception registered by #add_fatal_exceptions_for_inhibition for a given error and object

Parameters:

Returns:

  • (Boolean)


# File 'lib/roby/execution_engine.rb', line 1821

def has_pending_exception_matching?(e, object)
    @pending_exceptions[object] && @pending_exceptions[object].include?([e.exception.class, e.origin])
end

#has_pending_forward?(from, to, expected_context) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether a forward matching this signature is currently pending

Returns:

  • (Boolean)


# File 'lib/roby/execution_engine.rb', line 720

def has_pending_forward?(from, to, expected_context)
    if pending = @propagation[to]
        pending[PENDING_PROPAGATION_FORWARD].each_slice(3).any? do |event, context, timespec|
            (from === event.generator) && (expected_context === context)
        end
    end
end

#has_pending_signal?(from, to, expected_context) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether a signal matching this signature is currently pending

Returns:

  • (Boolean)


# File 'lib/roby/execution_engine.rb', line 729

def has_pending_signal?(from, to, expected_context)
    if pending = @propagation[to]
        pending[PENDING_PROPAGATION_SIGNAL].each_slice(3).any? do |event, context, timespec|
            (from === event.generator) && (expected_context === context)
        end
    end
end
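
Both pending-propagation predicates walk a flat array three items at a time and match with ===, so callers may pass either exact values or patterns such as classes. The sketch below shows the lookup on hand-built data. The entry layout (step ID, forwards, signals) is inferred from the listings in this section; FakeEvent, pending_forward? and the sample values are hypothetical.

```ruby
PENDING_PROPAGATION_FORWARD = 1
PENDING_PROPAGATION_SIGNAL  = 2

# Hypothetical stand-in for an event: only #generator is needed here.
FakeEvent = Struct.new(:generator)

# target => [step_id, forwards, signals], where forwards and signals are
# flat lists of (event, context, timespec) triples
propagation = {
  stop: [1, [FakeEvent.new(:failed), ['timeout'], nil], []]
}

def pending_forward?(propagation, from, to, expected_context)
  pending = propagation[to]
  return false unless pending

  pending[PENDING_PROPAGATION_FORWARD].each_slice(3).any? do |event, context, _timespec|
    (from === event.generator) && (expected_context === context)
  end
end

exact_match   = pending_forward?(propagation, :failed, :stop, ['timeout'])
pattern_match = pending_forward?(propagation, Symbol, :stop, Array)
wrong_context = pending_forward?(propagation, :failed, :stop, ['other'])
wrong_target  = pending_forward?(propagation, :failed, :start, Array)
```

Note that === is applied with the expected value on the left, which is what makes class and range patterns work.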

#has_propagation_for?(target) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)


# File 'lib/roby/execution_engine.rb', line 674

def has_propagation_for?(target)
    @propagation && @propagation.has_key?(target)
end

#has_queued_events? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns true if some events are queued

Returns:

  • (Boolean)


# File 'lib/roby/execution_engine.rb', line 533

def has_queued_events?
    !@propagation.empty?
end

#has_waiting_work? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether this execution engine has asynchronous work waiting to be processed

Returns:

  • (Boolean)


# File 'lib/roby/execution_engine.rb', line 1435

def has_waiting_work?
    # Filter out unscheduled promises (promises on which #execute was
    # not called). If they are unscheduled, we're not waiting on them
    waiting_work.any? { |w| !w.unscheduled? }
end

#in_propagation_context? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True if we are within a propagation context (i.e. within event processing)

Returns:

  • (Boolean)


# File 'lib/roby/execution_engine.rb', line 447

def in_propagation_context?
    !!@propagation
end

#inhibited_exception?(exception) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Query whether the given exception is inhibited in this plan

Returns:

  • (Boolean)


# File 'lib/roby/execution_engine.rb', line 1346

def inhibited_exception?(exception)
    unhandled, _ = remove_inhibited_exceptions([exception.to_execution_exception])
    unhandled.empty?
end

#inside_control? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True if the current thread is the execution thread of this engine

See #outside_control? for a discussion of the use of #inside_control? and #outside_control? when testing the threading context

Returns:

  • (Boolean)


# File 'lib/roby/execution_engine.rb', line 2096

def inside_control?
    t = thread
    !t || t == Thread.current
end

#issue_quit_progression_warning(remaining) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 2209

def issue_quit_progression_warning(remaining)
    info "Waiting for #{remaining.size} tasks to finish (#{plan.num_tasks} tasks still in plan) and #{waiting_work.size} async work jobs"
    remaining.each do |task|
        info "  #{task}"
    end
    quarantined = remaining.find_all { |t| t.quarantined? }
    if quarantined.size != 0
        info "#{quarantined.size} tasks in quarantine"
    end
end

#join_all_waiting_work(timeout: nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Waits for all obligations in #waiting_work to finish



# File 'lib/roby/execution_engine.rb', line 389

def join_all_waiting_work(timeout: nil)
    return [], PropagationInfo.new if waiting_work.empty?
    deadline = if timeout
                   Time.now + timeout
               end

    finished = Array.new
    propagation_info = PropagationInfo.new
    begin
        framework_errors = gather_framework_errors("#join_all_waiting_work", raise_caught_exceptions: false) do
            next_steps = nil
            event_errors = gather_errors do
                next_steps = gather_propagation do
                    finished.concat(process_waiting_work)
                    blocks = Array.new
                    while !once_blocks.empty?
                        blocks << once_blocks.pop.last
                    end
                    call_poll_blocks(blocks)
                end
            end

            this_propagation = propagate_events_and_errors(next_steps, event_errors, garbage_collect_pass: false)
            propagation_info.merge(this_propagation)
        end
        propagation_info.add_framework_errors(framework_errors)

        Thread.pass
        has_scheduled_promises = has_waiting_work?
        if deadline && (Time.now > deadline) && has_scheduled_promises
            raise JoinAllWaitingWorkTimeout.new(waiting_work)
        end
    end while has_waiting_work?
    return finished, propagation_info
end
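
The deadline handling in #join_all_waiting_work follows a common poll-until-done pattern. The sketch below reproduces that pattern with plain threads instead of Roby's waiting work. `JoinTimeout` and `join_all` are hypothetical names, and the event propagation that the real method performs on each iteration is left out.

```ruby
# Hypothetical error class, standing in for JoinAllWaitingWorkTimeout
class JoinTimeout < RuntimeError; end

# Polls the given threads until all have finished, or raises JoinTimeout
# once the deadline has passed while some are still running
def join_all(threads, timeout: nil)
    deadline = Time.now + timeout if timeout
    finished = []
    pending = threads.dup
    until pending.empty?
        done, pending = pending.partition { |t| !t.alive? }
        finished.concat(done)

        Thread.pass # give the worker threads a chance to run
        if deadline && (Time.now > deadline) && !pending.empty?
            raise JoinTimeout, "#{pending.size} jobs still running"
        end
    end
    finished
end

workers = [Thread.new { sleep 0.01 }, Thread.new { :done }]
finished = join_all(workers, timeout: 5)
```

As in the original method, the timeout only triggers if work is still pending once the deadline has passed. Work that completes exactly at the deadline is not reported as a failure.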

#killall ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Kill all tasks that are currently running in the plan



# File 'lib/roby/execution_engine.rb', line 2488

def killall
    scheduler_enabled = scheduler.enabled?

    plan.permanent_tasks.clear
    plan.permanent_events.clear
    plan.mission_tasks.clear

    scheduler.enabled = false
    quit

    start_new_cycle
    process_events
    cycle_end(Hash.new)

    plan.transactions.each do |trsc|
        trsc.discard_transaction!
    end

    start_new_cycle
    Thread.pass
    process_events
    cycle_end(Hash.new)

ensure
    scheduler.enabled = scheduler_enabled
end

#next_event(pending) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

call-seq:

next_event(pending) => event, propagation_info

Determines which event in pending should be signalled now. Removes it from the set and returns the event and the associated propagation information.

See #gather_propagation for the format of the returned propagation_info



# File 'lib/roby/execution_engine.rb', line 954

def next_event(pending)
    # priority is 2 if the target has neither forwards nor signals,
    # 1 if it has forwards (with or without signals) and 0 if it is
    # only signalled
    priority, step_id, selected_event = nil
    for propagation_step in pending
        target_event = propagation_step[0]
        target_step_id, forwards, signals = *propagation_step[1]
        target_priority = if forwards.empty? && signals.empty? then 2
                          elsif forwards.empty? then 0
                          else 1
                          end

        do_select = if selected_event
                        if precedence_graph.reachable?(selected_event, target_event)
                            false
                        elsif precedence_graph.reachable?(target_event, selected_event)
                            true
                        elsif priority < target_priority
                            true
                        elsif priority == target_priority
                            # If they are of the same priority, handle
                            # earlier events first
                            step_id > target_step_id
                        else
                            false
                        end
                    else
                        true
                    end

        if do_select
            selected_event = target_event
            priority       = target_priority
            step_id        = target_step_id
        end
    end
    [selected_event, *pending.delete(selected_event)]
end
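
The selection rule can be illustrated with a simplified sketch that ignores the precedence graph and keeps only the two tie-breakers visible in the listing: higher priority first, then lower step ID. `priority_of` and `pick_next` are hypothetical helper names. The pending set is assumed to map each target to `[step_id, forwards, signals]`.

```ruby
# Priority as computed in the listing above
def priority_of(forwards, signals)
    if forwards.empty? && signals.empty? then 2
    elsif forwards.empty? then 0
    else 1
    end
end

# Picks the entry with the highest priority, preferring the lowest step ID
# among equal priorities, and removes it from the pending set
def pick_next(pending)
    selected, = pending.max_by { |_target, (step_id, forwards, signals)|
        [priority_of(forwards, signals), -step_id]
    }
    [selected, *pending.delete(selected)]
end

pending = {
    a: [3, [], [:signal]],       # only signalled: priority 0
    b: [2, [:forward], []],      # forwarded: priority 1
    c: [1, [:forward], [:signal]] # forwarded and signalled: priority 1, earlier step
}
first = pick_next(pending)
second = pick_next(pending)
third = pick_next(pending)
```

In the real method, reachability in the precedence graph takes priority over both criteria, so this sketch only matches it for events that are unrelated in that graph.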

#notify_about_error_handling_results(errors) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Issue the warning message and log notifications related to tasks being killed because of unhandled fatal exceptions



# File 'lib/roby/execution_engine.rb', line 867

def notify_about_error_handling_results(errors)
    kill_tasks, fatal_errors, nonfatal_errors, free_events_errors, handled_errors =
        errors.kill_tasks, errors.fatal_errors, errors.nonfatal_errors, errors.free_events_errors, errors.handled_errors

    if !nonfatal_errors.empty?
        if display_exceptions?
            warn "#{nonfatal_errors.size} unhandled non-fatal exceptions"
        end
        nonfatal_errors.each do |exception, tasks|
            notify_exception(EXCEPTION_NONFATAL, exception, tasks)
        end
    end

    if !handled_errors.empty?
        if display_exceptions?
            warn "#{handled_errors.size} handled errors"
        end
        handled_errors.each do |exception, tasks|
            notify_exception(EXCEPTION_HANDLED, exception, tasks)
        end
    end

    if !free_events_errors.empty?
        if display_exceptions?
            warn "#{free_events_errors.size} free event exceptions"
        end
        free_events_errors.each do |exception, events|
            notify_exception(EXCEPTION_FREE_EVENT, exception, events)
        end
    end

    if !fatal_errors.empty?
        if display_exceptions?
            warn "#{fatal_errors.size} unhandled fatal exceptions, involving #{kill_tasks.size} tasks that will be forcefully killed"
        end
        fatal_errors.each do |exception, tasks|
            notify_exception(EXCEPTION_FATAL, exception, tasks)
        end
        if display_exceptions?
            kill_tasks.each do |task|
                log_pp :warn, task
            end
        end
    end
end

#notify_exception(kind, error, involved_objects) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Call to notify the listeners registered with #on_exception of the occurrence of an exception



# File 'lib/roby/execution_engine.rb', line 2592

def notify_exception(kind, error, involved_objects)
    log(:exception_notification, plan.droby_id, kind, error, involved_objects)
    exception_listeners.each do |listener|
        listener.call(self, kind, error, involved_objects)
    end
end

#on_exception(description: 'exception listener', on_error: :disable) {|kind, error, tasks| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers a callback that will be called when exceptions are propagated in the plan

Yield Parameters:

Returns:



# File 'lib/roby/execution_engine.rb', line 2539

def on_exception(description: 'exception listener', on_error: :disable, &block)
    handler = PollBlockDefinition.new(description, block, on_error: on_error)
    exception_listeners << handler
    handler
end
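
The relationship between #on_exception and #notify_exception is a plain listener registry. The sketch below shows the idea with a hypothetical `ExceptionNotifier` class. It stores the raw blocks instead of PollBlockDefinition objects and omits the `on_error` policy and the event logging.

```ruby
# Hypothetical, simplified registry of exception listeners
class ExceptionNotifier
    def initialize
        @listeners = []
    end

    # Registers a listener and returns it, so that the caller can keep a handle
    def on_exception(&block)
        @listeners << block
        block
    end

    # Calls every listener with the notifier itself and the exception details
    def notify_exception(kind, error, involved_objects)
        @listeners.each do |listener|
            listener.call(self, kind, error, involved_objects)
        end
    end
end

notifier = ExceptionNotifier.new
seen = []
notifier.on_exception do |_notifier, kind, error, objects|
    seen << [kind, error.message, objects]
end
notifier.notify_exception(:fatal, RuntimeError.new("boom"), [:task_a])
```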

#once(sync: nil, description: 'once block', type: :external_events, **options, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Schedules block to be called at the beginning of the next execution cycle, in propagation context.

Parameters:

  • sync (#fail) (defaults to: nil)

    a synchronization object that is used to communicate between the once block and the calling thread. The main use of this parameter is to make sure that #fail is called if the execution engine quits

  • type (:external_events, :propagation) (defaults to: :external_events)

    whether the block should be registered as an :external_events block, processed at the beginning of the cycle, or a :propagation block, processed at each propagation loop.

  • description (String) (defaults to: 'once block')

    a string describing the block. It will be used when adding timepoints to the event log



# File 'lib/roby/execution_engine.rb', line 1359

def once(sync: nil, description: 'once block', type: :external_events, **options, &block)
    waiting_work << sync if sync
    once_blocks << create_propagation_handler(description: description, type: type, once: true, **options, &block)
end

#outside_control? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True if the current thread is not the execution thread of this engine, or if there is no control thread. When you check the current thread context, always use a negated form. Do not write

if Roby.inside_control?
  ERROR
end

Do instead

if !Roby.outside_control?
  ERROR
end

The first form will fail if there is no control thread, while the second form will work. Use the first form only if you require that there actually IS a control thread.

Returns:

  • (Boolean)


# File 'lib/roby/execution_engine.rb', line 2118

def outside_control?
    t = thread
    !t || t != Thread.current
end
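
The difference between the two predicates is easiest to see by evaluating both in the three possible situations. The sketch below copies the two method bodies into a hypothetical `TinyEngine` class whose `thread` attribute can be set freely. It is not the Roby engine itself.

```ruby
# Hypothetical minimal engine exposing only the two predicates
class TinyEngine
    attr_accessor :thread

    def inside_control?
        t = thread
        !t || t == Thread.current
    end

    def outside_control?
        t = thread
        !t || t != Thread.current
    end
end

engine = TinyEngine.new
# No control thread: both predicates are true
without_thread = [engine.inside_control?, engine.outside_control?]

engine.thread = Thread.current
# Called from the control thread
from_control = [engine.inside_control?, engine.outside_control?]
# Called from another thread
from_other = Thread.new { [engine.inside_control?, engine.outside_control?] }.value
```

Both predicates return true when there is no control thread, which is why `inside_control?` and `!outside_control?` are not interchangeable.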

#prepare_propagation(target, is_forward, info) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

call-seq:

prepare_propagation(target, is_forward, info) => source_events, source_generators, context
prepare_propagation(target, is_forward, info) => nil

Parses the propagation information info in the context of a forwarding if is_forward is true and a signalling otherwise. target is the target event.

The method adds the appropriate delayed events using #add_event_delay, and returns either nil if no propagation is to be performed, or the propagation source events, generators and context.

The format of info is the same as the hash values described in #gather_propagation.



# File 'lib/roby/execution_engine.rb', line 1007

def prepare_propagation(target, is_forward, info)
    timeref = Time.now

    source_events, source_generators, context = Set.new, Set.new, []

    delayed = true
    info.each_slice(3) do |src, ctxt, time|
        if time && (delay = ExecutionEngine.make_delay(timeref, src, target, time))
            add_event_delay(delay, is_forward, src, target, ctxt)
            next
        end

        delayed = false

        # Merge identical signals. Needed because two different event handlers
        # can both call #emit, and two signals are set up
        if src
            if src.respond_to?(:generator)
                source_events << src 
                source_generators << src.generator
            else
                source_generators << src
            end
        end
        if ctxt
            context.concat ctxt
        end
    end

    unless delayed
        [source_events, source_generators, context]
    end
end
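
The merging logic can be sketched independently of Roby. In the version below, `SourceGenerator`, `SourceEvent` and `merge_sources` are hypothetical names. The delay handling is reduced to a simple assumption: `time` is an absolute Time, and an entry is skipped if that time is still in the future. The real method delegates this decision to ExecutionEngine.make_delay and #add_event_delay.

```ruby
require 'set'

# Hypothetical stand-ins for Roby's generator and event objects
SourceGenerator = Struct.new(:name)
SourceEvent = Struct.new(:generator)

# Merges flat (source, context, time) triples into sets of source events and
# generators plus one concatenated context. Returns nil if every entry is
# delayed.
def merge_sources(info, now)
    source_events, source_generators, context = Set.new, Set.new, []
    delayed = true
    info.each_slice(3) do |src, ctxt, time|
        # Assumption: `time` is an absolute Time, entries in the future wait
        next if time && time > now

        delayed = false
        if src.respond_to?(:generator)
            source_events << src
            source_generators << src.generator
        elsif src
            source_generators << src
        end
        context.concat(ctxt) if ctxt
    end
    [source_events, source_generators, context] unless delayed
end

now = Time.at(100)
generator = SourceGenerator.new("start")
event = SourceEvent.new(generator)
info = [event, [1], nil, generator, [2], nil, event, [3], Time.at(200)]
events, generators, context = merge_sources(info, now)
all_delayed = merge_sources([event, [1], Time.at(200)], now)
```

Using sets for the sources means that two identical signals set up by different handlers are merged, while their contexts are still concatenated.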

#process_events(raise_framework_errors: Roby.app.abort_on_application_exception?, garbage_collect_pass: true, &caller_block) ⇒ PropagationInfo

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The inside part of the event loop

It gathers initial events and errors and propagates them

Returns:

Raises:

  • RecursivePropagationContext if called recursively



# File 'lib/roby/execution_engine.rb', line 1647

def process_events(raise_framework_errors: Roby.app.abort_on_application_exception?, garbage_collect_pass: true, &caller_block)
    if @application_exceptions
        raise RecursivePropagationContext, "recursive call to process_events"
    end
    passed_recursive_check = true # to avoid having an almost-method-global ensure block
    @application_exceptions = []
    @emitted_events = Array.new

    @thread_pool.send :synchronize do
        @thread_pool.send(:ns_prune_pool)
    end

    # Gather new events and propagate them
    events_errors = nil
    next_steps = gather_propagation do
        events_errors = gather_errors do
            if caller_block
                yield 
                caller_block = nil
            end

            if !quitting? || !garbage_collect([])
                process_waiting_work
                log_timepoint 'workers'
                gather_external_events
                log_timepoint 'external_events'
                call_propagation_handlers
                log_timepoint 'propagation_handlers'
            end
        end
    end

    propagation_info = propagate_events_and_errors(next_steps, events_errors, garbage_collect_pass: garbage_collect_pass)
    if Roby.app.abort_on_exception? && !all_errors.fatal_errors.empty?
        raise Aborting.new(propagation_info.each_fatal_error.map(&:exception))
    end
    propagation_info.framework_errors.concat(@application_exceptions)
    propagation_info

ensure
    if passed_recursive_check
        process_pending_application_exceptions(raise_framework_errors: raise_framework_errors)
    end
end
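
The role of `passed_recursive_check` is easier to see in isolation. The sketch below uses a hypothetical `GuardedLoop` class in which `@errors` plays the part of `@application_exceptions`. It assumes that the cleanup step resets the guard to nil, which the listing above does not show directly.

```ruby
# Hypothetical error class, standing in for RecursivePropagationContext
class RecursiveCall < RuntimeError; end

class GuardedLoop
    def process
        raise RecursiveCall, "recursive call to process" if @errors
        passed_recursive_check = true
        @errors = []
        yield if block_given?
        :done
    ensure
        # Only the call that passed the check may reset the guard, otherwise
        # a rejected recursive call would clear the state of the outer call
        @errors = nil if passed_recursive_check
    end
end

engine = GuardedLoop.new
nested = begin
    engine.process { engine.process }
rescue RecursiveCall => e
    e.message
end
after = engine.process
```

The recursive call is rejected, and the engine remains usable afterwards because the outer call still runs its cleanup.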

#process_events_synchronous(seeds = Hash.new, initial_errors = Array.new, enable_scheduler: false, raise_errors: true) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Tests use a special propagation mode in which everything, including error handling, is resolved when #emit or #call is called. This method implements that mode.

When errors occur in this mode, the exceptions are raised directly. This is useful in tests because it guarantees that an exception cannot be overlooked.

If multiple errors are raised in a single call (this is possible due to Roby’s error handling mechanisms), the method raises SynchronousEventProcessingMultipleErrors to wrap all the exceptions into one.



# File 'lib/roby/execution_engine.rb', line 1704

def process_events_synchronous(seeds = Hash.new, initial_errors = Array.new, enable_scheduler: false, raise_errors: true)
    Roby.warn_deprecated "#process_events_synchronous is deprecated, use the expect_execution harness instead"

    if @application_exceptions
        raise RecursivePropagationContext, "recursive call to process_events"
    end
    passed_recursive_check = true # to avoid having an almost-method-global ensure block
    @application_exceptions = []

    # Save early for the benefit of the 'ensure' block
    current_scheduler_enabled = scheduler.enabled?

    if (!seeds.empty? || !initial_errors.empty?) && block_given?
        raise ArgumentError, "cannot provide both seeds/inital errors and a block"
    elsif block_given?
        seeds = gather_propagation do
            initial_errors = gather_errors do
                yield
            end
        end
    end

    scheduler.enabled = enable_scheduler

    propagation_info = propagate_events_and_errors(seeds, initial_errors, garbage_collect_pass: false)
    if !propagation_info.kill_tasks.empty?
        gc_initial_errors = nil
        gc_seeds = gather_propagation do
            gc_initial_errors = gather_errors do
                garbage_collect(propagation_info.kill_tasks)
            end
        end
        gc_errors = propagate_events_and_errors(gc_seeds, gc_initial_errors, garbage_collect_pass: false)
        propagation_info.merge(gc_errors)
    end

    if raise_errors
        propagation_info = propagation_info.exceptions
        if propagation_info.size == 1
            raise propagation_info.first
        elsif !propagation_info.empty?
            raise SynchronousEventProcessingMultipleErrors.new(propagation_info.map(&:exception))
        end
    else
        propagation_info
    end

rescue SynchronousEventProcessingMultipleErrors => e
    raise SynchronousEventProcessingMultipleErrors.new(e.errors + clear_application_exceptions)

rescue Exception => e
    if passed_recursive_check
        application_exceptions = clear_application_exceptions
        if !application_exceptions.empty?
            raise SynchronousEventProcessingMultipleErrors.new(application_exceptions.map(&:first) + [e])
        else raise e
        end
    else
        raise e
    end

ensure
    if passed_recursive_check && @application_exceptions
        process_pending_application_exceptions
    end
    scheduler.enabled = current_scheduler_enabled
end
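
The error reporting rule described above (raise a single error directly, wrap several into one) can be sketched as follows. `MultipleErrors` and `raise_collected` are hypothetical names used for illustration. They are not the Roby classes.

```ruby
# Hypothetical wrapper, standing in for SynchronousEventProcessingMultipleErrors
class MultipleErrors < RuntimeError
    attr_reader :errors

    def initialize(errors)
        @errors = errors
        super("#{errors.size} errors")
    end
end

# Raises nothing for an empty list, the error itself for a single error, and
# a wrapper for several
def raise_collected(errors)
    if errors.size == 1
        raise errors.first
    elsif !errors.empty?
        raise MultipleErrors.new(errors)
    end
end

single = begin
    raise_collected([ArgumentError.new("bad argument")])
rescue ArgumentError => e
    e
end

wrapped = begin
    raise_collected([ArgumentError.new("a"), RuntimeError.new("b")])
rescue MultipleErrors => e
    e
end
```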

#process_once_blocks ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Dispatch #once_blocks to the other handler sets for further processing



# File 'lib/roby/execution_engine.rb', line 757

def process_once_blocks
    while !once_blocks.empty?
        type, block = once_blocks.pop
        if type == :external_events
            external_events_handlers << block
        else
            propagation_handlers << block
        end
    end
end
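
The dispatch can be tried out with a plain Queue. The sketch assumes that `once_blocks` holds `[type, block]` pairs, as the destructuring in the listing suggests. The lambdas and their return values are made up for the example.

```ruby
# Assumption: once_blocks is a thread-safe queue of [type, block] pairs
once_blocks = Queue.new
once_blocks << [:external_events, -> { :poll_sensors }]
once_blocks << [:propagation, -> { :emit_event }]

external_events_handlers = []
propagation_handlers = []

# Drain the queue, sorting each block into the handler set for its type
until once_blocks.empty?
    type, block = once_blocks.pop
    if type == :external_events
        external_events_handlers << block
    else
        propagation_handlers << block
    end
end
```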

#process_pending_application_exceptions(application_errors = clear_application_exceptions, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 622

def process_pending_application_exceptions(application_errors = clear_application_exceptions,
    raise_framework_errors: Roby.app.abort_on_application_exception?)

    # We don't aggregate exceptions, so report them all and raise one
    if display_exceptions?
        application_errors.each do |error, source|
            if !error.kind_of?(Interrupt)
                fatal "Application error in #{source}"
                Roby.log_exception_with_backtrace(error, self, :fatal)
            end
        end
    end

    error, source = application_errors.find do |error, _|
        raise_framework_errors || error.kind_of?(SignalException)
    end
    if error
        raise error, "in #{source}: #{error.message}", error.backtrace
    end
end

#process_waiting_work ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Process asynchronous work registered in #waiting_work to clear completed work and/or handle errors that were not handled by the async object itself (e.g. a Promise without a Promise#on_error handler)



# File 'lib/roby/execution_engine.rb', line 1444

def process_waiting_work
    finished, not_finished = waiting_work.partition do |work|
        work.complete?
    end

    finished.find_all do |work|
        work.rejected? && (work.respond_to?(:has_error_handler?) && !work.has_error_handler?)
    end.each do |work|
        e = work.reason
        e.set_backtrace(e.backtrace + caller)
        add_framework_error(e, work.to_s)
    end

    @waiting_work = not_finished
    finished
end
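
The partitioning step can be illustrated with a small stand-in for the asynchronous work objects. `Job` and `split_jobs` are hypothetical. The real method additionally records each unhandled failure as a framework error.

```ruby
# Hypothetical work item: state is :pending, :fulfilled or :rejected
Job = Struct.new(:name, :state, :handled) do
    def complete?
        state != :pending
    end

    def rejected?
        state == :rejected
    end

    def has_error_handler?
        handled
    end
end

# Splits jobs into finished and unfinished ones, and lists the finished jobs
# that failed without anything handling the failure
def split_jobs(jobs)
    finished, not_finished = jobs.partition(&:complete?)
    unhandled = finished.find_all do |job|
        job.rejected? && job.respond_to?(:has_error_handler?) && !job.has_error_handler?
    end
    [finished, not_finished, unhandled]
end

jobs = [
    Job.new("download", :fulfilled, false),
    Job.new("parse", :rejected, false),
    Job.new("upload", :rejected, true),
    Job.new("index", :pending, false)
]
finished, not_finished, unhandled = split_jobs(jobs)
```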

#promise(description: nil, executor: thread_pool, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Create a promise to execute the given block in a separate thread

Note that the returned value is a Promise. This means that callbacks added with #on_success or #rescue will be executed in the execution engine thread by default.



# File 'lib/roby/execution_engine.rb', line 2604

def promise(description: nil, executor: thread_pool, &block)
    Promise.new(self, executor: executor, description: description, &block)
end

#propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true) ⇒ PropagationInfo

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Propagate an initial set of event propagations and errors

Parameters:

  • next_steps (Array)

    the next propagations, as returned by #gather_propagation

  • initial_errors (Array)

    a set of errors that should be propagated

  • garbage_collect_pass (Boolean) (defaults to: true)

    whether the garbage collection pass should be performed or not. It is used in the tests’ codepath for Roby::EventGenerator#call and Roby::EventGenerator#emit.

Returns:

  • (PropagationInfo)

    what happened during the propagation



# File 'lib/roby/execution_engine.rb', line 1784

def propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true)
    propagation_info = PropagationInfo.new
    events_errors = initial_errors.dup
    begin
        log_timepoint_group 'event_propagation_phase' do
            events_errors.concat(event_propagation_phase(next_steps, propagation_info))
        end

        next_steps = gather_propagation do
            exception_propagation_errors, error_phase_results = nil
            log_timepoint_group 'error_handling_phase' do
                exception_propagation_errors = gather_errors do
                    error_phase_results = error_handling_phase(events_errors)
                end
            end

            add_exceptions_for_inhibition(error_phase_results.each_fatal_error)
            propagation_info.merge(error_phase_results)
            garbage_collection_errors = gather_errors do
                plan.generate_induced_errors(error_phase_results)
                if garbage_collect_pass
                    garbage_collect(error_phase_results.kill_tasks)
                else []
                end
            end
            events_errors = (exception_propagation_errors + garbage_collection_errors)
            log_timepoint 'garbage_collect'
        end
    end while !next_steps.empty? || !events_errors.empty?
    propagation_info
end
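
The overall shape of this method is a loop that alternates between an event phase and an error phase until neither produces new work. The sketch below shows only that control flow. `propagate_until_quiescent` is a hypothetical function, steps are plain lambdas returning the errors they raised, and `on_error` returns new steps and new errors. Garbage collection and inhibition are left out.

```ruby
# Alternates between running steps and handling errors until both are empty.
# Returns the list of errors that went through the error phase.
def propagate_until_quiescent(next_steps, initial_errors, on_error:)
    handled = []
    errors = initial_errors.dup
    begin
        # Event phase: run every pending step and collect reported errors
        next_steps.each { |step| errors.concat(step.call) }

        # Error phase: each handler returns [new_steps, new_errors]
        next_steps = []
        new_errors = []
        errors.each do |error|
            handled << error
            steps, raised = on_error.call(error)
            next_steps.concat(steps)
            new_errors.concat(raised)
        end
        errors = new_errors
    end while !next_steps.empty? || !errors.empty?
    handled
end

first_step = -> { [:e1] }
# Handling :e1 schedules a follow-up step that itself reports :e2
on_error = lambda do |error|
    error == :e1 ? [[-> { [:e2] }], []] : [[], []]
end
handled = propagate_until_quiescent([first_step], [], on_error: on_error)
```

The `begin ... end while` form runs the body at least once, which matches the original: initial errors are processed even when there are no initial steps.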

#propagate_exception_in_plan(exceptions) {|exception, handling_object| ... } ⇒ Array<(ExecutionException,Array<Task>)>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The core exception propagation algorithm

Parameters:

  • exceptions (Array<(ExecutionException,Array<Task>)>)

    the set of exceptions to propagate, as well as the parents towards which we should propagate them (if empty, all parents)

Yield Parameters:

  • exception (ExecutionException)

    the exception that is being propagated

  • handling_object (Task, Plan)

    the object that is tested for whether it handles the exception

Yield Returns:

  • (Boolean)

    true if the exception is handled, false otherwise

Returns:

  • (Array<(ExecutionException,Array<Task>)>)

    the set of unhandled exceptions, as a mapping from an exception description to the set of tasks that are affected by it



# File 'lib/roby/execution_engine.rb', line 1181

def propagate_exception_in_plan(exceptions)
    propagation_graph = dependency_graph.reverse

    # Propagate the exceptions in the hierarchy
    handled_unhandled = Array.new
    exceptions.each do |exception, parents|
        origin = exception.origin
        if parents
            filtered_parents = parents.find_all { |t| t.depends_on?(origin) }
            if filtered_parents != parents
                warn "some parents specified for #{exception.exception}(#{exception.exception.class}) are actually not parents of #{origin}, they got filtered out"
                (parents - filtered_parents).each do |task|
                    warn "  #{task}"
                end

                if filtered_parents.empty?
                    parents = propagation_graph.out_neighbours(origin)
                else
                    parents = filtered_parents
                end
            end
        else
            parents = propagation_graph.out_neighbours(origin)
        end

        debug do
            debug "propagating exception "
            log_pp :debug, exception
            if !parents.empty?
                debug "  constrained to parents"
                log_nest(2) do
                    parents.each do |p|
                        log_pp :debug, p
                    end
                end
            end
            break
        end

        visitor = ExceptionPropagationVisitor.new(propagation_graph, exception, origin, parents) do |e, task|
            yield(e, task)
        end
        visitor.visit

        unhandled = visitor.unhandled_exceptions.inject { |a, b| a.merge(b) }
        handled   = visitor.handled_exceptions.inject { |a, b| a.merge(b) }
        handled_unhandled << [handled, unhandled]
    end


    exceptions_handled_by = Array.new
    unhandled_exceptions  = Array.new
    handled_unhandled.each do |handled, e|
        if e
            if e.handled = yield(e, plan)
                if handled
                    handled_by = (handled.propagation_leafs.to_set << plan)
                    exceptions_handled_by << [handled.merge(e), handled_by]
                else
                    handled = e
                    exceptions_handled_by << [e, [plan].to_set]
                end
            else
                affected_tasks = e.trace.vertices.to_set
                if handled
                    affected_tasks -= handled.trace.vertices
                    exceptions_handled_by << [handled, handled.propagation_leafs.to_set]
                end
                unhandled_exceptions << [e, affected_tasks]
            end
        else
            exceptions_handled_by << [handled, handled.propagation_leafs.to_set]
        end
    end

    debug do
        debug "#{unhandled_exceptions.size} unhandled exceptions remain"
        log_nest(2) do
            unhandled_exceptions.each do |e, affected_tasks|
                log_pp :debug, e
                debug "Affects #{affected_tasks.size} tasks"
                log_nest(2) do
                    affected_tasks.each do |t|
                        log_pp :debug, t
                    end
                end
            end
        end
        break
    end
    return unhandled_exceptions, exceptions_handled_by
end
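
The core idea, walking from the exception origin up through the parents until something handles the exception, can be sketched with a plain hash as the dependency graph. `propagate_up` is a hypothetical, heavily simplified function. It does not merge exception traces, does not filter parents, and does not give the plan itself a chance to handle the exception, unlike the real algorithm.

```ruby
require 'set'

# parents maps a task to the tasks that depend on it. The block is the
# handler test: it returns true if the given task handles the exception.
def propagate_up(parents, origin)
    visited = Set.new
    handled_by = Set.new
    unhandled_at = Set.new
    queue = [origin]
    until queue.empty?
        task = queue.shift
        next unless visited.add?(task) # skip tasks that were already visited

        task_parents = parents.fetch(task, [])
        if yield(task)
            handled_by << task
        elsif task_parents.empty?
            unhandled_at << task # reached the top without a handler
        else
            queue.concat(task_parents)
        end
    end
    { visited: visited, handled_by: handled_by, unhandled_at: unhandled_at }
end

parents = { leaf: [:mid_a, :mid_b], mid_a: [:root], mid_b: [:root] }
result = propagate_up(parents, :leaf) { |task| task == :mid_a }
```

In this example the exception is handled on one branch but still reaches the root through the other, which is why the real method returns both handled and unhandled exceptions.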

#propagate_exceptions(exceptions) ⇒ Array<(ExecutionException,Array<Task>)>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Exception propagation phase, checking whether tasks and/or the main plan handle the exceptions

Parameters:

  • exceptions (Array<(ExecutionException,Array<Task>)>)

    the set of exceptions to propagate, as well as the parents towards which they should be propagated (if empty, all parents)

Returns:

  • (Array<(ExecutionException,Array<Task>)>)

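    the set of unhandled exceptions, as a mapping from an exception description to the set of tasks that are affected by it. Note that the listed implementation returns three arrays: the unhandled exceptions, the normalized free-event exceptions, and the handled exceptions.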



# File 'lib/roby/execution_engine.rb', line 1281

def propagate_exceptions(exceptions)
    if exceptions.empty?
        return Array.new, Array.new, Array.new
    end

    # Remove all exceptions that are not associated with a task
    exceptions, free_events_exceptions = exceptions.partition do |e, _|
        e.origin
    end
    # Normalize the free events exceptions
    free_events_exceptions = free_events_exceptions.map do |e, _|
        if e.exception.failed_generator.plan
            [e, Set[e.exception.failed_generator]]
        end
    end.compact

    debug "Filtering inhibited exceptions"
    exceptions = log_nest(2) do
        non_inhibited, _ = remove_inhibited_exceptions(exceptions)
        # Reset the trace for the real propagation
        non_inhibited.map do |e, _|
            _, propagate_through = exceptions.find { |original_e, _| original_e.exception == e.exception }
            e.reset_trace
            [e, propagate_through]
        end
    end

    debug "Propagating #{exceptions.size} non-inhibited exceptions"
    log_nest(2) do
        # Note that the first half of the method filtered the free
        # events exceptions out of 'exceptions'
        unhandled, handled = propagate_exception_in_plan(exceptions) do |e, object|
            object.handle_exception(e)
        end

        return unhandled, free_events_exceptions, handled
    end
end
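
The first half of the listing above splits the exceptions by whether they have an origin, then drops the free-event exceptions whose generator is no longer in a plan. A minimal sketch of that step alone, using hypothetical Struct stand-ins (FakeGenerator, FakeError and FakeExecutionException are not Roby classes and only carry the fields this step reads):

```ruby
require "set"

# Hypothetical stand-ins: only the fields read by the partitioning step exist.
FakeGenerator          = Struct.new(:plan)
FakeError              = Struct.new(:failed_generator)
FakeExecutionException = Struct.new(:origin, :exception)

in_plan  = FakeGenerator.new(:plan)
orphaned = FakeGenerator.new(nil)

exceptions = [
  [FakeExecutionException.new(:task, FakeError.new(in_plan)), []],
  [FakeExecutionException.new(nil, FakeError.new(in_plan)), []],
  [FakeExecutionException.new(nil, FakeError.new(orphaned)), []]
]

# Exceptions with an origin go left, free-event exceptions go right.
task_exceptions, free_event_exceptions =
  exceptions.partition { |e, _| e.origin }

# Keep only free-event exceptions whose generator still belongs to a plan,
# and pair each of them with the set containing that generator.
free_event_exceptions = free_event_exceptions.map do |e, _|
  generator = e.exception.failed_generator
  [e, Set[generator]] if generator.plan
end.compact
```

In this sketch one exception remains on each side: the orphaned generator's exception is discarded by the compact call.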

#propagation_context(sources) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

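Sets the propagation sources to sources for the duration of the given block, then restores the previous value. sources is the from argument of #add_event_propagation. Raises InternalError when called outside of a propagation context.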



# File 'lib/roby/execution_engine.rb', line 664

def propagation_context(sources)
    current_sources = @propagation_sources
    raise InternalError, "not in a gathering context in #propagation_context" unless in_propagation_context?

    @propagation_sources = sources
    yield
ensure
    @propagation_sources = current_sources
end
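
The listing above relies on ensure to restore the previous sources even when the block raises. A minimal sketch of that save/restore pattern, assuming a hypothetical SourceTracker class that stands in for the engine and omits the propagation-context check:

```ruby
# Hypothetical stand-in for the engine; only the save/restore logic is modelled.
class SourceTracker
  attr_reader :propagation_sources

  def initialize
    @propagation_sources = []
  end

  # Temporarily replace the sources while the block runs.
  def with_sources(sources)
    previous = @propagation_sources
    @propagation_sources = sources
    yield
  ensure
    # Runs on normal return and when the block raises.
    @propagation_sources = previous
  end
end

tracker = SourceTracker.new
seen_inside = nil
begin
  tracker.with_sources([:start_event]) do
    seen_inside = tracker.propagation_sources
    raise "handler failed"
  end
rescue RuntimeError
  # The previous (empty) sources have been restored despite the error.
end
```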

#propagation_source_events ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of events extracted from #sources



# File 'lib/roby/execution_engine.rb', line 457

def propagation_source_events
    result = Set.new
    for ev in @propagation_sources
        if ev.respond_to?(:generator)
            result << ev
        end
    end
    result
end

#propagation_source_generators ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of generators extracted from #sources



# File 'lib/roby/execution_engine.rb', line 468

def propagation_source_generators
    result = Set.new
    for ev in @propagation_sources
        result << if ev.respond_to?(:generator)
                      ev.generator
                  else
                      ev
                  end
    end
    result
end
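
Both accessors above tell events from generators by checking whether a source responds to generator. A minimal sketch of that duck-typing rule, assuming hypothetical SketchGenerator and SketchEvent structs in place of the Roby classes:

```ruby
require "set"

# Hypothetical minimal types: an event knows its generator, a generator does not.
SketchGenerator = Struct.new(:name)
SketchEvent     = Struct.new(:generator)

# Keeps only the sources that are events.
def source_events(sources)
  sources.select { |s| s.respond_to?(:generator) }.to_set
end

# Maps events to their generator and keeps generators as they are.
def source_generators(sources)
  sources.map { |s| s.respond_to?(:generator) ? s.generator : s }.to_set
end

start = SketchGenerator.new("start")
stop  = SketchGenerator.new("stop")
sources = [SketchEvent.new(start), stop]

events     = source_events(sources)
generators = source_generators(sources)
```

With these inputs, events holds the single event, while generators holds both start and stop.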

#queue_forward(sources, target, context, timespec) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Queue a forwarding to be propagated



# File 'lib/roby/execution_engine.rb', line 684

def queue_forward(sources, target, context, timespec)
    add_event_propagation(true, sources, target, context, timespec)
end

#queue_signal(sources, target, context, timespec) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Queue a signal to be propagated



# File 'lib/roby/execution_engine.rb', line 679

def queue_signal(sources, target, context, timespec)
    add_event_propagation(false, sources, target, context, timespec)
end

#quit ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Make control quit properly



# File 'lib/roby/execution_engine.rb', line 2386

def quit; @quit = 1 end

#quitting? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True if the control thread is currently quitting

Returns:

  • (Boolean)


# File 'lib/roby/execution_engine.rb', line 2382

def quitting?; @quit > 0 end

#refresh_relations ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Refresh the value of cached relations

Some often-used relations, such as #dependency_graph and #precedence_graph, are cached in #initialize. Call this method when the actual graph objects have changed on the plan.



# File 'lib/roby/execution_engine.rb', line 134

def refresh_relations
    @dependency_graph = plan.task_relation_graph_for(TaskStructure::Dependency)
    @precedence_graph = plan.event_relation_graph_for(EventStructure::Precedence)
    @signal_graph     = plan.event_relation_graph_for(EventStructure::Signal)
    @forward_graph    = plan.event_relation_graph_for(EventStructure::Forwarding)
end

#remove_at_cycle_end(handler_id) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Removes a handler added by #at_cycle_end

Parameters:



# File 'lib/roby/execution_engine.rb', line 2047

def remove_at_cycle_end(handler_id)
    at_cycle_end_handlers.delete_if { |h| h.object_id == handler_id }
end
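
The listing above removes a handler by comparing each stored handler's object_id with the given id. A minimal sketch of such a registry, assuming a hypothetical CycleEndHandlers class whose add method returns the block's object_id (how #at_cycle_end builds its return value is not shown in this section):

```ruby
# Hypothetical registry; it only mirrors the delete_if call of the listing.
class CycleEndHandlers
  def initialize
    @handlers = []
  end

  # Registers the block and returns its object_id as the handler id
  # (an assumption made for this sketch).
  def add(&block)
    @handlers << block
    block.object_id
  end

  # Removes the handler whose object_id matches, if any.
  def remove(handler_id)
    @handlers.delete_if { |h| h.object_id == handler_id }
  end

  # Calls every remaining handler and returns their results.
  def call_all
    @handlers.map(&:call)
  end
end

registry = CycleEndHandlers.new
first_id = registry.add { :first }
registry.add { :second }
registry.remove(first_id)
remaining = registry.call_all
```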

#remove_exception_listener(handler) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Removes an exception listener registered with #on_exception

Parameters:



# File 'lib/roby/execution_engine.rb', line 2586

def remove_exception_listener(handler)
    exception_listeners.delete(handler)
end

#remove_inhibited_exceptions(exceptions) ⇒ Array<ExecutionException>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Process the given exceptions to remove the ones that are currently filtered by the plan repairs

The returned exceptions are propagated, i.e. their #trace method contains all the tasks that are affected by the absence of a handling mechanism

Parameters:

  • exceptions ((ExecutionException,Array<Roby::Task>))

    pairs of exceptions as well as the “root tasks”, i.e. the parents of origin.task towards which they should be propagated

Returns:



# File 'lib/roby/execution_engine.rb', line 1331

def remove_inhibited_exceptions(exceptions)
    exceptions = exceptions.find_all do |execution_exception, _|
        execution_exception.origin.plan
    end

    propagate_exception_in_plan(exceptions) do |e, object|
        if has_pending_exception_matching?(e, object)
            true
        elsif object.respond_to?(:handles_error?)
            object.handles_error?(e)
        end
    end
end

#remove_periodic_handler(id) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Removes a periodic handler defined by #every. id is the value returned by #every.



# File 'lib/roby/execution_engine.rb', line 2072

def remove_periodic_handler(id)
    execute do
        process_every.delete_if { |spec| spec[0].id == id }
    end
end

#remove_propagation_handler(id) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 362

def remove_propagation_handler(id)
    disabled_handlers.delete_if { |p| p.id == id }
    super
    nil
end

#reset ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Make an execution engine that has quit ready for reuse



# File 'lib/roby/execution_engine.rb', line 2391

def reset
    @quit = 0
end

#reset_thread_pool ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 2480

def reset_thread_pool
    if @thread_pool
        @thread_pool.shutdown
    end
    @thread_pool = Concurrent::CachedThreadPool.new(idletime: 10)
end

#run(cycle: 0.1) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Main event loop. Valid options are

cycle

the cycle duration in seconds (default: 0.1)



# File 'lib/roby/execution_engine.rb', line 2127

def run(cycle: 0.1)
    if running?
        raise AlreadyRunning, "#run has already been called"
    end
    self.running = true

    @allow_propagation = false
    @waiting_work = Concurrent::Array.new

    @thread = Thread.current
    @thread.name = "MAIN"

    @cycle_length = cycle
    event_loop

ensure
    self.running = false
    @thread = nil
    waiting_work.delete_if do |w|
        next(true) if w.complete?

        # rubocop:disable Lint/HandleExceptions
        begin
            w.fail ExecutionQuitError
            Roby.warn "forcefully terminated #{w} on quit"
        rescue Concurrent::MultipleAssignmentError
            # Race condition: something completed the promise while
            # we were trying to make it fail
        end
        # rubocop:enable Lint/HandleExceptions

        true
    end
    finalizers.each do |blk|
        begin
            blk.call
        rescue Exception => e
            Roby.warn "finalizer #{blk} failed"
            Roby.log_exception_with_backtrace(e, Roby, :warn)
        end
    end
    @quit = 0
    @allow_propagation = true
end
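
In the ensure section of the listing above, each finalizer is wrapped in its own begin/rescue so that one failing finalizer does not prevent the others from running. A minimal sketch of that isolation, assuming a hypothetical run_finalizers helper that collects the failures instead of logging them through Roby.warn:

```ruby
# Hypothetical helper: runs every finalizer and collects failures instead of
# letting one of them abort the others.
def run_finalizers(finalizers)
  failures = []
  finalizers.each do |blk|
    begin
      blk.call
    rescue Exception => e # mirrors the listing; StandardError is usually preferable
      failures << e
    end
  end
  failures
end

ran = []
finalizers = [
  -> { ran << :first },
  -> { raise ArgumentError, "boom" },
  -> { ran << :third }
]
failures = run_finalizers(finalizers)
```

Here the third finalizer still runs although the second one raised, and the error is kept for reporting.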

#shutdown ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 2475

def shutdown
    killall
    thread_pool.shutdown
end

#start_new_cycle(time = Time.now) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Set the cycle_start attribute and increment cycle_index

This is only used for testing purposes



# File 'lib/roby/execution_engine.rb', line 2372

def start_new_cycle(time = Time.now)
    @cycle_start = time
    @cycle_index += 1
end

#unmark_finished_missions_and_permanent_tasks ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/roby/execution_engine.rb', line 1836

def unmark_finished_missions_and_permanent_tasks
    to_unmark = plan.task_index.by_predicate[:finished?] | plan.task_index.by_predicate[:failed?]

    finished_missions = (plan.mission_tasks & to_unmark)
    # Remove all missions that are finished
    for finished_mission in finished_missions
        if !finished_mission.being_repaired?
            plan.unmark_mission_task(finished_mission)
        end
    end
    finished_permanent = (plan.permanent_tasks & to_unmark)
    for finished_permanent in (plan.permanent_tasks & to_unmark)
        if !finished_permanent.being_repaired?
            plan.unmark_permanent_task(finished_permanent)
        end
    end
end

#unreachable_event(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by EventGenerator when an event becomes unreachable



# File 'lib/roby/execution_engine.rb', line 514

def unreachable_event(event)
    delayed_events.delete_if { |_, _, _, signalled, _| signalled == event }
end

#wait_one_cycle ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Blocks until at least one execution cycle has completed



# File 'lib/roby/execution_engine.rb', line 1991

def wait_one_cycle
    current_cycle = execute { cycle_index }
    while current_cycle == execute { cycle_index }
        raise ExecutionQuitError if !running?
        sleep(cycle_length)
    end
end
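
The listing above polls the cycle index until it changes, sleeping one cycle length between checks. A minimal sketch of that polling loop, assuming a hypothetical CycleCounter that a background thread increments; the running? check and ExecutionQuitError of the listing are left out:

```ruby
# Hypothetical stand-in for the engine: a thread-safe cycle counter.
class CycleCounter
  def initialize
    @mutex = Mutex.new
    @index = 0
  end

  def index
    @mutex.synchronize { @index }
  end

  def tick
    @mutex.synchronize { @index += 1 }
  end
end

# Returns once the counter has moved past the value seen on entry.
def wait_one_cycle(counter, period)
  current = counter.index
  sleep(period) while counter.index == current
  counter.index
end

counter = CycleCounter.new
running = true
ticker = Thread.new do
  while running
    sleep 0.01
    counter.tick
  end
end

before = counter.index
after  = wait_one_cycle(counter, 0.005)
running = false
ticker.join
```

Polling keeps the waiting thread independent from the engine's internals, at the cost of up to one extra sleep period of latency.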

#wait_until(ev) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Blocks the current thread until the given event is emitted. If the event becomes unreachable, an UnreachableEvent exception is raised.



# File 'lib/roby/execution_engine.rb', line 2448

def wait_until(ev)
    if inside_control?
        raise ThreadMismatch, "cannot use #wait_until in execution threads"
    end

    ivar = Concurrent::IVar.new
    result = nil
    once(sync: ivar) do
        if ev.unreachable?
            ivar.fail(UnreachableEvent.new(ev, ev.unreachability_reason))
        else
            ev.if_unreachable(cancel_at_emission: true) do |reason, event|
                ivar.fail(UnreachableEvent.new(event, reason)) if !ivar.complete?
            end
            ev.once do |ev|
                ivar.set(result) if !ivar.complete?
            end
            begin
                result = yield if block_given?
            rescue Exception => e
                ivar.fail(e)
            end
        end
    end
    ivar.value!
end
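
The listing above hands a Concurrent::IVar to the engine thread and blocks on value!, which either returns the result or re-raises the failure. A minimal sketch of that hand-off using only the standard library, assuming a hypothetical OneShot class as a simplified stand-in for Concurrent::IVar. Unlike the real IVar, it silently ignores a second completion instead of raising, and value! can only be consumed once:

```ruby
# One-shot result holder built on Queue; a simplified stand-in for
# Concurrent::IVar, not its actual implementation.
class OneShot
  def initialize
    @queue = Queue.new
    @mutex = Mutex.new
    @complete = false
  end

  def set(value)
    complete([:ok, value])
  end

  def fail(error)
    complete([:error, error])
  end

  # Blocks until completed, then returns the value or raises the error.
  def value!
    kind, payload = @queue.pop
    raise payload if kind == :error
    payload
  end

  private

  # Only the first completion wins; later ones are ignored.
  def complete(outcome)
    @mutex.synchronize do
      return false if @complete
      @complete = true
      @queue << outcome
    end
    true
  end
end

# Emission case: another thread completes the holder with a value.
emitted = OneShot.new
Thread.new { sleep 0.01; emitted.set(:done) }
result = emitted.value!

# Unreachable case: the holder is failed and value! re-raises the error.
unreachable = OneShot.new
unreachable.fail(RuntimeError.new("event became unreachable"))
error = begin
  unreachable.value!
rescue RuntimeError => e
  e
end
```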