Class: Roby::ExecutionEngine Private
- Extended by:
- Logger::Hierarchy, PropagationHandlerMethods
- Includes:
- Logger::Hierarchy, DRoby::EventLogging, PropagationHandlerMethods
- Defined in:
- lib/roby/execution_engine.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
The core execution algorithm
It is in charge of handling event and exception propagation, as well as running cleanup processes (e.g. garbage collection).
The main method is #process_events. When executing a Roby application, it is called periodically by #event_loop.
Defined Under Namespace
Modules: PropagationHandlerMethods Classes: AlreadyRunning, EventLoopExitState, ExceptionPropagationVisitor, JoinAllWaitingWorkTimeout, NotPropagationContext, PollBlockDefinition, PropagationInfo, RecursivePropagationContext
Constant Summary collapse
- PENDING_PROPAGATION_FORWARD =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
1- PENDING_PROPAGATION_SIGNAL =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
2- SLEEP_MIN_TIME =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Do not sleep or call Thread#pass if there is less that this much time left in the cycle
0.01
- INTERRUPT_FORCE_EXIT_DEAD_ZONE =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
How many seconds between two Interrupt before the execution engine’s loop can forcefully quit
10- EXCEPTION_NONFATAL =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Exception kind passed to #on_exception handlers for non-fatal, unhandled exceptions
:nonfatal- EXCEPTION_FATAL =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Exception kind passed to #on_exception handlers for fatal, unhandled exceptions
:fatal- EXCEPTION_HANDLED =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Exception kind passed to #on_exception handlers for handled exceptions
:handled- EXCEPTION_FREE_EVENT =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Exception kind passed to #on_exception handlers for free event exceptions
:free_event
Instance Attribute Summary collapse
-
#additional_errors ⇒ Object
readonly
private
Used during exception propagation to inject new errors in the process.
-
#application_exceptions ⇒ Object
readonly
private
The set of errors which have been generated outside of the plan’s control.
-
#at_cycle_end_handlers ⇒ Object
readonly
private
A set of blocks that are called at each cycle end.
-
#control ⇒ Object
private
The DecisionControl object associated with this engine.
-
#cycle_index ⇒ Object
readonly
private
The number of this cycle since the beginning.
-
#cycle_length ⇒ Object
readonly
private
The cycle length in seconds.
-
#cycle_start ⇒ Object
readonly
private
The starting Time of this cycle.
-
#delayed_events ⇒ Object
readonly
private
The set of pending delayed events.
-
#dependency_graph ⇒ Object
readonly
private
Cached graph object for TaskStructure::Dependency.
-
#emitted_events ⇒ Array<Event>
readonly
private
The set of events that have been emitted within the last call to #process_events (i.e. the last execution of the event loop).
-
#event_logger ⇒ Object
private
The underlying DRoby::EventLogger.
-
#event_ordering ⇒ Object
readonly
private
The topological ordering of events w.r.t.
-
#event_priorities ⇒ Object
readonly
private
The event => index hash which give the propagation priority for each event.
-
#exception_listeners ⇒ Array<#call>
readonly
private
The blocks that are currently listening to exceptions.
-
#finalizers ⇒ Object
readonly
private
A set of proc objects which are to be called when the execution engine quits.
-
#forward_graph ⇒ Object
readonly
private
Cached graph object for EventStructure::Forward.
-
#last_stop_count ⇒ Object
readonly
private
:nodoc:.
-
#once_blocks ⇒ Queue
readonly
private
Thread-safe queue to push work to the execution engine.
-
#plan ⇒ Object
private
The Plan this engine is acting on.
-
#precedence_graph ⇒ Object
readonly
private
Cached graph object for Roby::EventStructure::Precedence.
-
#process_every ⇒ Object
readonly
private
A set of blocks which are called every cycle.
-
#propagation_id ⇒ Object
readonly
private
A numeric ID giving the count of the current propagation cycle.
-
#propagation_sources ⇒ Object
readonly
private
The set of source events for the current propagation action.
-
#scheduler ⇒ Object
private
The scheduler is the object which handles non-generic parts of the propagation cycle.
-
#signal_graph ⇒ Object
readonly
private
Cached graph object for EventStructure::Signal.
-
#thread ⇒ Object
private
The execution thread if there is one running.
-
#thread_pool ⇒ Concurrent::CachedThreadPool
readonly
private
A thread pool on which async work should be executed.
-
#waiting_work ⇒ Array<#fail,#complete?>
readonly
private
A list of threaded objects waiting for the control thread.
Attributes included from PropagationHandlerMethods
#external_events_handlers, #propagation_handlers, #side_work_handlers
Class Method Summary collapse
-
.call_every(plan) ⇒ Object
private
Calls the periodic blocks which should be called.
-
.make_delay(timeref, source, target, timespec) ⇒ Object
private
Returns a Time object which represents the absolute point in time referenced by
timespecin the context of delaying a propagation betweensourceandtarget. -
.validate_timespec(timespec) ⇒ Object
private
Validates
timespecas a delay specification.
Instance Method Summary collapse
-
#add_error(e, propagate_through: nil) ⇒ Object
private
Register a LocalizedError for future propagation.
-
#add_event_delay(time, is_forward, source, target, context) ⇒ Object
private
Adds a propagation step to be performed when the current time is greater than
time. -
#add_event_propagation(is_forward, sources, target, context, timespec) ⇒ Object
private
Adds a propagation to the next propagation step: it registers a propagation step to be performed between
sourceandtargetwith the givencontext. -
#add_exceptions_for_inhibition(fatal_errors) ⇒ Object
private
Register a set of fatal exceptions to ensure that they will be inhibited in the next exception propagation cycles.
-
#add_framework_error(error, source) ⇒ Object
private
Registers the given error and a description of its source in the list of application/framework errors.
-
#at_cycle_end(description: "at_cycle_end", **options) {|plan| ... } ⇒ Object
private
Adds a block to be called at the end of each execution cycle.
-
#call_poll_blocks(blocks, late = false) ⇒ Object
private
Helper that calls the propagation handlers in
propagation_handlers(which are expected to be instances of PollBlockDefinition) and handles the errors according of each handler’s policy. - #call_propagation_handlers ⇒ Object private
-
#clear ⇒ Object
private
Sets up the plan for clearing: it discards all missions and undefines all permanent tasks and events.
- #clear_application_exceptions ⇒ Object private
-
#compute_errors(events_errors) ⇒ PropagationInfo
private
Compute the set of fatal errors in the current execution state.
-
#compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors) ⇒ Object
private
Compute the set of unhandled fatal exceptions.
-
#cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object
private
Called at each cycle end.
-
#delayed(delay, description: "delayed block", **options, &block) ⇒ Object
private
Schedules
blockto be called once afterdelayseconds passed, in the propagation context. -
#display_exceptions=(flag) ⇒ Object
private
Controls whether this engine should indiscriminately display all fatal exceptions.
-
#display_exceptions? ⇒ Boolean
private
whether this engine should indiscriminately display all fatal exceptions.
-
#error_handling_phase(events_errors) ⇒ Object
private
Compute errors in plan and handle the results.
-
#event_loop ⇒ Object
private
The main event loop.
-
#event_loop_handle_interrupt(exit_state) ⇒ Object
private
Handle a received Interrupt for #event_loop.
-
#event_loop_teardown(exit_state) ⇒ Boolean
private
Handle the teardown logic for #event_loop.
-
#event_propagation_phase(initial_events, propagation_info) ⇒ Object
private
Calls its block in a #gather_propagation context and propagate events that have been called and/or emitted by the block.
-
#event_propagation_step(current_step, propagation_info) ⇒ Object
private
Propagate one step.
-
#every(duration, description: "periodic handler", **options, &block) ⇒ #dispose
private
Call
blockeverydurationseconds. -
#execute(catch: [], type: :external_events, &block) ⇒ Object
private
Block until the given block is executed by the execution thread, at the beginning of the event loop, in propagation context.
-
#execute_delayed_events ⇒ Object
private
Adds the events in
delayed_eventswhose time has passed into the propagation. - #execute_one_cycle(time = Time.now) ⇒ Object private
-
#execute_side_work ⇒ Object
private
Execute the work registered with PropagationHandlerMethods#add_side_work_handler.
-
#finalized_event(event) ⇒ Object
private
Called by #plan when an event has been finalized.
-
#finalized_task(task) ⇒ Object
private
Called by #plan when a task has been finalized.
-
#force_quit ⇒ Object
private
Force quitting, without cleaning up.
-
#forced_exit? ⇒ Boolean
private
True if the control thread is currently quitting.
-
#garbage_collect(force_on = nil) ⇒ Boolean
private
Kills and removes unneeded tasks for which there are no dependencies.
-
#garbage_collect_stop_task(local_task) ⇒ Boolean
private
Handle a single root task in the #garbage_collect process.
-
#gather_errors ⇒ Array<ExecutionException>
private
Executes the given block while gathering errors, and returns the errors that have been declared with #add_error.
-
#gather_external_events ⇒ Object
private
Gather the events that come out of this plan manager.
-
#gather_framework_errors(source, raise_caught_exceptions: true) ⇒ Object
private
Yields to the block and registers any raised exception using #add_framework_error.
-
#gather_propagation(initial_set = {}, &block) ⇒ Object
private
Sets up a propagation context, yielding the block in it.
- #gathering? ⇒ Boolean private
- #gathering_errors? ⇒ Boolean private
-
#has_pending_exception_matching?(e, object) ⇒ Boolean
private
Tests whether there is an exception registered by #add_fatal_exceptions_for_inhibition for a given error and object.
-
#has_pending_forward?(from, to, expected_context) ⇒ Boolean
private
Whether a forward matching this signature is currently pending.
-
#has_pending_signal?(from, to, expected_context) ⇒ Boolean
private
Whether a signal matching this signature is currently pending.
- #has_propagation_for?(target) ⇒ Boolean private
-
#has_queued_events? ⇒ Boolean
private
Returns true if some events are queued.
-
#has_waiting_work? ⇒ Boolean
private
Whether this EE has asynchronous waiting work waiting to be processed.
-
#in_propagation_context? ⇒ Boolean
private
True if we are within a propagation context (i.e. within event processing).
-
#inhibited_exception?(exception) ⇒ Boolean
private
Query whether the given exception is inhibited in this plan.
-
#initialize(plan, control: Roby::DecisionControl.new, event_logger: plan.event_logger) ⇒ ExecutionEngine
constructor
private
Create an execution engine acting on
plan, usingcontrolas the decision control object. -
#inside_control? ⇒ Boolean
private
True if the current thread is the execution thread of this engine.
- #interrupt ⇒ Object private
- #issue_quit_progression_warning(remaining) ⇒ Object private
-
#join_all_waiting_work(timeout: nil) ⇒ Object
private
Waits for all obligations in #waiting_work to finish.
-
#killall ⇒ Object
private
Kill all tasks that are currently running in the plan.
-
#next_event(pending) ⇒ Object
private
call-seq: next_event(pending) => event, propagation_info.
-
#notify_about_error_handling_results(errors) ⇒ Object
private
Issue the warning message and log notifications related to tasks being killed because of unhandled fatal exceptions.
-
#notify_exception(kind, error, involved_objects) ⇒ Object
private
Call to notify the listeners registered with #on_exception of the occurence of an exception.
-
#on_exception(description: "exception listener", on_error: :disable) {|kind, error, tasks| ... } ⇒ Object
private
Registers a callback that will be called when exceptions are propagated in the plan.
-
#once(sync: nil, description: "once block", type: :external_events, **options, &block) ⇒ Object
private
Schedules
blockto be called at the beginning of the next execution cycle, in propagation context. -
#outside_control? ⇒ Boolean
private
True if the current thread is not the execution thread of this engine, or if there is not control thread.
-
#prepare_propagation(target, is_forward, info) ⇒ Object
private
call-seq: prepare_propagation(target, is_forward, info) => source_events, source_generators, context prepare_propagation(target, is_forward, info) => nil.
-
#process_events(raise_framework_errors: Roby.app.abort_on_application_exception?, garbage_collect_pass: true, &caller_block) ⇒ PropagationInfo
private
The inside part of the event loop.
-
#process_events_synchronous(seeds = {}, initial_errors = [], enable_scheduler: false, raise_errors: true, &block) ⇒ Object
private
Tests are using a special mode for propagation, in which everything is resolved when #emit or #call is called, including error handling.
-
#process_once_blocks ⇒ Object
private
Dispatch #once_blocks to the other handler sets for further processing.
- #process_pending_application_exceptions(application_errors = clear_application_exceptions, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object private
-
#process_waiting_work ⇒ Object
private
Process asynchronous work registered in #waiting_work to clear completed work and/or handle errors that were not handled by the async object itself (e.g. a Promise without a Promise#on_error handler).
-
#promise(description: nil, executor: thread_pool, &block) ⇒ Object
private
Create a promise to execute the given block in a separate thread.
-
#propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true) ⇒ PropagationInfo
private
Propagate an initial set of event propagations and errors.
-
#propagate_exception_in_plan(exceptions) {|exception, handling_object| ... } ⇒ Array<(ExecutionException,Array<Task>)>
private
The core exception propagation algorithm.
-
#propagate_exceptions(exceptions) ⇒ Array<(ExecutionException,Array<Task>)>
private
Propagation exception phase, checking if tasks and/or the main plan are handling the exceptions.
-
#propagation_context(sources) ⇒ Object
private
Sets the source_event and source_generator variables according to
source. -
#propagation_source_events ⇒ Object
private
The set of events extracted from #sources.
-
#propagation_source_generators ⇒ Object
private
The set of generators extracted from #sources.
-
#queue_forward(sources, target, context, timespec) ⇒ Object
private
Queue a forwarding to be propagated.
-
#queue_signal(sources, target, context, timespec) ⇒ Object
private
Queue a signal to be propagated.
-
#quit ⇒ Object
private
Make control quit properly.
-
#quitting? ⇒ Boolean
private
True if the control thread is currently quitting.
-
#refresh_relations ⇒ Object
private
Refresh the value of cached relations.
-
#remove_at_cycle_end(handler_id) ⇒ Object
private
Removes a handler added by #at_cycle_end.
-
#remove_exception_listener(handler) ⇒ void
private
Removes an exception listener registered with #on_exception.
-
#remove_inhibited_exceptions(exceptions) ⇒ Array<ExecutionException>
private
Process the given exceptions to remove the ones that are currently filtered by the plan repairs.
-
#remove_periodic_handler(id) ⇒ Object
private
Removes a periodic handler defined by #every.
-
#reset ⇒ Object
private
Make a quit EE ready for reuse.
- #reset_thread_pool ⇒ Object private
-
#run(cycle: 0.1) ⇒ Object
private
Main event loop.
- #shutdown ⇒ Object private
-
#start_new_cycle(time = Time.now) ⇒ Object
private
Set the cycle_start attribute and increment cycle_index.
- #unmark_finished_missions_and_permanent_tasks ⇒ Object private
-
#unreachable_event(event) ⇒ Object
private
Called by EventGenerator when an event became unreachable.
-
#wait_one_cycle ⇒ Object
private
Blocks until at least once execution cycle has been done.
-
#wait_until(ev) ⇒ Object
private
Stops the current thread until the given even is emitted.
Methods included from PropagationHandlerMethods
add_propagation_handler, add_side_work_handler, at_cycle_begin, create_propagation_handler, each_cycle, remove_propagation_handler, remove_side_work_handler
Methods included from DRoby::EventLogging
#log, #log_flush_cycle, #log_queue_size, #log_timepoint, #log_timepoint_group, #log_timepoint_group_end, #log_timepoint_group_start
Constructor Details
#initialize(plan, control: Roby::DecisionControl.new, event_logger: plan.event_logger) ⇒ ExecutionEngine
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create an execution engine acting on plan, using control as the decision control object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/roby/execution_engine.rb', line 79 def initialize(plan, control: Roby::DecisionControl.new, event_logger: plan.event_logger) @plan = plan @event_logger = event_logger @use_oob_gc = ExecutionEngine.use_oob_gc? @control = control @scheduler = Schedulers::Null.new(plan) reset_thread_pool @thread = Thread.current @propagation = nil @propagation_id = 0 @propagation_exceptions = nil @application_exceptions = nil @delayed_events = [] @event_ordering = [] @event_priorities = {} @propagation_handlers = [] @external_events_handlers = [] @side_work_handlers = [] @at_cycle_end_handlers = [] @process_every = [] @waiting_work = Concurrent::Array.new @emitted_events = [] @exception_listeners = [] @worker_threads_mtx = Mutex.new @worker_threads = [] @once_blocks = Queue.new @pending_exceptions = {} each_cycle(&ExecutionEngine.method(:call_every)) @quit = 0 @allow_propagation = true @cycle_index = 0 @cycle_start = Time.now @cycle_length = 0.1 @last_stop_count = 0 @finalizers = [] @gc_warning = true refresh_relations @exception_display_handler = Roby.null_disposable self.display_exceptions = true end |
Instance Attribute Details
#additional_errors ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Used during exception propagation to inject new errors in the process
It shall not be accessed directly. Instead, Plan#add_error should be called
1471 1472 1473 |
# File 'lib/roby/execution_engine.rb', line 1471 def additional_errors @additional_errors end |
#application_exceptions ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of errors which have been generated outside of the plan’s control. For now, those errors cause the whole controller to shut down.
1456 1457 1458 |
# File 'lib/roby/execution_engine.rb', line 1456 def application_exceptions @application_exceptions end |
#at_cycle_end_handlers ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A set of blocks that are called at each cycle end
2143 2144 2145 |
# File 'lib/roby/execution_engine.rb', line 2143 def at_cycle_end_handlers @at_cycle_end_handlers end |
#control ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The DecisionControl object associated with this engine
180 181 182 |
# File 'lib/roby/execution_engine.rb', line 180 def control @control end |
#cycle_index ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The number of this cycle since the beginning
2202 2203 2204 |
# File 'lib/roby/execution_engine.rb', line 2202 def cycle_index @cycle_index end |
#cycle_length ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The cycle length in seconds
2196 2197 2198 |
# File 'lib/roby/execution_engine.rb', line 2196 def cycle_length @cycle_length end |
#cycle_start ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The starting Time of this cycle
2199 2200 2201 |
# File 'lib/roby/execution_engine.rb', line 2199 def cycle_start @cycle_start end |
#delayed_events ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of pending delayed events. This is an array of the form
[[time, is_forward, source, target, context], ...]
See #add_event_delay for more information
545 546 547 |
# File 'lib/roby/execution_engine.rb', line 545 def delayed_events @delayed_events end |
#dependency_graph ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cached graph object for TaskStructure::Dependency
This is here for performance reasons, to avoid resolving the same graph over and over
169 170 171 |
# File 'lib/roby/execution_engine.rb', line 169 def dependency_graph @dependency_graph end |
#emitted_events ⇒ Array<Event> (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of events that have been emitted within the last call to #process_events (i.e. the last execution of the event loop)
187 188 189 |
# File 'lib/roby/execution_engine.rb', line 187 def emitted_events @emitted_events end |
#event_logger ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The underlying DRoby::EventLogger
It is usually the same than the #plan‘s. Pass a DRoby::NullEventLogger at construction time to disable logging of execution events.
178 179 180 |
# File 'lib/roby/execution_engine.rb', line 178 def event_logger @event_logger end |
#event_ordering ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The topological ordering of events w.r.t. the Precedence relation. This gets updated on-demand when the event relations change.
1004 1005 1006 |
# File 'lib/roby/execution_engine.rb', line 1004 def event_ordering @event_ordering end |
#event_priorities ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The event => index hash which give the propagation priority for each event
1007 1008 1009 |
# File 'lib/roby/execution_engine.rb', line 1007 def event_priorities @event_priorities end |
#exception_listeners ⇒ Array<#call> (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The blocks that are currently listening to exceptions
190 191 192 |
# File 'lib/roby/execution_engine.rb', line 190 def exception_listeners @exception_listeners end |
#finalizers ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A set of proc objects which are to be called when the execution engine quits.
2517 2518 2519 |
# File 'lib/roby/execution_engine.rb', line 2517 def finalizers @finalizers end |
#forward_graph ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cached graph object for EventStructure::Forward
This is here for performance reasons, to avoid resolving the same graph over and over
163 164 165 |
# File 'lib/roby/execution_engine.rb', line 163 def forward_graph @forward_graph end |
#last_stop_count ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
:nodoc:
2288 2289 2290 |
# File 'lib/roby/execution_engine.rb', line 2288 def last_stop_count @last_stop_count end |
#once_blocks ⇒ Queue (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Thread-safe queue to push work to the execution engine
Do not access directly, use #once instead
198 199 200 |
# File 'lib/roby/execution_engine.rb', line 198 def once_blocks @once_blocks end |
#plan ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The Plan this engine is acting on
172 173 174 |
# File 'lib/roby/execution_engine.rb', line 172 def plan @plan end |
#precedence_graph ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cached graph object for Roby::EventStructure::Precedence
This is here for performance reasons, to avoid resolving the same graph over and over
151 152 153 |
# File 'lib/roby/execution_engine.rb', line 151 def precedence_graph @precedence_graph end |
#process_every ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A set of blocks which are called every cycle
2165 2166 2167 |
# File 'lib/roby/execution_engine.rb', line 2165 def process_every @process_every end |
#propagation_id ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A numeric ID giving the count of the current propagation cycle
182 183 184 |
# File 'lib/roby/execution_engine.rb', line 182 def propagation_id @propagation_id end |
#propagation_sources ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of source events for the current propagation action. This is a mix of EventGenerator and Event objects.
514 515 516 |
# File 'lib/roby/execution_engine.rb', line 514 def propagation_sources @propagation_sources end |
#scheduler ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The scheduler is the object which handles non-generic parts of the propagation cycle. For now, its #initial_events method is called at the beginning of each propagation cycle and can call or emit a set of events.
See Schedulers::Basic
485 486 487 |
# File 'lib/roby/execution_engine.rb', line 485 def scheduler @scheduler end |
#signal_graph ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cached graph object for EventStructure::Signal
This is here for performance reasons, to avoid resolving the same graph over and over
157 158 159 |
# File 'lib/roby/execution_engine.rb', line 157 def signal_graph @signal_graph end |
#thread ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The execution thread if there is one running
2190 2191 2192 |
# File 'lib/roby/execution_engine.rb', line 2190 def thread @thread end |
#thread_pool ⇒ Concurrent::CachedThreadPool (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A thread pool on which async work should be executed
145 146 147 |
# File 'lib/roby/execution_engine.rb', line 145 def thread_pool @thread_pool end |
#waiting_work ⇒ Array<#fail,#complete?> (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A list of threaded objects waiting for the control thread
Objects registered here will be notified them by calling #fail on them when it quits. In addition, #join_all_waiting_work will wait for all pending jobs to finish.
Note that all Concurrent::Obligation subclasses fit the bill
2140 2141 2142 |
# File 'lib/roby/execution_engine.rb', line 2140 def waiting_work @waiting_work end |
Class Method Details
.call_every(plan) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Calls the periodic blocks which should be called
2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 |
# File 'lib/roby/execution_engine.rb', line 2108 def self.call_every(plan) # :nodoc: engine = plan.execution_engine now = engine.cycle_start length = engine.cycle_length engine.process_every.map! do |handler, last_call, duration| next if handler.disposed? # Check if the nearest timepoint is the beginning of # this cycle or of the next cycle if !last_call || (duration - (now - last_call)) < length / 2 unless handler.call(engine, engine.plan) next end next if handler.once? next if handler.disposed? last_call = now end [handler, last_call, duration] end.compact! end |
.make_delay(timeref, source, target, timespec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a Time object which represents the absolute point in time referenced by timespec in the context of delaying a propagation between source and target.
See validate_timespec for more information
994 995 996 997 998 999 1000 |
# File 'lib/roby/execution_engine.rb', line 994 def self.make_delay(timeref, source, target, timespec) if delay = timespec[:delay] then timeref + delay elsif at = timespec[:at] then at else raise ArgumentError, "invalid timespec #{timespec}" end end |
.validate_timespec(timespec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Validates timespec as a delay specification. A valid delay specification is either nil or a hash, in which case two forms are possible:
at: absolute_time
delay: number
983 984 985 986 987 |
# File 'lib/roby/execution_engine.rb', line 983 def self.validate_timespec(timespec) if timespec timespec = timespec, i[delay at] end end |
Instance Method Details
#add_error(e, propagate_through: nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Register a LocalizedError for future propagation
This method must be called in a error-gathering context (i.e. #gather_errors).
638 639 640 641 642 643 644 645 646 647 648 |
# File 'lib/roby/execution_engine.rb', line 638 def add_error(e, propagate_through: nil) plan_exception = e.to_execution_exception if @propagation_exceptions @propagation_exceptions << [plan_exception, propagate_through] else Roby.log_exception_with_backtrace(e, self, :fatal) raise NotPropagationContext, "#add_error called outside an error-gathering "\ "context (#add_error)" end end |
#add_event_delay(time, is_forward, source, target, context) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Adds a propagation step to be performed when the current time is greater than time. The propagation step is a signal if is_forward is false and a forward otherwise.
This method should not be called directly. Use #add_event_propagation with the appropriate timespec argument.
See also #delayed_events and #execute_delayed_events
555 556 557 |
# File 'lib/roby/execution_engine.rb', line 555 def add_event_delay(time, is_forward, source, target, context) delayed_events << [time, is_forward, source, target, context] end |
#add_event_propagation(is_forward, sources, target, context, timespec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Adds a propagation to the next propagation step: it registers a propagation step to be performed between source and target with the given context. If is_forward is true, the propagation will be a forwarding, otherwise it is a signal.
If timespec is not nil, it defines a delay to be applied before calling the target event.
See #gather_propagation
764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 |
# File 'lib/roby/execution_engine.rb', line 764 def add_event_propagation(is_forward, sources, target, context, timespec) if target.plan != plan raise Roby::EventNotExecutable.new(target), "#{target} not in executed plan" end target.pending(sources.find_all { |ev| ev.kind_of?(Event) }) @propagation_step_id += 1 target_info = (@propagation[target] ||= [@propagation_step_id, [], []]) step = target_info[is_forward ? PENDING_PROPAGATION_FORWARD : PENDING_PROPAGATION_SIGNAL] if sources.empty? step << nil << context << timespec else sources.each do |ev| step << ev << context << timespec end end end |
#add_exceptions_for_inhibition(fatal_errors) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Register a set of fatal exceptions to ensure that they will be inhibited in the next exception propagation cycles
1938 1939 1940 1941 1942 1943 1944 1945 |
# File 'lib/roby/execution_engine.rb', line 1938 def add_exceptions_for_inhibition(fatal_errors) fatal_errors.each do |exception, involved_tasks| involved_tasks.each do |t| (@pending_exceptions[t] ||= Set.new) << [exception.exception.class, exception.origin] end end end |
#add_framework_error(error, source) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registers the given error and a description of its source in the list of application/framework errors
It must be called within an exception-gathering context, that is either within #process_events, or within #gather_framework_errors
These errors will terminate the event loop
717 718 719 720 721 722 723 724 |
# File 'lib/roby/execution_engine.rb', line 717 def add_framework_error(error, source) if @application_exceptions @application_exceptions << [error, source] else Roby.log_exception_with_backtrace(error, self, :fatal) raise NotPropagationContext, "#add_framework_error called outside an exception-gathering context" end end |
#at_cycle_end(description: "at_cycle_end", **options) {|plan| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Adds a block to be called at the end of each execution cycle
2151 2152 2153 2154 2155 |
# File 'lib/roby/execution_engine.rb', line 2151 def at_cycle_end(description: "at_cycle_end", **, &block) handler = PollBlockDefinition.new(description, block, **) at_cycle_end_handlers << handler handler end |
#call_poll_blocks(blocks, late = false) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Helper that calls the propagation handlers in propagation_handlers (which are expected to be instances of PollBlockDefinition) and handles the errors according of each handler’s policy
804 805 806 807 808 809 810 811 812 813 814 815 816 817 |
# File 'lib/roby/execution_engine.rb', line 804 def call_poll_blocks(blocks, late = false) blocks.delete_if do |handler| if handler.disabled? || (handler.late? ^ late) next(handler.disposed?) end log_timepoint_group handler.description do unless handler.call(self, plan) handler.disabled = true end end handler.once? || handler.disposed? end end |
#call_propagation_handlers ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 |
# File 'lib/roby/execution_engine.rb', line 840 def call_propagation_handlers process_once_blocks if scheduler.enabled? gather_framework_errors("scheduler") do scheduler.initial_events log_timepoint "scheduler" end end call_poll_blocks(self.class.propagation_handlers, false) call_poll_blocks(self.propagation_handlers, false) unless has_queued_events? call_poll_blocks(self.class.propagation_handlers, true) call_poll_blocks(self.propagation_handlers, true) end end |
#clear ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Sets up the plan for clearing: it discards all missions and undefines all permanent tasks and events.
Returns nil if the plan is cleared, and the set of remaining tasks otherwise. Note that quaranteened tasks are not counted as remaining, as it is not possible for the execution engine to stop them.
2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 |
# File 'lib/roby/execution_engine.rb', line 2296 def clear plan.mission_tasks.dup.each { |t| plan.unmark_mission_task(t) } plan.permanent_tasks.dup.each { |t| plan.unmark_permanent_task(t) } plan.permanent_events.dup.each { |t| plan.unmark_permanent_event(t) } plan.force_gc.merge(plan.tasks) quaranteened_subplan = plan.compute_useful_tasks(plan.quarantined_tasks) remaining = plan.tasks - quaranteened_subplan @pending_exceptions.clear if remaining.empty? # Have to call #garbage_collect one more to make # sure that unneeded events are removed as well garbage_collect # Done cleaning the tasks, clear the remains plan.transactions.each do |trsc| trsc.discard_transaction if trsc.self_owned? end plan.clear emitted_events.clear return end remaining end |
#clear_application_exceptions ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
1458 1459 1460 1461 1462 1463 1464 1465 |
# File 'lib/roby/execution_engine.rb', line 1458 def clear_application_exceptions unless @application_exceptions raise RecursivePropagationContext, "unbalanced call to #clear_application_exceptions" end result, @application_exceptions = @application_exceptions, nil result end |
#compute_errors(events_errors) ⇒ PropagationInfo
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Compute the set of fatal errors in the current execution state
1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 |
# File 'lib/roby/execution_engine.rb', line 1478 def compute_errors(events_errors) # Generate exceptions from task structure structure_errors = plan.check_structure log_timepoint "structure_check" # Propagate the errors. Note that the plan repairs are taken into # account in ExecutionEngine.propagate_exceptions directly. We keep # event and structure errors separate since in the first case there # is not two-stage handling (all errors that have not been handled # are fatal), and in the second case we call #check_structure # again to errors that are remaining after the call to the exception # handlers events_errors, free_events_errors, events_handled = propagate_exceptions(events_errors) _, structure_handled = propagate_exceptions(structure_errors) log_timepoint "exception_propagation" # Get the remaining problems in the plan structure, and act on it structure_errors, structure_inhibited = remove_inhibited_exceptions(plan.check_structure) # Partition them by fatal/nonfatal fatal_errors, nonfatal_errors = [], [] (structure_errors + events_errors).each do |e, involved_tasks| if e.fatal? fatal_errors << [e, involved_tasks] else nonfatal_errors << [e, involved_tasks] end end kill_tasks = compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors).to_set handled_errors = structure_handled + events_handled debug "#{fatal_errors.size} fatal errors found and "\ "#{free_events_errors.size} errors involving free events" debug "the fatal errors involve #{kill_tasks.size} non-finalized tasks" PropagationInfo.new( Set.new, Set.new, kill_tasks, fatal_errors, nonfatal_errors, free_events_errors, handled_errors, structure_inhibited ) end |
#compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Compute the set of unhandled fatal exceptions
919 920 921 922 923 924 925 926 |
# File 'lib/roby/execution_engine.rb', line 919 def compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors) kill_tasks = fatal_errors.inject(Set.new) do |tasks, (_exception, affected_tasks)| tasks.merge(affected_tasks) end # Tasks might have been finalized during exception handling, filter # those out kill_tasks.find_all(&:plan) end |
#cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called at each cycle end
2549 2550 2551 2552 2553 |
# File 'lib/roby/execution_engine.rb', line 2549 def cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?) gather_framework_errors("#cycle_end", raise_caught_exceptions: raise_framework_errors) do call_poll_blocks(at_cycle_end_handlers) end end |
#delayed(delay, description: "delayed block", **options, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Schedules block to be called once after delay seconds passed, in the propagation context
1445 1446 1447 1448 1449 1450 1451 |
# File 'lib/roby/execution_engine.rb', line 1445 def delayed(delay, description: "delayed block", **, &block) handler = PollBlockDefinition.new(description, block, once: true, **) once do process_every << [handler, cycle_start, delay] end handler end |
#display_exceptions=(flag) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Controls whether this engine should indiscriminately display all fatal exceptions
This is on by default
2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 |
# File 'lib/roby/execution_engine.rb', line 2706 def display_exceptions=(flag) unless flag @exception_display_handler.dispose return end return unless @exception_display_handler.disposed? @exception_display_handler = on_exception do |kind, error, tasks| level = if kind == EXCEPTION_HANDLED then :debug else :warn end send(level) do send(level, "encountered a #{kind} exception") Roby.log_exception_with_backtrace(error.exception, self, level) if kind == EXCEPTION_HANDLED send(level, "the exception was handled by") else send(level, "the exception involved") end tasks.each do |t| send(level, " #{t}") end break end end end |
#display_exceptions? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
whether this engine should indiscriminately display all fatal exceptions
2737 2738 2739 |
# File 'lib/roby/execution_engine.rb', line 2737 def display_exceptions? !@exception_display_handler.disposed? end |
#error_handling_phase(events_errors) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Compute errors in plan and handle the results
904 905 906 907 908 909 910 911 912 913 914 915 916 |
# File 'lib/roby/execution_engine.rb', line 904 def error_handling_phase(events_errors) # Do the exception handling phase errors = compute_errors(events_errors) notify_about_error_handling_results(errors) # nonfatal errors are only notified. Fatal errors (kill_tasks) are # handled in the propagation loop during garbage collection. Only # the free events errors have to be handled here. errors.free_events_errors.each do |exception, generators| generators.each { |g| g.unreachable!(exception.exception) } end errors end |
#event_loop ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The main event loop. It returns when the execution engine is asked to quit. In general, this does not need to be called direclty: use #run to start the event loop in a separate thread.
2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 |
# File 'lib/roby/execution_engine.rb', line 2351 def event_loop @cycle_start = Time.now @cycle_index = 0 exit_state = EventLoopExitState.new(0, Time.now, nil) @interrupted = false loop do GC::Profiler.enable if profile_gc? if @interrupted @interrupted = false event_loop_handle_interrupt(exit_state) end if quitting? return if forced_exit? return if event_loop_teardown(exit_state) end log_timepoint_group "cycle" do execute_one_cycle end GC::Profiler.disable if profile_gc? rescue Exception => e if quitting? fatal "Execution thread FORCEFULLY quitting "\ "because of unhandled exception" Roby.log_exception_with_backtrace(e, self, :fatal) raise else quit fatal "Execution thread quitting because of unhandled exception" Roby.log_exception_with_backtrace(e, self, :fatal) end end ensure unless plan.tasks.empty? warn "the following tasks are still present in the plan:" plan.tasks.each do |t| warn " #{t}" end end end |
#event_loop_handle_interrupt(exit_state) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Handle a received Interrupt for #event_loop
2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 |
# File 'lib/roby/execution_engine.rb', line 2426 def event_loop_handle_interrupt(exit_state) if quitting? exit_state.force_exit_deadline ||= Time.now + INTERRUPT_FORCE_EXIT_DEADLINE time_until_deadline = exit_state.force_exit_deadline - Time.now if time_until_deadline < 0 fatal "Quitting without cleaning up" force_quit else fatal "Still #{time_until_deadline.ceil}s before "\ "interruption will quit without cleaning up" end else fatal "Received interruption request" fatal "Interrupt again in #{INTERRUPT_FORCE_EXIT_DEAD_ZONE}s "\ "to quit without cleaning up" quit exit_state.force_exit_deadline = Time.now + INTERRUPT_FORCE_EXIT_DEAD_ZONE end end |
#event_loop_teardown(exit_state) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Handle the teardown logic for #event_loop
2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 |
# File 'lib/roby/execution_engine.rb', line 2402 def event_loop_teardown(exit_state) return true unless (remaining = clear) display_warning = (exit_state.last_stop_count != remaining.size) || (Time.now - exit_state.last_quit_warning) > 10 return unless display_warning if display_warning Robot.info "Roby quitting ..." if exit_state.last_stop_count == 0 issue_quit_progression_warning(remaining) exit_state.last_quit_warning = Time.now exit_state.last_stop_count = remaining.size end false rescue Exception => e Robot.warn "Execution thread failed to clean up" Roby.log_exception_with_backtrace(e, Robot, :warn, filter: false) true end |
#event_propagation_phase(initial_events, propagation_info) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Calls its block in a #gather_propagation context and propagate events that have been called and/or emitted by the block
If a block is given, it is called with the initial set of events: the events we should consider as already emitted in the following propagation. seeds si a list of procs which should be called to initiate the propagation (i.e. build an initial set of events)
889 890 891 892 893 894 895 896 897 898 899 900 901 |
# File 'lib/roby/execution_engine.rb', line 889 def event_propagation_phase(initial_events, propagation_info) @propagation_id += 1 gather_errors do next_steps = initial_events until next_steps.empty? until next_steps.empty? next_steps = event_propagation_step(next_steps, propagation_info) end next_steps = gather_propagation { call_propagation_handlers } end end end |
#event_propagation_step(current_step, propagation_info) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Propagate one step
current_step describes all pending emissions and calls.
This method calls ExecutionEngine.next_event to get the description of the next event to call. If there are signals going to this event, they are processed and the forwardings will be treated in the next step.
The method returns the next set of pending emissions and calls, adding the forwardings and signals that the propagation of the considered event have added.
1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 |
# File 'lib/roby/execution_engine.rb', line 1116 def event_propagation_step(current_step, propagation_info) signalled, _step_id, forward_info, call_info = next_event(current_step) next_step = nil if !call_info.empty? source_events, source_generators, context = prepare_propagation(signalled, false, call_info) if source_events log(:generator_propagate_events, false, source_events, signalled) if signalled.self_owned? next_step = gather_propagation(current_step) do propagation_context(source_events | source_generators) do begin propagation_info.add_generator_call(signalled) signalled.call_without_propagation(context) rescue Roby::LocalizedError => e if signalled.command_emitted? add_error(e) else signalled.emit_failed(e) end rescue Exception => e if signalled.command_emitted? add_error(Roby::CommandFailed.new(e, signalled)) else signalled.emit_failed(Roby::CommandFailed.new(e, signalled)) end end end end end end if forward_info next_step ||= {} target_info = (next_step[signalled] ||= [@propagation_step_id += 1, [], []]) target_info[PENDING_PROPAGATION_FORWARD].concat(forward_info) end elsif !forward_info.empty? source_events, source_generators, context = prepare_propagation(signalled, true, forward_info) if source_events log(:generator_propagate_events, true, source_events, signalled) # If the destination event is not owned, but if the peer is not # connected, the event is our responsibility now. if signalled.self_owned? || signalled.owners.none? { |peer| peer != plan.local_owner && peer.connected? } next_step = gather_propagation(current_step) do propagation_context(source_events | source_generators) do begin if event = signalled.emit_without_propagation(context) propagation_info.add_event_emission(event) emitted_events << event end rescue Roby::LocalizedError => e Roby.warn( "Internal Error: #emit_without_propagation "\ "emitted a LocalizedError exception. This is "\ "unsupported and will become a fatal error in "\ "the future. You should usually replace raise "\ "with engine.add_error" ) Roby.display_exception( Roby.logger.io(:warn), e, false ) add_error(e) rescue Exception => e Roby.warn( "Internal Error: #emit_without_propagation "\ "emitted an exception. This is unsupported "\ "and will become a fatal error in the future. "\ "You should create a proper localized error "\ "and replace raise with engine.add_error" ) Roby.display_exception( Roby.logger.io(:warn), e, false ) add_error(Roby::EmissionFailed.new(e, signalled)) end end end end end end current_step.merge!(next_step) if next_step current_step end |
#every(duration, description: "periodic handler", **options, &block) ⇒ #dispose
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Call block every duration seconds. Note that duration is round up to the cycle size (time between calls is *at least* duration)
2172 2173 2174 2175 2176 2177 2178 2179 2180 |
# File 'lib/roby/execution_engine.rb', line 2172 def every(duration, description: "periodic handler", **, &block) handler = PollBlockDefinition.new(description, block, **) once do if handler.call(self, plan) process_every << [handler, cycle_start, duration] end end handler end |
#execute(catch: [], type: :external_events, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Block until the given block is executed by the execution thread, at the beginning of the event loop, in propagation context. If the block raises, the exception is raised back in the calling thread.
2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 |
# File 'lib/roby/execution_engine.rb', line 2558 def execute(catch: [], type: :external_events, &block) raise ArgumentError, "a block is required" unless block_given? if inside_control? return yield end capture_catch = lambda do |symbol, *other| caught = catch(symbol) do if other.empty? return [:ret, yield] else return capture_catch(block, *other) end end [:throw, [symbol, caught]] end ivar = Concurrent::IVar.new once(sync: ivar, type: type) do begin if !catch.empty? result = capture_catch.call(*catch, &block) ivar.set(result) else ivar.set([:ret, yield]) end rescue ::Exception => e # rubocop:disable Lint/RescueException ivar.set([:raise, e]) end end mode, value = ivar.value! case mode when :ret value when :throw throw(*value) else raise value end end |
#execute_delayed_events ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Adds the events in delayed_events whose time has passed into the propagation. This must be called in propagation context.
See #add_event_delay and #delayed_events
563 564 565 566 567 568 569 570 571 |
# File 'lib/roby/execution_engine.rb', line 563 def execute_delayed_events reftime = Time.now delayed_events.delete_if do |time, forward, source, signalled, context| if time <= reftime add_event_propagation(forward, [source], signalled, context, nil) true end end end |
#execute_one_cycle(time = Time.now) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 |
# File 'lib/roby/execution_engine.rb', line 2448 def execute_one_cycle(time = Time.now) last_process_times = Process.times last_dump_time = plan.event_logger.dump_time while time > cycle_start + cycle_length @cycle_start += cycle_length @cycle_index += 1 end stats = {} stats[:start] = [cycle_start.tv_sec, cycle_start.tv_usec] stats[:actual_start] = time - cycle_start stats[:cycle_index] = cycle_index log_timepoint_group "process_events" do process_events end execute_side_work log_timepoint "side-work" if use_oob_gc? stats[:pre_oob_gc] = GC.stat GC::OOB.run end # Sleep if there is enough time for it remaining_cycle_time = cycle_length - (Time.now - cycle_start) if remaining_cycle_time > SLEEP_MIN_TIME sleep(remaining_cycle_time) end log_timepoint "sleep" # Log cycle statistics process_times = Process.times dump_time = plan.event_logger.dump_time stats[:log_queue_size] = plan.log_queue_size stats[:plan_task_count] = plan.num_tasks stats[:plan_event_count] = plan.num_free_events stats[:gc] = GC.stat stats[:utime] = process_times.utime - last_process_times.utime stats[:stime] = process_times.stime - last_process_times.stime stats[:dump_time] = dump_time - last_dump_time stats[:state] = Roby::State stats[:end] = Time.now - cycle_start if profile_gc? stats[:gc_profile_data] = GC::Profiler.raw_data stats[:gc_total_time] = GC::Profiler.total_time else stats[:gc_profile_data] = nil stats[:gc_total_time] = 0 end cycle_end(stats) log_flush_cycle :cycle_end, stats @cycle_start += cycle_length @cycle_index += 1 end |
#execute_side_work ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Execute the work registered with Roby::ExecutionEngine::PropagationHandlerMethods#add_side_work_handler
429 430 431 432 |
# File 'lib/roby/execution_engine.rb', line 429 def execute_side_work call_poll_blocks(self.side_work_handlers, false) call_poll_blocks(self.class.side_work_handlers, false) end |
#finalized_event(event) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called by #plan when an event has been finalized
584 585 586 587 588 |
# File 'lib/roby/execution_engine.rb', line 584 def finalized_event(event) @propagation&.delete(event) event.unreachable!("finalized", plan) # since the event is already finalized, end |
#finalized_task(task) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called by #plan when a task has been finalized
579 580 581 |
# File 'lib/roby/execution_engine.rb', line 579 def finalized_task(task) @pending_exceptions.delete(task) end |
#force_quit ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Force quitting, without cleaning up
2539 2540 2541 |
# File 'lib/roby/execution_engine.rb', line 2539 def force_quit @quit = 2 end |
#forced_exit? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
True if the control thread is currently quitting
2525 2526 2527 |
# File 'lib/roby/execution_engine.rb', line 2525 def forced_exit? @quit > 1 end |
#garbage_collect(force_on = nil) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Kills and removes unneeded tasks for which there are no dependencies
1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 |
# File 'lib/roby/execution_engine.rb', line 1975 def garbage_collect(force_on = nil) if force_on && !force_on.empty? info "GC: adding #{force_on.size} tasks in the force_gc set" valid_plan, mismatching_plan = force_on.partition do |t| t.plan == self.plan end plan.force_gc.merge(valid_plan) unless mismatching_plan.empty? mismatches_s = mismatching_plan.map { |t| "#{t}(plan=#{t.plan})" } .join(", ") raise ArgumentError, "#{mismatches_s} have been given to #{self}.garbage_collect, "\ "but they are not tasks in #{plan}" end end unmark_finished_missions_and_permanent_tasks # The set of tasks for which we will queue stop! at this cycle # #finishing? is false until the next event propagation cycle finishing = Set.new # Loop until no tasks have been removed loop do tasks = plan.unneeded_tasks | plan.force_gc local_tasks = plan.local_tasks & tasks remote_tasks = tasks - local_tasks # Remote tasks are simply removed, regardless of other concerns for t in remote_tasks debug { "GC: removing the remote task #{t}" } plan.garbage_task(t) end break if local_tasks.empty? debug do debug "#{local_tasks.size} tasks are unneeded in this plan" local_tasks.each do |t| debug " #{t} mission=#{plan.mission_task?(t)} "\ "permanent=#{plan.permanent_task?(t)}" end break end # Find the roots, that is the tasks we should be trying to # stop. They are the tasks which have no running parents. roots = local_tasks.dup - finishing plan.default_useful_task_graphs.each do |g| roots.delete_if do |t| g.each_in_neighbour(t).any? { |p| !p.finished? } end break if roots.empty? end new_finishing_tasks = roots.find_all { |local_task| garbage_collect_stop_task(local_task) } finishing.merge(new_finishing_tasks) break unless roots.any?(&:finalized?) end finishing.each(&:stop!) plan.unneeded_events.each do |event| plan.garbage_event(event) end !finishing.empty? end |
#garbage_collect_stop_task(local_task) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Handle a single root task in the #garbage_collect process
2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 |
# File 'lib/roby/execution_engine.rb', line 2054 def garbage_collect_stop_task(local_task) if local_task.pending? info "GC: removing pending task #{local_task}" plan.garbage_task(local_task) elsif local_task.failed_to_start? info "GC: removing task that failed to start #{local_task}" plan.garbage_task(local_task) elsif local_task.starting? # wait for task to be started before killing it debug "GC: #{local_task} is starting" elsif local_task.finished? debug "GC: #{local_task} is not running, removed" plan.garbage_task(local_task) elsif local_task.finishing? debug do debug "GC: waiting for #{local_task} to finish" local_task.history.each do |ev| debug "GC: #{ev}" end break end elsif local_task.quarantined? warn "GC: #{local_task} is running but in quarantine" elsif local_task.event(:stop).controlable? debug { "GC: attempting to stop #{local_task}" } if !local_task.respond_to?(:stop!) warn "something fishy: #{local_task}/stop is controlable but "\ "there is no #stop! method, putting in quarantine" plan.quarantine_task(local_task) else return true end else warn "GC: #{local_task} cannot be stopped, putting in quarantine" plan.quarantine_task(local_task) end false end |
#gather_errors ⇒ Array<ExecutionException>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Executes the given block while gathering errors, and returns the errors that have been declared with #add_error
865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 |
# File 'lib/roby/execution_engine.rb', line 865 def gather_errors if @propagation_exceptions raise InternalError, "recursive call to #gather_errors" end # The ensure clause must NOT apply to the recursive check above. # Otherwise, we end up resetting @propagation_exceptions to nil, # which wreaks havoc begin @propagation_exceptions = [] yield @propagation_exceptions ensure @propagation_exceptions = nil end end |
#gather_external_events ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Gather the events that come out of this plan manager
833 834 835 836 837 838 |
# File 'lib/roby/execution_engine.rb', line 833 def gather_external_events process_once_blocks gather_framework_errors("delayed events") { execute_delayed_events } call_poll_blocks(self.class.external_events_handlers) call_poll_blocks(self.external_events_handlers) end |
#gather_framework_errors(source, raise_caught_exceptions: true) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Yields to the block and registers any raised exception using #add_framework_error
If the method is called within an exception-gathering context (either #process_events or #gather_framework_errors itself), nothing else is done. Otherwise, #process_pending_application_exceptions is called to re-raise any caught exception
657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 |
# File 'lib/roby/execution_engine.rb', line 657 def gather_framework_errors(source, raise_caught_exceptions: true) if @application_exceptions recursive_error_gathering_context = true else @application_exceptions = [] end yield if !recursive_error_gathering_context && !raise_caught_exceptions clear_application_exceptions end rescue Exception => e add_framework_error(e, source) if !recursive_error_gathering_context && !raise_caught_exceptions clear_application_exceptions end ensure if !recursive_error_gathering_context && raise_caught_exceptions process_pending_application_exceptions end end |
#gather_propagation(initial_set = {}, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Sets up a propagation context, yielding the block in it. During this propagation stage, all calls to #emit and #call are stored in an internal hash of the form:
target => [forward_sources, signal_sources]
where the two _sources are arrays of the form
[[source, context], ...]
The method returns the resulting hash. Use #in_propagation_context? to know if the current engine is in a propagation context, and #add_event_propagation to add a new entry to this set.
606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 |
# File 'lib/roby/execution_engine.rb', line 606 def gather_propagation(initial_set = {}, &block) if in_propagation_context? raise InternalError, "nested call to #gather_propagation" end old_allow_propagation, @allow_propagation = @allow_propagation, true # The ensure clause must NOT apply to the recursive check above. # Otherwise, we end up resetting @propagation_exceptions to nil, # which wreaks havoc begin @propagation = initial_set @propagation_sources = nil @propagation_step_id = 0 propagation_context([], &block) result, @propagation = @propagation, nil result ensure @propagation = nil @allow_propagation = old_allow_propagation end end |
#gathering? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
498 499 500 501 502 |
# File 'lib/roby/execution_engine.rb', line 498 def gathering? Roby.warn_deprecated "#gathering? is deprecated, use "\ "#in_propagation_context? instead" in_propagation_context? end |
#gathering_errors? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
857 858 859 |
# File 'lib/roby/execution_engine.rb', line 857 def gathering_errors? !!@propagation_exceptions end |
#has_pending_exception_matching?(e, object) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Tests whether there is an exception registered by #add_fatal_exceptions_for_inhibition for a given error and object
1932 1933 1934 |
# File 'lib/roby/execution_engine.rb', line 1932 def has_pending_exception_matching?(e, object) @pending_exceptions[object]&.include?([e.exception.class, e.origin]) end |
#has_pending_forward?(from, to, expected_context) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Whether a forward matching this signature is currently pending
784 785 786 787 788 789 790 |
# File 'lib/roby/execution_engine.rb', line 784 def has_pending_forward?(from, to, expected_context) if pending = @propagation[to] pending[PENDING_PROPAGATION_FORWARD].each_slice(3).any? do |event, context, timespec| (from === event.generator) && (expected_context === context) end end end |
#has_pending_signal?(from, to, expected_context) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Whether a signal matching this signature is currently pending
793 794 795 796 797 798 799 |
# File 'lib/roby/execution_engine.rb', line 793 def has_pending_signal?(from, to, expected_context) if pending = @propagation[to] pending[PENDING_PROPAGATION_SIGNAL].each_slice(3).any? do |event, context, timespec| (from === event.generator) && (expected_context === context) end end end |
#has_propagation_for?(target) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
738 739 740 |
# File 'lib/roby/execution_engine.rb', line 738 def has_propagation_for?(target) @propagation&.has_key?(target) end |
#has_queued_events? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns true if some events are queued
591 592 593 |
# File 'lib/roby/execution_engine.rb', line 591 def has_queued_events? !@propagation.empty? end |
#has_waiting_work? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Whether this EE has asynchronous waiting work waiting to be processed
1522 1523 1524 1525 1526 |
# File 'lib/roby/execution_engine.rb', line 1522 def has_waiting_work? # Filter out unscheduled promises (promises on which #execute was # not called). If they are unscheduled, we're not waiting on them waiting_work.any? { |w| !w.unscheduled? } end |
#in_propagation_context? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
True if we are within a propagation context (i.e. within event processing)
506 507 508 |
# File 'lib/roby/execution_engine.rb', line 506 def in_propagation_context? !!@propagation end |
#inhibited_exception?(exception) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Query whether the given exception is inhibited in this plan
1425 1426 1427 1428 |
# File 'lib/roby/execution_engine.rb', line 1425 def inhibited_exception?(exception) unhandled, = remove_inhibited_exceptions([exception.to_execution_exception]) unhandled.empty? end |
#inside_control? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
True if the current thread is the execution thread of this engine
See #outside_control? for a discussion of the use of #inside_control? and #outside_control? when testing the threading context
2208 2209 2210 2211 |
# File 'lib/roby/execution_engine.rb', line 2208 def inside_control? t = thread !t || t == Thread.current end |
#interrupt ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
2529 2530 2531 |
# File 'lib/roby/execution_engine.rb', line 2529 def interrupt @interrupted = true end |
#issue_quit_progression_warning(remaining) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 |
# File 'lib/roby/execution_engine.rb', line 2325 def issue_quit_progression_warning(remaining) Robot.info( "Waiting for #{remaining.size} tasks to finish "\ "(#{plan.num_tasks} tasks still in plan) and "\ "#{waiting_work.size} async work jobs" ) remaining.each do |task| Robot.info " #{task}" end quarantined = remaining.find_all(&:quarantined?) unless quarantined.empty? Robot.info "#{quarantined.size} tasks in quarantine" end end |
#join_all_waiting_work(timeout: nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Waits for all obligations in #waiting_work to finish
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 |
# File 'lib/roby/execution_engine.rb', line 435 def join_all_waiting_work(timeout: nil) return [], PropagationInfo.new if waiting_work.empty? deadline = if timeout Time.now + timeout end finished = [] propagation_info = PropagationInfo.new loop do framework_errors = gather_framework_errors( "#join_all_waiting_work", raise_caught_exceptions: false ) do next_steps = nil event_errors = gather_errors do next_steps = gather_propagation do finished.concat(process_waiting_work) blocks = [] until once_blocks.empty? blocks << once_blocks.pop.last end call_poll_blocks(blocks) end end this_propagation = propagate_events_and_errors( next_steps, event_errors, garbage_collect_pass: false ) propagation_info.merge(this_propagation) end propagation_info.add_framework_errors(framework_errors) Thread.pass has_scheduled_promises = has_waiting_work? if deadline && (Time.now > deadline) && has_scheduled_promises raise JoinAllWaitingWorkTimeout.new(waiting_work) end break unless has_waiting_work? end [finished, propagation_info] end |
#killall ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Kill all tasks that are currently running in the plan
2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 |
# File 'lib/roby/execution_engine.rb', line 2640 def killall scheduler_enabled = scheduler.enabled? plan.permanent_tasks.clear plan.permanent_events.clear plan.mission_tasks.clear scheduler.enabled = false quit start_new_cycle process_events cycle_end({}) plan.transactions.each(&:discard_transaction!) start_new_cycle Thread.pass process_events cycle_end({}) ensure scheduler.enabled = scheduler_enabled end |
#next_event(pending) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
call-seq:
next_event(pending) => event, propagation_info
Determines the event in current_step which should be signalled now. Removes it from the set and returns the event and the associated propagation information.
See #gather_propagation for the format of the returned # propagation_info
1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 |
# File 'lib/roby/execution_engine.rb', line 1017 def next_event(pending) # this variable is 2 if selected_event is being forwarded, 1 if it # is both forwarded and signalled and 0 if it is only signalled priority, step_id, selected_event = nil for propagation_step in pending target_event = propagation_step[0] target_step_id, forwards, signals = *propagation_step[1] target_priority = if forwards.empty? && signals.empty? then 2 elsif forwards.empty? then 0 else 1 end do_select = if selected_event if precedence_graph.reachable?(selected_event, target_event) false elsif precedence_graph.reachable?(target_event, selected_event) true elsif priority < target_priority true elsif priority == target_priority # If they are of the same priority, handle # earlier events first step_id > target_step_id else false end else true end if do_select selected_event = target_event priority = target_priority step_id = target_step_id end end [selected_event, *pending.delete(selected_event)] end |
#notify_about_error_handling_results(errors) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Issue the warning message and log notifications related to tasks being killed because of unhandled fatal exceptions
930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 |
# File 'lib/roby/execution_engine.rb', line 930 def notify_about_error_handling_results(errors) kill_tasks, fatal_errors, nonfatal_errors, free_events_errors, handled_errors = errors.kill_tasks, errors.fatal_errors, errors.nonfatal_errors, errors.free_events_errors, errors.handled_errors unless nonfatal_errors.empty? if display_exceptions? warn "#{nonfatal_errors.size} unhandled non-fatal exceptions" end nonfatal_errors.each do |exception, tasks| notify_exception(EXCEPTION_NONFATAL, exception, tasks) end end unless handled_errors.empty? if display_exceptions? warn "#{handled_errors.size} handled errors" end handled_errors.each do |exception, tasks| notify_exception(EXCEPTION_HANDLED, exception, tasks) end end unless free_events_errors.empty? if display_exceptions? warn "#{free_events_errors.size} free event exceptions" end free_events_errors.each do |exception, events| notify_exception(EXCEPTION_FREE_EVENT, exception, events) end end unless fatal_errors.empty? if display_exceptions? warn "#{fatal_errors.size} unhandled fatal exceptions, involving #{kill_tasks.size} tasks that will be forcefully killed" end fatal_errors.each do |exception, tasks| notify_exception(EXCEPTION_FATAL, exception, tasks) end if display_exceptions? kill_tasks.each do |task| log_pp :warn, task end end end end |
#notify_exception(kind, error, involved_objects) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Call to notify the listeners registered with #on_exception of the occurence of an exception
2743 2744 2745 2746 2747 2748 |
# File 'lib/roby/execution_engine.rb', line 2743 def notify_exception(kind, error, involved_objects) log(:exception_notification, plan.droby_id, kind, error, involved_objects) exception_listeners.each do |listener| listener.call(self, kind, error, involved_objects) end end |
#on_exception(description: "exception listener", on_error: :disable) {|kind, error, tasks| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registers a callback that will be called when exceptions are propagated in the plan
2688 2689 2690 2691 2692 |
# File 'lib/roby/execution_engine.rb', line 2688 def on_exception(description: "exception listener", on_error: :disable, &block) handler = PollBlockDefinition.new(description, block, on_error: on_error) exception_listeners << handler Roby.disposable { exception_listeners.delete(handler) } end |
#once(sync: nil, description: "once block", type: :external_events, **options, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Schedules block to be called at the beginning of the next execution cycle, in propagation context.
1438 1439 1440 1441 |
# File 'lib/roby/execution_engine.rb', line 1438 def once(sync: nil, description: "once block", type: :external_events, **, &block) waiting_work << sync if sync once_blocks << create_propagation_handler(description: description, type: type, once: true, **, &block) end |
#outside_control? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
True if the current thread is not the execution thread of this engine, or if there is not control thread. When you check the current thread context, always use a negated form. Do not do
if Roby.inside_control?
ERROR
end
Do instead
if !Roby.outside_control?
ERROR
end
Since the first form will fail if there is no control thread, while the second form will work. Use the first form only if you require that there actually IS a control thread.
2230 2231 2232 2233 |
# File 'lib/roby/execution_engine.rb', line 2230 def outside_control? t = thread !t || t != Thread.current end |
#prepare_propagation(target, is_forward, info) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
call-seq:
prepare_propagation(target, is_forward, info) => source_events, source_generators, context
prepare_propagation(target, is_forward, info) => nil
Parses the propagation information info in the context of a signalling if is_forward is true and a forwarding otherwise. target is the target event.
The method adds the appropriate delayed events using #add_event_delay, and returns either nil if no propagation is to be performed, or the propagation source events, generators and context.
The format of info is the same as the hash values described in #gather_propagation.
1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 |
# File 'lib/roby/execution_engine.rb', line 1071 def prepare_propagation(target, is_forward, info) timeref = Time.now source_events, source_generators, context = Set.new, Set.new, [] delayed = true info.each_slice(3) do |src, ctxt, time| if time && (delay = ExecutionEngine.make_delay(timeref, src, target, time)) add_event_delay(delay, is_forward, src, target, ctxt) next end delayed = false # Merge identical signals. Needed because two different event handlers # can both call #emit, and two signals are set up if src if src.respond_to?(:generator) source_events << src source_generators << src.generator else source_generators << src end end if ctxt context.concat ctxt end end unless delayed [source_events, source_generators, context] end end |
#process_events(raise_framework_errors: Roby.app.abort_on_application_exception?, garbage_collect_pass: true, &caller_block) ⇒ PropagationInfo
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The inside part of the event loop
It gathers initial events and errors and propagate them
1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 |
# File 'lib/roby/execution_engine.rb', line 1745 def process_events( raise_framework_errors: Roby.app.abort_on_application_exception?, garbage_collect_pass: true, &caller_block ) if @application_exceptions raise RecursivePropagationContext, "recursive call to process_events" end # to avoid having a almost-method-global ensure block passed_recursive_check = true @application_exceptions = [] @emitted_events = [] @thread_pool.send :synchronize do @thread_pool.send(:ns_prune_pool) end # Gather new events and propagate them events_errors = nil next_steps = gather_propagation do events_errors = gather_errors do if caller_block yield caller_block = nil end if !quitting? || !garbage_collect([]) process_waiting_work log_timepoint "workers" gather_external_events log_timepoint "external_events" call_propagation_handlers log_timepoint "propagation_handlers" end end end propagation_info = propagate_events_and_errors( next_steps, events_errors, garbage_collect_pass: garbage_collect_pass ) if Roby.app.abort_on_exception? && !all_errors.fatal_errors.empty? raise Aborting.new(propagation_info.each_fatal_error.map(&:exception)) end propagation_info.framework_errors.concat(@application_exceptions) propagation_info ensure if passed_recursive_check process_pending_application_exceptions(raise_framework_errors: raise_framework_errors) end end |
#process_events_synchronous(seeds = {}, initial_errors = [], enable_scheduler: false, raise_errors: true, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Tests are using a special mode for propagation, in which everything is resolved when #emit or #call is called, including error handling. This mode is implemented using this method
When errors occur in this mode, the exceptions are raised directly. This is useful in tests as, this way, we are sure that the exception will not get overlooked
If multiple errors are raised in a single call (this is possible due to Roby’s error handling mechanisms), the method will raise SynchronousEventProcessingMultipleErrors to wrap all the exceptions into one.
1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 |
# File 'lib/roby/execution_engine.rb', line 1809 def process_events_synchronous(seeds = {}, initial_errors = [], enable_scheduler: false, raise_errors: true, &block) Roby.warn_deprecated "#process_events_synchronous is deprecated, use the expect_execution harness instead" if @application_exceptions raise RecursivePropagationContext, "recursive call to process_events" end passed_recursive_check = true # to avoid having a almost-method-global ensure block @application_exceptions = [] # Save early for the benefit of the 'ensure' block current_scheduler_enabled = scheduler.enabled? if block_given? if seeds.empty? && initial_errors.empty? seeds = gather_propagation do initial_errors = gather_errors(&block) end else raise ArgumentError, "cannot provide both seeds/inital errors and a block" end end scheduler.enabled = enable_scheduler propagation_info = propagate_events_and_errors( seeds, initial_errors, garbage_collect_pass: false ) unless propagation_info.kill_tasks.empty? gc_initial_errors = nil gc_seeds = gather_propagation do gc_initial_errors = gather_errors do garbage_collect(propagation_info.kill_tasks) end end gc_errors = propagate_events_and_errors( gc_seeds, gc_initial_errors, garbage_collect_pass: false ) propagation_info.merge(gc_errors) end if raise_errors propagation_info = propagation_info.exceptions if propagation_info.size == 1 raise propagation_info.first elsif !propagation_info.empty? raise SynchronousEventProcessingMultipleErrors.new(propagation_info.map(&:exception)) end else propagation_info end rescue SynchronousEventProcessingMultipleErrors => e raise SynchronousEventProcessingMultipleErrors.new(e.errors + clear_application_exceptions) rescue Exception => e if passed_recursive_check application_exceptions = clear_application_exceptions if !application_exceptions.empty? raise SynchronousEventProcessingMultipleErrors.new(application_exceptions.map(&:first) + [e]) else raise e end else raise e end ensure if passed_recursive_check && @application_exceptions process_pending_application_exceptions end scheduler.enabled = current_scheduler_enabled end |
#process_once_blocks ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Dispatch #once_blocks to the other handler sets for further processing
821 822 823 824 825 826 827 828 829 830 |
# File 'lib/roby/execution_engine.rb', line 821 def process_once_blocks until once_blocks.empty? type, block = once_blocks.pop if type == :external_events external_events_handlers << block else propagation_handlers << block end end end |
#process_pending_application_exceptions(application_errors = clear_application_exceptions, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 |
# File 'lib/roby/execution_engine.rb', line 680 def process_pending_application_exceptions( application_errors = clear_application_exceptions, raise_framework_errors: Roby.app.abort_on_application_exception? ) # We don't aggregate exceptions, so report them all and raise one if display_exceptions? application_errors.each do |error, source| unless error.kind_of?(Interrupt) fatal "Application error in #{source}" Roby.log_exception_with_backtrace(error, self, :fatal) end end end error, source = application_errors.find do |error, _| raise_framework_errors || error.kind_of?(SignalException) end if error if Roby.app.display_all_threads_state_on_abort? fatal "State of all running threads:" Roby.log_all_threads_backtraces(self, :fatal) end raise error, "in #{source}: #{error.message}", error.backtrace end end |
#process_waiting_work ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Process asynchronous work registered in #waiting_work to clear completed work and/or handle errors that were not handled by the async object itself (e.g. a Promise without a Promise#on_error handler)
1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 |
# File 'lib/roby/execution_engine.rb', line 1531 def process_waiting_work finished, not_finished = waiting_work.partition(&:complete?) unhandled_errors = finished.find_all do |work| work.rejected? && (work.respond_to?(:handled_error?) && !work.handled_error?) end unhandled_errors.each do |work| e = work.reason e.set_backtrace(e.backtrace) add_framework_error(e, work.to_s) if work.respond_to?(:error_handling_failure) && (e = work.error_handling_failure) e.set_backtrace(e.backtrace) add_framework_error(e, work.to_s) end end @waiting_work = not_finished finished end |
#promise(description: nil, executor: thread_pool, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create a promise to execute the given block in a separate thread
Note that the returned value is a Promise. This means that callbacks added with #on_success or #rescue will be executed in the execution engine thread by default.
2755 2756 2757 |
# File 'lib/roby/execution_engine.rb', line 2755 def promise(description: nil, executor: thread_pool, &block) Promise.new(self, executor: executor, description: description, &block) end |
#propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true) ⇒ PropagationInfo
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Propagate an initial set of event propagations and errors
1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 |
# File 'lib/roby/execution_engine.rb', line 1892 def propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true) propagation_info = PropagationInfo.new events_errors = initial_errors.dup loop do log_timepoint_group "event_propagation_phase" do events_errors.concat(event_propagation_phase(next_steps, propagation_info)) end next_steps = gather_propagation do exception_propagation_errors, error_phase_results = nil log_timepoint_group "error_handling_phase" do exception_propagation_errors = gather_errors do error_phase_results = error_handling_phase(events_errors) end end add_exceptions_for_inhibition(error_phase_results.each_fatal_error) propagation_info.merge(error_phase_results) garbage_collection_errors = gather_errors do plan.generate_induced_errors(error_phase_results) if garbage_collect_pass garbage_collect(error_phase_results.kill_tasks) else [] end end events_errors = (exception_propagation_errors + garbage_collection_errors) log_timepoint "garbage_collect" end break if next_steps.empty? && events_errors.empty? end propagation_info end |
#propagate_exception_in_plan(exceptions) {|exception, handling_object| ... } ⇒ Array<(ExecutionException,Array<Task>)>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The core exception propagation algorithm
1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 |
# File 'lib/roby/execution_engine.rb', line 1259 def propagate_exception_in_plan(exceptions, &handler) propagation_graph = dependency_graph.reverse # Propagate the exceptions in the hierarchy handled_unhandled = [] exceptions.each do |exception, parents| origin = exception.origin if parents filtered_parents = parents.find_all { |t| t.depends_on?(origin) } if filtered_parents != parents warn "some parents specified for #{exception.exception}"\ "(#{exception.exception.class}) are actually not parents "\ "of #{origin}, they got filtered out" (parents - filtered_parents).each do |task| warn " #{task}" end if filtered_parents.empty? parents = propagation_graph.out_neighbours(origin) else parents = filtered_parents end end else parents = propagation_graph.out_neighbours(origin) end debug do debug "propagating exception " log_pp :debug, exception unless parents.empty? debug " constrained to parents" log_nest(2) do parents.each do |p| log_pp :debug, p end end end break end visitor = ExceptionPropagationVisitor.new( propagation_graph, exception, origin, parents, handler ) visitor.visit unhandled = visitor.unhandled_exceptions.inject { |a, b| a.merge(b) } handled = visitor.handled_exceptions.inject { |a, b| a.merge(b) } handled_unhandled << [handled, unhandled] end exceptions_handled_by = [] unhandled_exceptions = [] handled_unhandled.each do |handled, e| if e if e.handled = yield(e, plan) if handled handled_by = (handled.propagation_leafs.to_set << plan) exceptions_handled_by << [handled.merge(e), handled_by] else handled = e exceptions_handled_by << [e, [plan].to_set] end else affected_tasks = e.trace.vertices.to_set if handled affected_tasks -= handled.trace.vertices exceptions_handled_by << [handled, handled.propagation_leafs.to_set] end unhandled_exceptions << [e, affected_tasks] end else exceptions_handled_by << [handled, handled.propagation_leafs.to_set] end end debug do debug "#{unhandled_exceptions.size} unhandled exceptions remain" log_nest(2) do unhandled_exceptions.each do |e, affected_tasks| log_pp :debug, e debug "Affects #{affected_tasks.size} tasks" log_nest(2) do affected_tasks.each do |t| log_pp :debug, t end end end end break end [unhandled_exceptions, exceptions_handled_by] end |
#propagate_exceptions(exceptions) ⇒ Array<(ExecutionException,Array<Task>)>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Propagation exception phase, checking if tasks and/or the main plan are handling the exceptions
1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 |
# File 'lib/roby/execution_engine.rb', line 1360 def propagate_exceptions(exceptions) if exceptions.empty? return [], [], [] end # Remove all exception that are not associated with a task exceptions, free_events_exceptions = exceptions.partition do |e, _| e.origin end # Normalize the free events exceptions free_events_exceptions = free_events_exceptions.map do |e, _| if e.exception.failed_generator.plan [e, Set[e.exception.failed_generator]] end end.compact debug "Filtering inhibited exceptions" exceptions = log_nest(2) do non_inhibited, = remove_inhibited_exceptions(exceptions) # Reset the trace for the real propagation non_inhibited.map do |e, _| _, propagate_through = exceptions.find { |original_e, _| original_e.exception == e.exception } e.reset_trace [e, propagate_through] end end debug "Propagating #{exceptions.size} non-inhibited exceptions" log_nest(2) do # Note that the first half of the method filtered the free # events exceptions out of 'exceptions' unhandled, handled = propagate_exception_in_plan(exceptions) do |e, object| object.handle_exception(e) end return unhandled, free_events_exceptions, handled end end |
#propagation_context(sources) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Sets the source_event and source_generator variables according to source. source is the from argument of #add_event_propagation
728 729 730 731 732 733 734 735 736 |
# File 'lib/roby/execution_engine.rb', line 728 def propagation_context(sources) current_sources = @propagation_sources raise InternalError, "not in a gathering context in #propagation_context" unless in_propagation_context? @propagation_sources = sources yield ensure @propagation_sources = current_sources end |
#propagation_source_events ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of events extracted from #sources
517 518 519 520 521 522 523 524 525 |
# File 'lib/roby/execution_engine.rb', line 517 def propagation_source_events result = Set.new for ev in @propagation_sources if ev.respond_to?(:generator) result << ev end end result end |
#propagation_source_generators ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of generators extracted from #sources
528 529 530 531 532 533 534 535 536 537 538 |
# File 'lib/roby/execution_engine.rb', line 528 def propagation_source_generators result = Set.new for ev in @propagation_sources result << if ev.respond_to?(:generator) ev.generator else ev end end result end |
#queue_forward(sources, target, context, timespec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Queue a forwarding to be propagated
748 749 750 |
# File 'lib/roby/execution_engine.rb', line 748 def queue_forward(sources, target, context, timespec) add_event_propagation(true, sources, target, context, timespec) end |
#queue_signal(sources, target, context, timespec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Queue a signal to be propagated
743 744 745 |
# File 'lib/roby/execution_engine.rb', line 743 def queue_signal(sources, target, context, timespec) add_event_propagation(false, sources, target, context, timespec) end |
#quit ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Make control quit properly
2534 2535 2536 |
# File 'lib/roby/execution_engine.rb', line 2534 def quit @quit = 1 end |
#quitting? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
True if the control thread is currently quitting
2520 2521 2522 |
# File 'lib/roby/execution_engine.rb', line 2520 def quitting? @quit > 0 end |
#refresh_relations ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Refresh the value of cached relations
Some often-used relations are cached at #initialize, such as #dependency_graph and #precedence_graph. Call this when the actual graph objects have changed on the plan
134 135 136 137 138 139 |
# File 'lib/roby/execution_engine.rb', line 134 def refresh_relations @dependency_graph = plan.task_relation_graph_for(TaskStructure::Dependency) @precedence_graph = plan.event_relation_graph_for(EventStructure::Precedence) @signal_graph = plan.event_relation_graph_for(EventStructure::Signal) @forward_graph = plan.event_relation_graph_for(EventStructure::Forwarding) end |
#remove_at_cycle_end(handler_id) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Removes a handler added by #at_cycle_end
2160 2161 2162 |
# File 'lib/roby/execution_engine.rb', line 2160 def remove_at_cycle_end(handler_id) handler_id.dispose end |
#remove_exception_listener(handler) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Removes an exception listener registered with #on_exception
2698 2699 2700 |
# File 'lib/roby/execution_engine.rb', line 2698 def remove_exception_listener(handler) handler.dispose if handler.respond_to?(:dispose) end |
#remove_inhibited_exceptions(exceptions) ⇒ Array<ExecutionException>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Process the given exceptions to remove the ones that are currently filtered by the plan repairs
The returned exceptions are propagated, i.e. their #trace method contains all the tasks that are affected by the absence of a handling mechanism
1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 |
# File 'lib/roby/execution_engine.rb', line 1410 def remove_inhibited_exceptions(exceptions) exceptions = exceptions.find_all do |execution_exception, _| execution_exception.origin.plan end propagate_exception_in_plan(exceptions) do |e, object| if has_pending_exception_matching?(e, object) true elsif object.respond_to?(:handles_error?) object.handles_error?(e) end end end |
#remove_periodic_handler(id) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Removes a periodic handler defined by #every. id is the value returned by #every.
2184 2185 2186 2187 |
# File 'lib/roby/execution_engine.rb', line 2184 def remove_periodic_handler(id) id.dispose nil end |
#reset ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Make a quit EE ready for reuse
2544 2545 2546 |
# File 'lib/roby/execution_engine.rb', line 2544 def reset @quit = 0 end |
#reset_thread_pool ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
2634 2635 2636 2637 |
# File 'lib/roby/execution_engine.rb', line 2634 def reset_thread_pool @thread_pool&.shutdown @thread_pool = Concurrent::CachedThreadPool.new(idletime: 10) end |
#run(cycle: 0.1) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Main event loop. Valid options are
- cycle
-
the cycle duration in seconds (default: 0.1)
2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 |
# File 'lib/roby/execution_engine.rb', line 2239 def run(cycle: 0.1) if running? raise AlreadyRunning, "#run has already been called" end self.running = true @allow_propagation = false @waiting_work = Concurrent::Array.new @thread = Thread.current @thread.name = "MAIN" @cycle_length = cycle trap("INT") do interrupt end event_loop ensure self.running = false @thread = nil waiting_work.delete_if do |w| next(true) if w.complete? # rubocop:disable Lint/HandleExceptions begin w.fail ExecutionQuitError Roby.warn "forcefully terminated #{w} on quit" rescue Concurrent::MultipleAssignmentError # Race condition: something completed the promise while # we were trying to make it fail end # rubocop:enable Lint/HandleExceptions true end finalizers.each do |blk| begin blk.call rescue Exception => e Roby.warn "finalizer #{blk} failed" Roby.log_exception_with_backtrace(e, Roby, :warn) end end @quit = 0 @allow_propagation = true trap("INT", "DEFAULT") end |
#shutdown ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
2629 2630 2631 2632 |
# File 'lib/roby/execution_engine.rb', line 2629 def shutdown killall thread_pool.shutdown end |
#start_new_cycle(time = Time.now) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Set the cycle_start attribute and increment cycle_index
This is only used for testing purposes
2510 2511 2512 2513 |
# File 'lib/roby/execution_engine.rb', line 2510 def start_new_cycle(time = Time.now) @cycle_start = time @cycle_index += 1 end |
#unmark_finished_missions_and_permanent_tasks ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 |
# File 'lib/roby/execution_engine.rb', line 1947 def unmark_finished_missions_and_permanent_tasks to_unmark = plan.task_index.by_predicate[:finished?] | plan.task_index.by_predicate[:failed?] finished_missions = (plan.mission_tasks & to_unmark) # Remove all missions that are finished for finished_mission in finished_missions unless finished_mission.being_repaired? plan.unmark_mission_task(finished_mission) end end finished_permanent = (plan.permanent_tasks & to_unmark) for finished_permanent in (plan.permanent_tasks & to_unmark) unless finished_permanent.being_repaired? plan.unmark_permanent_task(finished_permanent) end end end |
#unreachable_event(event) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called by EventGenerator when an event became unreachable
574 575 576 |
# File 'lib/roby/execution_engine.rb', line 574 def unreachable_event(event) delayed_events.delete_if { |_, _, _, signalled, _| signalled == event } end |
#wait_one_cycle ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Blocks until at least once execution cycle has been done
2098 2099 2100 2101 2102 2103 2104 2105 |
# File 'lib/roby/execution_engine.rb', line 2098 def wait_one_cycle current_cycle = execute { cycle_index } while current_cycle == execute { cycle_index } raise ExecutionQuitError unless running? sleep(cycle_length) end end |
#wait_until(ev) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Stops the current thread until the given even is emitted. If the event becomes unreachable, an UnreachableEvent exception is raised.
2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 |
# File 'lib/roby/execution_engine.rb', line 2602 def wait_until(ev) if inside_control? raise ThreadMismatch, "cannot use #wait_until in execution threads" end ivar = Concurrent::IVar.new result = nil once(sync: ivar) do if ev.unreachable? ivar.fail(UnreachableEvent.new(ev, ev.unreachability_reason)) else ev.if_unreachable(cancel_at_emission: true) do |reason, event| ivar.fail(UnreachableEvent.new(event, reason)) unless ivar.complete? end ev.once do |ev| ivar.set(result) unless ivar.complete? end begin result = yield if block_given? rescue Exception => e ivar.fail(e) end end end ivar.value! end |