Class: Roby::ExecutionEngine Private
- Extended by:
- Logger::Hierarchy, PropagationHandlerMethods
- Includes:
- Logger::Hierarchy, DRoby::EventLogging, PropagationHandlerMethods
- Defined in:
- lib/roby/execution_engine.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
The core execution algorithm
It is in charge of handling event and exception propagation, as well as running cleanup processes (e.g. garbage collection).
The main method is #process_events. When executing a Roby application, it is called periodically by #event_loop.
In addition, there is a special “synchronous” propagation mode that is used by Roby::EventGenerator#call and Roby::EventGenerator#emit. This mode is used when the event code is not executed within an engine, but from an imperative script, as in unit tests.
Defined Under Namespace
Modules: PropagationHandlerMethods Classes: AlreadyRunning, ExceptionPropagationVisitor, JoinAllWaitingWorkTimeout, NotPropagationContext, PollBlockDefinition, PropagationInfo, RecursivePropagationContext
Constant Summary collapse
- PENDING_PROPAGATION_FORWARD =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
1- PENDING_PROPAGATION_SIGNAL =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
2- SLEEP_MIN_TIME =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Do not sleep or call Thread#pass if there is less that this much time left in the cycle
0.01
- INTERRUPT_FORCE_EXIT_DEAD_ZONE =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
How many seconds between two Interrupt before the execution engine’s loop can forcefully quit
10- EXCEPTION_NONFATAL =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Exception kind passed to #on_exception handlers for non-fatal, unhandled exceptions
:nonfatal- EXCEPTION_FATAL =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Exception kind passed to #on_exception handlers for fatal, unhandled exceptions
:fatal- EXCEPTION_HANDLED =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Exception kind passed to #on_exception handlers for handled exceptions
:handled- EXCEPTION_FREE_EVENT =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Exception kind passed to #on_exception handlers for free event exceptions
:free_event
Instance Attribute Summary collapse
-
#additional_errors ⇒ Object
readonly
private
Used during exception propagation to inject new errors in the process.
-
#application_exceptions ⇒ Object
readonly
private
The set of errors which have been generated outside of the plan’s control.
-
#at_cycle_end_handlers ⇒ Object
readonly
private
A set of blocks that are called at each cycle end.
-
#control ⇒ Object
private
The DecisionControl object associated with this engine.
-
#cycle_index ⇒ Object
readonly
private
The number of this cycle since the beginning.
-
#cycle_length ⇒ Object
readonly
private
The cycle length in seconds.
-
#cycle_start ⇒ Object
readonly
private
The starting Time of this cycle.
-
#delayed_events ⇒ Object
readonly
private
The set of pending delayed events.
-
#dependency_graph ⇒ Object
readonly
private
Cached graph object for TaskStructure::Dependency.
-
#disabled_handlers ⇒ Array<PollBlockDefinition>
readonly
private
Poll blocks that have been disabled because they raised an exception.
-
#emitted_events ⇒ Array<Event>
readonly
private
The set of events that have been emitted within the last call to #process_events (i.e. the last execution of the event loop).
-
#event_logger ⇒ Object
private
The underlying DRoby::EventLogger.
-
#event_ordering ⇒ Object
readonly
private
The topological ordering of events w.r.t.
-
#event_priorities ⇒ Object
readonly
private
The event => index hash which give the propagation priority for each event.
-
#exception_listeners ⇒ Array<#call>
readonly
private
The blocks that are currently listening to exceptions.
-
#finalizers ⇒ Object
readonly
private
A set of proc objects which are to be called when the execution engine quits.
-
#forward_graph ⇒ Object
readonly
private
Cached graph object for EventStructure::Forward.
-
#last_stop_count ⇒ Object
readonly
private
:nodoc:.
-
#once_blocks ⇒ Queue
readonly
private
Thread-safe queue to push work to the execution engine.
-
#plan ⇒ Object
private
The Plan this engine is acting on.
-
#precedence_graph ⇒ Object
readonly
private
Cached graph object for EventStructure::Precedence.
-
#process_every ⇒ Object
readonly
private
A set of blocks which are called every cycle.
-
#propagation_id ⇒ Object
readonly
private
A numeric ID giving the count of the current propagation cycle.
-
#propagation_sources ⇒ Object
readonly
private
The set of source events for the current propagation action.
-
#scheduler ⇒ Object
private
The scheduler is the object which handles non-generic parts of the propagation cycle.
-
#signal_graph ⇒ Object
readonly
private
Cached graph object for EventStructure::Signal.
-
#thread ⇒ Object
private
The execution thread if there is one running.
-
#thread_pool ⇒ Concurrent::CachedThreadPool
readonly
private
A thread pool on which async work should be executed.
-
#waiting_work ⇒ Array<#fail,#complete?>
readonly
private
A list of threaded objects waiting for the control thread.
Attributes included from PropagationHandlerMethods
#external_events_handlers, #propagation_handlers
Class Method Summary collapse
-
.call_every(plan) ⇒ Object
private
Calls the periodic blocks which should be called.
-
.make_delay(timeref, source, target, timespec) ⇒ Object
private
Returns a Time object which represents the absolute point in time referenced by
timespecin the context of delaying a propagation betweensourceandtarget. -
.validate_timespec(timespec) ⇒ Object
private
Validates
timespecas a delay specification.
Instance Method Summary collapse
-
#add_error(e, propagate_through: nil) ⇒ Object
private
Register a LocalizedError for future propagation.
-
#add_event_delay(time, is_forward, source, target, context) ⇒ Object
private
Adds a propagation step to be performed when the current time is greater than
time. -
#add_event_propagation(is_forward, sources, target, context, timespec) ⇒ Object
private
Adds a propagation to the next propagation step: it registers a propagation step to be performed between
sourceandtargetwith the givencontext. -
#add_exceptions_for_inhibition(fatal_errors) ⇒ Object
private
Register a set of fatal exceptions to ensure that they will be inhibited in the next exception propagation cycles.
-
#add_framework_error(error, source) ⇒ Object
private
Registers the given error and a description of its source in the list of application/framework errors.
-
#at_cycle_end(description: 'at_cycle_end', **options) {|plan| ... } ⇒ Object
private
Adds a block to be called at the end of each execution cycle.
-
#call_poll_blocks(blocks, late = false) ⇒ Object
private
Helper that calls the propagation handlers in
propagation_handlers(which are expected to be instances of PollBlockDefinition) and handles the errors according of each handler’s policy. - #call_propagation_handlers ⇒ Object private
-
#clear ⇒ Object
private
Sets up the plan for clearing: it discards all missions and undefines all permanent tasks and events.
- #clear_application_exceptions ⇒ Object private
-
#compute_errors(events_errors) ⇒ PropagationInfo
private
Compute the set of fatal errors in the current execution state.
-
#compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors) ⇒ Object
private
Compute the set of unhandled fatal exceptions.
-
#cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object
private
Called at each cycle end.
-
#delayed(delay, description: 'delayed block', **options, &block) ⇒ Object
private
Schedules
blockto be called once afterdelayseconds passed, in the propagation context. -
#display_exceptions=(flag) ⇒ Object
private
Controls whether this engine should indiscriminately display all fatal exceptions.
-
#display_exceptions? ⇒ Boolean
private
whether this engine should indiscriminately display all fatal exceptions.
-
#error_handling_phase(events_errors) ⇒ Object
private
Compute errors in plan and handle the results.
-
#event_loop ⇒ Object
private
The main event loop.
-
#event_propagation_phase(initial_events, propagation_info) ⇒ Object
private
Calls its block in a #gather_propagation context and propagate events that have been called and/or emitted by the block.
-
#event_propagation_step(current_step, propagation_info) ⇒ Object
private
Propagate one step.
-
#every(duration, description: 'periodic handler', **options, &block) ⇒ Object
private
Call
blockeverydurationseconds. -
#execute(catch: [], type: :external_events) ⇒ Object
private
Block until the given block is executed by the execution thread, at the beginning of the event loop, in propagation context.
-
#execute_delayed_events ⇒ Object
private
Adds the events in
delayed_eventswhose time has passed into the propagation. -
#finalized_event(event) ⇒ Object
private
Called by #plan when an event has been finalized.
-
#finalized_task(task) ⇒ Object
private
Called by #plan when a task has been finalized.
-
#force_quit ⇒ Object
private
Force quitting, without cleaning up.
-
#forced_exit? ⇒ Boolean
private
True if the control thread is currently quitting.
-
#garbage_collect(force_on = nil) ⇒ Boolean
private
Kills and removes all unneeded tasks.
-
#gather_errors ⇒ Array<ExecutionException>
private
Executes the given block while gathering errors, and returns the errors that have been declared with #add_error.
-
#gather_external_events ⇒ Object
private
Gather the events that come out of this plan manager.
-
#gather_framework_errors(source, raise_caught_exceptions: true) ⇒ Object
private
Yields to the block and registers any raised exception using #add_framework_error.
-
#gather_propagation(initial_set = Hash.new) ⇒ Object
private
Sets up a propagation context, yielding the block in it.
- #gathering? ⇒ Boolean private
- #gathering_errors? ⇒ Boolean private
-
#has_pending_exception_matching?(e, object) ⇒ Boolean
private
Tests whether there is an exception registered by #add_fatal_exceptions_for_inhibition for a given error and object.
-
#has_pending_forward?(from, to, expected_context) ⇒ Boolean
private
Whether a forward matching this signature is currently pending.
-
#has_pending_signal?(from, to, expected_context) ⇒ Boolean
private
Whether a signal matching this signature is currently pending.
- #has_propagation_for?(target) ⇒ Boolean private
-
#has_queued_events? ⇒ Boolean
private
Returns true if some events are queued.
-
#has_waiting_work? ⇒ Boolean
private
Whether this EE has asynchronous waiting work waiting to be processed.
-
#in_propagation_context? ⇒ Boolean
private
True if we are within a propagation context (i.e. within event processing).
-
#inhibited_exception?(exception) ⇒ Boolean
private
Query whether the given exception is inhibited in this plan.
-
#initialize(plan, control: Roby::DecisionControl.new, event_logger: plan.event_logger) ⇒ ExecutionEngine
constructor
private
Create an execution engine acting on
plan, usingcontrolas the decision control object. -
#inside_control? ⇒ Boolean
private
True if the current thread is the execution thread of this engine.
- #issue_quit_progression_warning(remaining) ⇒ Object private
-
#join_all_waiting_work(timeout: nil) ⇒ Object
private
Waits for all obligations in #waiting_work to finish.
-
#killall ⇒ Object
private
Kill all tasks that are currently running in the plan.
-
#next_event(pending) ⇒ Object
private
call-seq: next_event(pending) => event, propagation_info.
-
#notify_about_error_handling_results(errors) ⇒ Object
private
Issue the warning message and log notifications related to tasks being killed because of unhandled fatal exceptions.
-
#notify_exception(kind, error, involved_objects) ⇒ Object
private
Call to notify the listeners registered with #on_exception of the occurence of an exception.
-
#on_exception(description: 'exception listener', on_error: :disable) {|kind, error, tasks| ... } ⇒ Object
private
Registers a callback that will be called when exceptions are propagated in the plan.
-
#once(sync: nil, description: 'once block', type: :external_events, **options, &block) ⇒ Object
private
Schedules
blockto be called at the beginning of the next execution cycle, in propagation context. -
#outside_control? ⇒ Boolean
private
True if the current thread is not the execution thread of this engine, or if there is not control thread.
-
#prepare_propagation(target, is_forward, info) ⇒ Object
private
call-seq: prepare_propagation(target, is_forward, info) => source_events, source_generators, context prepare_propagation(target, is_forward, info) => nil.
-
#process_events(raise_framework_errors: Roby.app.abort_on_application_exception?, garbage_collect_pass: true, &caller_block) ⇒ PropagationInfo
private
The inside part of the event loop.
-
#process_events_synchronous(seeds = Hash.new, initial_errors = Array.new, enable_scheduler: false, raise_errors: true) ⇒ Object
private
Tests are using a special mode for propagation, in which everything is resolved when #emit or #call is called, including error handling.
-
#process_once_blocks ⇒ Object
private
Dispatch #once_blocks to the other handler sets for further processing.
- #process_pending_application_exceptions(application_errors = clear_application_exceptions, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object private
-
#process_waiting_work ⇒ Object
private
Process asynchronous work registered in #waiting_work to clear completed work and/or handle errors that were not handled by the async object itself (e.g. a Promise without a Promise#on_error handler).
-
#promise(description: nil, executor: thread_pool, &block) ⇒ Object
private
Create a promise to execute the given block in a separate thread.
-
#propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true) ⇒ PropagationInfo
private
Propagate an initial set of event propagations and errors.
-
#propagate_exception_in_plan(exceptions) {|exception, handling_object| ... } ⇒ Array<(ExecutionException,Array<Task>)>
private
The core exception propagation algorithm.
-
#propagate_exceptions(exceptions) ⇒ Array<(ExecutionException,Array<Task>)>
private
Propagation exception phase, checking if tasks and/or the main plan are handling the exceptions.
-
#propagation_context(sources) ⇒ Object
private
Sets the source_event and source_generator variables according to
source. -
#propagation_source_events ⇒ Object
private
The set of events extracted from #sources.
-
#propagation_source_generators ⇒ Object
private
The set of generators extracted from #sources.
-
#queue_forward(sources, target, context, timespec) ⇒ Object
private
Queue a forwarding to be propagated.
-
#queue_signal(sources, target, context, timespec) ⇒ Object
private
Queue a signal to be propagated.
-
#quit ⇒ Object
private
Make control quit properly.
-
#quitting? ⇒ Boolean
private
True if the control thread is currently quitting.
-
#refresh_relations ⇒ Object
private
Refresh the value of cached relations.
-
#remove_at_cycle_end(handler_id) ⇒ Object
private
Removes a handler added by #at_cycle_end.
-
#remove_exception_listener(handler) ⇒ void
private
Removes an exception listener registered with #on_exception.
-
#remove_inhibited_exceptions(exceptions) ⇒ Array<ExecutionException>
private
Process the given exceptions to remove the ones that are currently filtered by the plan repairs.
-
#remove_periodic_handler(id) ⇒ Object
private
Removes a periodic handler defined by #every.
- #remove_propagation_handler(id) ⇒ Object private
-
#reset ⇒ Object
private
Make a quit EE ready for reuse.
- #reset_thread_pool ⇒ Object private
-
#run(cycle: 0.1) ⇒ Object
private
Main event loop.
- #shutdown ⇒ Object private
-
#start_new_cycle(time = Time.now) ⇒ Object
private
Set the cycle_start attribute and increment cycle_index.
- #unmark_finished_missions_and_permanent_tasks ⇒ Object private
-
#unreachable_event(event) ⇒ Object
private
Called by EventGenerator when an event became unreachable.
-
#wait_one_cycle ⇒ Object
private
Blocks until at least once execution cycle has been done.
-
#wait_until(ev) ⇒ Object
private
Stops the current thread until the given even is emitted.
Methods included from PropagationHandlerMethods
add_propagation_handler, at_cycle_begin, create_propagation_handler, each_cycle
Methods included from DRoby::EventLogging
#log, #log_flush_cycle, #log_queue_size, #log_timepoint, #log_timepoint_group, #log_timepoint_group_end, #log_timepoint_group_start
Constructor Details
#initialize(plan, control: Roby::DecisionControl.new, event_logger: plan.event_logger) ⇒ ExecutionEngine
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create an execution engine acting on plan, using control as the decision control object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/roby/execution_engine.rb', line 80 def initialize(plan, control: Roby::DecisionControl.new, event_logger: plan.event_logger) @plan = plan @event_logger = event_logger @use_oob_gc = ExecutionEngine.use_oob_gc? @control = control @scheduler = Schedulers::Null.new(plan) reset_thread_pool @thread = Thread.current @propagation = nil @propagation_id = 0 @propagation_exceptions = nil @application_exceptions = nil @delayed_events = [] @event_ordering = Array.new @event_priorities = Hash.new @propagation_handlers = [] @external_events_handlers = [] @at_cycle_end_handlers = Array.new @process_every = Array.new @waiting_work = Concurrent::Array.new @emitted_events = Array.new @disabled_handlers = Set.new @exception_listeners = Array.new @worker_threads_mtx = Mutex.new @worker_threads = Array.new @once_blocks = Queue.new @pending_exceptions = Hash.new each_cycle(&ExecutionEngine.method(:call_every)) @quit = 0 @allow_propagation = true @cycle_index = 0 @cycle_start = Time.now @cycle_length = 0.1 @last_stop_count = 0 @finalizers = [] @gc_warning = true refresh_relations self.display_exceptions = true end |
Instance Attribute Details
#additional_errors ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Used during exception propagation to inject new errors in the process
It shall not be accessed directly. Instead, Plan#add_error should be called
1391 1392 1393 |
# File 'lib/roby/execution_engine.rb', line 1391 def additional_errors @additional_errors end |
#application_exceptions ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of errors which have been generated outside of the plan’s control. For now, those errors cause the whole controller to shut down.
1377 1378 1379 |
# File 'lib/roby/execution_engine.rb', line 1377 def application_exceptions @application_exceptions end |
#at_cycle_end_handlers ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A set of blocks that are called at each cycle end
2030 2031 2032 |
# File 'lib/roby/execution_engine.rb', line 2030 def at_cycle_end_handlers @at_cycle_end_handlers end |
#control ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The DecisionControl object associated with this engine
180 181 182 |
# File 'lib/roby/execution_engine.rb', line 180 def control @control end |
#cycle_index ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The number of this cycle since the beginning
2090 2091 2092 |
# File 'lib/roby/execution_engine.rb', line 2090 def cycle_index @cycle_index end |
#cycle_length ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The cycle length in seconds
2084 2085 2086 |
# File 'lib/roby/execution_engine.rb', line 2084 def cycle_length @cycle_length end |
#cycle_start ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The starting Time of this cycle
2087 2088 2089 |
# File 'lib/roby/execution_engine.rb', line 2087 def cycle_start @cycle_start end |
#delayed_events ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of pending delayed events. This is an array of the form
[[time, is_forward, source, target, context], ...]
See #add_event_delay for more information
485 486 487 |
# File 'lib/roby/execution_engine.rb', line 485 def delayed_events @delayed_events end |
#dependency_graph ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cached graph object for TaskStructure::Dependency
This is here for performance reasons, to avoid resolving the same graph over and over
169 170 171 |
# File 'lib/roby/execution_engine.rb', line 169 def dependency_graph @dependency_graph end |
#disabled_handlers ⇒ Array<PollBlockDefinition> (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Poll blocks that have been disabled because they raised an exception
360 361 362 |
# File 'lib/roby/execution_engine.rb', line 360 def disabled_handlers @disabled_handlers end |
#emitted_events ⇒ Array<Event> (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of events that have been emitted within the last call to #process_events (i.e. the last execution of the event loop)
187 188 189 |
# File 'lib/roby/execution_engine.rb', line 187 def emitted_events @emitted_events end |
#event_logger ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The underlying DRoby::EventLogger
It is usually the same than the #plan‘s. Pass a DRoby::NullEventLogger at construction time to disable logging of execution events.
178 179 180 |
# File 'lib/roby/execution_engine.rb', line 178 def event_logger @event_logger end |
#event_ordering ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The topological ordering of events w.r.t. the Precedence relation. This gets updated on-demand when the event relations change.
941 942 943 |
# File 'lib/roby/execution_engine.rb', line 941 def event_ordering @event_ordering end |
#event_priorities ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The event => index hash which give the propagation priority for each event
944 945 946 |
# File 'lib/roby/execution_engine.rb', line 944 def event_priorities @event_priorities end |
#exception_listeners ⇒ Array<#call> (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The blocks that are currently listening to exceptions
190 191 192 |
# File 'lib/roby/execution_engine.rb', line 190 def exception_listeners @exception_listeners end |
#finalizers ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A set of proc objects which are to be called when the execution engine quits.
2379 2380 2381 |
# File 'lib/roby/execution_engine.rb', line 2379 def finalizers @finalizers end |
#forward_graph ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cached graph object for EventStructure::Forward
This is here for performance reasons, to avoid resolving the same graph over and over
163 164 165 |
# File 'lib/roby/execution_engine.rb', line 163 def forward_graph @forward_graph end |
#last_stop_count ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
:nodoc:
2172 2173 2174 |
# File 'lib/roby/execution_engine.rb', line 2172 def last_stop_count @last_stop_count end |
#once_blocks ⇒ Queue (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Thread-safe queue to push work to the execution engine
Do not access directly, use #once instead
198 199 200 |
# File 'lib/roby/execution_engine.rb', line 198 def once_blocks @once_blocks end |
#plan ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The Plan this engine is acting on
172 173 174 |
# File 'lib/roby/execution_engine.rb', line 172 def plan @plan end |
#precedence_graph ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cached graph object for EventStructure::Precedence
This is here for performance reasons, to avoid resolving the same graph over and over
151 152 153 |
# File 'lib/roby/execution_engine.rb', line 151 def precedence_graph @precedence_graph end |
#process_every ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A set of blocks which are called every cycle
2052 2053 2054 |
# File 'lib/roby/execution_engine.rb', line 2052 def process_every @process_every end |
#propagation_id ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A numeric ID giving the count of the current propagation cycle
182 183 184 |
# File 'lib/roby/execution_engine.rb', line 182 def propagation_id @propagation_id end |
#propagation_sources ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of source events for the current propagation action. This is a mix of EventGenerator and Event objects.
455 456 457 |
# File 'lib/roby/execution_engine.rb', line 455 def propagation_sources @propagation_sources end |
#scheduler ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The scheduler is the object which handles non-generic parts of the propagation cycle. For now, its #initial_events method is called at the beginning of each propagation cycle and can call or emit a set of events.
See Schedulers::Basic
431 432 433 |
# File 'lib/roby/execution_engine.rb', line 431 def scheduler @scheduler end |
#signal_graph ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cached graph object for EventStructure::Signal
This is here for performance reasons, to avoid resolving the same graph over and over
157 158 159 |
# File 'lib/roby/execution_engine.rb', line 157 def signal_graph @signal_graph end |
#thread ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The execution thread if there is one running
2079 2080 2081 |
# File 'lib/roby/execution_engine.rb', line 2079 def thread @thread end |
#thread_pool ⇒ Concurrent::CachedThreadPool (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A thread pool on which async work should be executed
145 146 147 |
# File 'lib/roby/execution_engine.rb', line 145 def thread_pool @thread_pool end |
#waiting_work ⇒ Array<#fail,#complete?> (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A list of threaded objects waiting for the control thread
Objects registered here will be notified them by calling #fail when it quits. In addition, #join_all_waiting_work will wait for all pending jobs to finish.
Note that all Concurrent::Obligation subclasses fit the bill
2027 2028 2029 |
# File 'lib/roby/execution_engine.rb', line 2027 def waiting_work @waiting_work end |
Class Method Details
.call_every(plan) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Calls the periodic blocks which should be called
2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 |
# File 'lib/roby/execution_engine.rb', line 2000 def self.call_every(plan) # :nodoc: engine = plan.execution_engine now = engine.cycle_start length = engine.cycle_length engine.process_every.map! do |block, last_call, duration| # Check if the nearest timepoint is the beginning of # this cycle or of the next cycle if !last_call || (duration - (now - last_call)) < length / 2 if !block.call(engine, engine.plan) next end last_call = now end [block, last_call, duration] end.compact! end |
.make_delay(timeref, source, target, timespec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a Time object which represents the absolute point in time referenced by timespec in the context of delaying a propagation between source and target.
See validate_timespec for more information
931 932 933 934 935 936 937 |
# File 'lib/roby/execution_engine.rb', line 931 def self.make_delay(timeref, source, target, timespec) if delay = timespec[:delay] then timeref + delay elsif at = timespec[:at] then at else raise ArgumentError, "invalid timespec #{timespec}" end end |
.validate_timespec(timespec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Validates timespec as a delay specification. A valid delay specification is either nil or a hash, in which case two forms are possible:
at: absolute_time
delay: number
920 921 922 923 924 |
# File 'lib/roby/execution_engine.rb', line 920 def self.validate_timespec(timespec) if timespec timespec = timespec, [:delay, :at] end end |
Instance Method Details
#add_error(e, propagate_through: nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Register a LocalizedError for future propagation
This method must be called in a error-gathering context (i.e. #gather_error.
582 583 584 585 586 587 588 589 590 |
# File 'lib/roby/execution_engine.rb', line 582 def add_error(e, propagate_through: nil) plan_exception = e.to_execution_exception if @propagation_exceptions @propagation_exceptions << [plan_exception, propagate_through] else Roby.log_exception_with_backtrace(e, self, :fatal) raise NotPropagationContext, "#add_error called outside an error-gathering context (#add_error)" end end |
#add_event_delay(time, is_forward, source, target, context) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Adds a propagation step to be performed when the current time is greater than time. The propagation step is a signal if is_forward is false and a forward otherwise.
This method should not be called directly. Use #add_event_propagation with the appropriate timespec argument.
See also #delayed_events and #execute_delayed_events
495 496 497 |
# File 'lib/roby/execution_engine.rb', line 495 def add_event_delay(time, is_forward, source, target, context) delayed_events << [time, is_forward, source, target, context] end |
#add_event_propagation(is_forward, sources, target, context, timespec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Adds a propagation to the next propagation step: it registers a propagation step to be performed between source and target with the given context. If is_forward is true, the propagation will be a forwarding, otherwise it is a signal.
If timespec is not nil, it defines a delay to be applied before calling the target event.
See #gather_propagation
700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 |
# File 'lib/roby/execution_engine.rb', line 700 def add_event_propagation(is_forward, sources, target, context, timespec) if target.plan != plan raise Roby::EventNotExecutable.new(target), "#{target} not in executed plan" end target.pending(sources.find_all { |ev| ev.kind_of?(Event) }) @propagation_step_id += 1 target_info = (@propagation[target] ||= [@propagation_step_id, [], []]) step = target_info[is_forward ? PENDING_PROPAGATION_FORWARD : PENDING_PROPAGATION_SIGNAL] if sources.empty? step << nil << context << timespec else sources.each do |ev| step << ev << context << timespec end end end |
#add_exceptions_for_inhibition(fatal_errors) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Register a set of fatal exceptions to ensure that they will be inhibited in the next exception propagation cycles
1827 1828 1829 1830 1831 1832 1833 1834 |
# File 'lib/roby/execution_engine.rb', line 1827 def add_exceptions_for_inhibition(fatal_errors) fatal_errors.each do |exception, involved_tasks| involved_tasks.each do |t| (@pending_exceptions[t] ||= Set.new) << [exception.exception.class, exception.origin] end end end |
#add_framework_error(error, source) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registers the given error and a description of its source in the list of application/framework errors
It must be called within an exception-gathering context, that is either within #process_events, or within #gather_framework_errors
These errors will terminate the event loop
653 654 655 656 657 658 659 660 |
# File 'lib/roby/execution_engine.rb', line 653 def add_framework_error(error, source) if @application_exceptions @application_exceptions << [error, source] else Roby.log_exception_with_backtrace(error, self, :fatal) raise NotPropagationContext, "#add_framework_error called outside an exception-gathering context" end end |
#at_cycle_end(description: 'at_cycle_end', **options) {|plan| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Adds a block to be called at the end of each execution cycle
2038 2039 2040 2041 2042 |
# File 'lib/roby/execution_engine.rb', line 2038 def at_cycle_end(description: 'at_cycle_end', **, &block) handler = PollBlockDefinition.new(description, block, **) at_cycle_end_handlers << handler handler.object_id end |
#call_poll_blocks(blocks, late = false) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Helper that calls the propagation handlers in propagation_handlers (which are expected to be instances of PollBlockDefinition) and handles the errors according of each handler’s policy
740 741 742 743 744 745 746 747 748 749 750 751 752 753 |
# File 'lib/roby/execution_engine.rb', line 740 def call_poll_blocks(blocks, late = false) blocks.delete_if do |handler| if handler.disabled? || (handler.late? ^ late) next end log_timepoint_group handler.description do if !handler.call(self, plan) handler.disabled = true end end handler.once? end end |
#call_propagation_handlers ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 |
# File 'lib/roby/execution_engine.rb', line 776 def call_propagation_handlers process_once_blocks if scheduler.enabled? gather_framework_errors('scheduler') do scheduler.initial_events log_timepoint 'scheduler' end end call_poll_blocks(self.class.propagation_handlers, false) call_poll_blocks(self.propagation_handlers, false) if !has_queued_events? call_poll_blocks(self.class.propagation_handlers, true) call_poll_blocks(self.propagation_handlers, true) end end |
#clear ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Sets up the plan for clearing: it discards all missions and undefines all permanent tasks and events.
Returns nil if the plan is cleared, and the set of remaining tasks otherwise. Note that quaranteened tasks are not counted as remaining, as it is not possible for the execution engine to stop them.
2180 2181 2182 2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 |
# File 'lib/roby/execution_engine.rb', line 2180 def clear plan.mission_tasks.dup.each { |t| plan.unmark_mission_task(t) } plan.permanent_tasks.dup.each { |t| plan.unmark_permanent_task(t) } plan.permanent_events.dup.each { |t| plan.unmark_permanent_event(t) } plan.force_gc.merge( plan.tasks ) quaranteened_subplan = plan.compute_useful_tasks(plan.quarantined_tasks) remaining = plan.tasks - quaranteened_subplan @pending_exceptions.clear if remaining.empty? # Have to call #garbage_collect one more to make # sure that unneeded events are removed as well garbage_collect # Done cleaning the tasks, clear the remains plan.transactions.each do |trsc| trsc.discard_transaction if trsc.self_owned? end plan.clear emitted_events.clear return end remaining end |
#clear_application_exceptions ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
1378 1379 1380 1381 1382 1383 1384 1385 |
# File 'lib/roby/execution_engine.rb', line 1378 def clear_application_exceptions if !@application_exceptions raise RecursivePropagationContext, "unbalanced call to #clear_application_exceptions" end result, @application_exceptions = @application_exceptions, nil result end |
#compute_errors(events_errors) ⇒ PropagationInfo
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Compute the set of fatal errors in the current execution state
1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 |
# File 'lib/roby/execution_engine.rb', line 1398 def compute_errors(events_errors) # Generate exceptions from task structure structure_errors = plan.check_structure log_timepoint 'structure_check' # Propagate the errors. Note that the plan repairs are taken into # account in ExecutionEngine.propagate_exceptions directly. We keep # event and structure errors separate since in the first case there # is not two-stage handling (all errors that have not been handled # are fatal), and in the second case we call #check_structure # again to errors that are remaining after the call to the exception # handlers events_errors, free_events_errors, events_handled = propagate_exceptions(events_errors) _, structure_handled = propagate_exceptions(structure_errors) log_timepoint 'exception_propagation' # Get the remaining problems in the plan structure, and act on it structure_errors, structure_inhibited = remove_inhibited_exceptions(plan.check_structure) # Partition them by fatal/nonfatal fatal_errors, nonfatal_errors = Array.new, Array.new (structure_errors + events_errors).each do |e, involved_tasks| if e.fatal? fatal_errors << [e, involved_tasks] else nonfatal_errors << [e, involved_tasks] end end kill_tasks = compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors).to_set handled_errors = structure_handled + events_handled debug "#{fatal_errors.size} fatal errors found and #{free_events_errors.size} errors involving free events" debug "the fatal errors involve #{kill_tasks.size} non-finalized tasks" return PropagationInfo.new(Set.new, Set.new, kill_tasks, fatal_errors, nonfatal_errors, free_events_errors, handled_errors, structure_inhibited) end |
#compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Compute the set of unhandled fatal exceptions
856 857 858 859 860 861 862 863 |
# File 'lib/roby/execution_engine.rb', line 856 def compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors) kill_tasks = fatal_errors.inject(Set.new) do |tasks, (exception, affected_tasks)| tasks.merge(affected_tasks) end # Tasks might have been finalized during exception handling, filter # those out kill_tasks.find_all(&:plan) end |
#cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called at each cycle end
2396 2397 2398 2399 2400 |
# File 'lib/roby/execution_engine.rb', line 2396 def cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?) gather_framework_errors("#cycle_end", raise_caught_exceptions: raise_framework_errors) do call_poll_blocks(at_cycle_end_handlers) end end |
#delayed(delay, description: 'delayed block', **options, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Schedules block to be called once after delay seconds passed, in the propagation context
1366 1367 1368 1369 1370 1371 1372 |
# File 'lib/roby/execution_engine.rb', line 1366 def delayed(delay, description: 'delayed block', **, &block) handler = PollBlockDefinition.new(description, block, once: true, **) once do process_every << [handler, cycle_start, delay] end handler.id end |
#display_exceptions=(flag) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Controls whether this engine should indiscriminately display all fatal exceptions
This is on by default
2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 |
# File 'lib/roby/execution_engine.rb', line 2549 def display_exceptions=(flag) if flag @exception_display_handler ||= on_exception do |kind, error, tasks| level = if kind == EXCEPTION_HANDLED then :debug else :warn end send(level) do send(level, "encountered a #{kind} exception") Roby.log_exception_with_backtrace(error.exception, self, level) if kind == EXCEPTION_HANDLED send(level, "the exception was handled by") else send(level, "the exception involved") end tasks.each do |t| send(level, " #{t}") end break end end else remove_exception_listener(@exception_display_handler) @exception_display_handler = nil end end |
#display_exceptions? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
whether this engine should indiscriminately display all fatal exceptions
2578 2579 2580 |
# File 'lib/roby/execution_engine.rb', line 2578 def display_exceptions? !!@exception_display_handler end |
#error_handling_phase(events_errors) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Compute errors in plan and handle the results
841 842 843 844 845 846 847 848 849 850 851 852 853 |
# File 'lib/roby/execution_engine.rb', line 841 def error_handling_phase(events_errors) # Do the exception handling phase errors = compute_errors(events_errors) notify_about_error_handling_results(errors) # nonfatal errors are only notified. Fatal errors (kill_tasks) are # handled in the propagation loop during garbage collection. Only # the free events errors have to be handled here. errors.free_events_errors.each do |exception, generators| generators.each { |g| g.unreachable!(exception.exception) } end return errors end |
#event_loop ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The main event loop. It returns when the execution engine is asked to quit. In general, this does not need to be called direclty: use #run to start the event loop in a separate thread.
2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 |
# File 'lib/roby/execution_engine.rb', line 2227 def event_loop last_stop_count = 0 last_quit_warning = Time.now @cycle_start = Time.now @cycle_index = 0 force_exit_deadline = nil last_process_times = Process.times last_dump_time = plan.event_logger.dump_time loop do begin if profile_gc? GC::Profiler.enable end if quitting? if forced_exit? return end begin remaining = clear return if !remaining if (last_stop_count != remaining.size) || (Time.now - last_quit_warning) > 10 if last_stop_count == 0 info "Roby quitting ..." end issue_quit_progression_warning(remaining) last_quit_warning = Time.now last_stop_count = remaining.size end rescue Exception => e warn "Execution thread failed to clean up" Roby.log_exception_with_backtrace(e, self, :warn, filter: false) return end end log_timepoint_group_start "cycle" while Time.now > cycle_start + cycle_length @cycle_start += cycle_length @cycle_index += 1 end stats = Hash.new stats[:start] = [cycle_start.tv_sec, cycle_start.tv_usec] stats[:actual_start] = Time.now - cycle_start stats[:cycle_index] = cycle_index log_timepoint_group 'process_events' do process_events end remaining_cycle_time = cycle_length - (Time.now - cycle_start) if use_oob_gc? stats[:pre_oob_gc] = GC.stat GC::OOB.run end # Sleep if there is enough time for it if remaining_cycle_time > SLEEP_MIN_TIME sleep(remaining_cycle_time) end log_timepoint 'sleep' cycle_end(stats) # Log cycle statistics process_times = Process.times dump_time = plan.event_logger.dump_time stats[:log_queue_size] = plan.log_queue_size stats[:plan_task_count] = plan.num_tasks stats[:plan_event_count] = plan.num_free_events stats[:gc] = GC.stat stats[:utime] = process_times.utime - last_process_times.utime stats[:stime] = process_times.stime - last_process_times.stime stats[:dump_time] = dump_time - last_dump_time stats[:state] = Roby::State stats[:end] = Time.now - cycle_start if profile_gc? stats[:gc_profile_data] = GC::Profiler.raw_data stats[:gc_total_time] = GC::Profiler.total_time else stats[:gc_profile_data] = nil stats[:gc_total_time] = 0 end log_flush_cycle :cycle_end, stats last_dump_time = dump_time last_process_times = process_times stats = Hash.new @cycle_start += cycle_length @cycle_index += 1 if profile_gc? GC::Profiler.disable end rescue Exception => e if e.kind_of?(Interrupt) if quitting? if force_exit_deadline && (force_exit_deadline - Time.now) < 0 fatal "Quitting without cleaning up" force_quit else fatal "Still #{Integer(force_exit_deadline - Time.now)}s before interruption will quit without cleaning up" end else fatal "Received interruption request" fatal "Interrupt again in #{INTERRUPT_FORCE_EXIT_DEAD_ZONE}s to quit without cleaning up" quit force_exit_deadline = Time.now + INTERRUPT_FORCE_EXIT_DEAD_ZONE end elsif !quitting? quit fatal "Execution thread quitting because of unhandled exception" Roby.log_exception_with_backtrace(e, self, :fatal) else fatal "Execution thread FORCEFULLY quitting because of unhandled exception" Roby.log_exception_with_backtrace(e, self, :fatal) raise end ensure log_timepoint_group_end "cycle" end end ensure if !plan.tasks.empty? warn "the following tasks are still present in the plan:" plan.tasks.each do |t| warn " #{t}" end end end |
#event_propagation_phase(initial_events, propagation_info) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Calls its block in a #gather_propagation context and propagate events that have been called and/or emitted by the block
If a block is given, it is called with the initial set of events: the events we should consider as already emitted in the following propagation. seeds si a list of procs which should be called to initiate the propagation (i.e. build an initial set of events)
826 827 828 829 830 831 832 833 834 835 836 837 838 |
# File 'lib/roby/execution_engine.rb', line 826 def event_propagation_phase(initial_events, propagation_info) @propagation_id += 1 gather_errors do next_steps = initial_events while !next_steps.empty? while !next_steps.empty? next_steps = event_propagation_step(next_steps, propagation_info) end next_steps = gather_propagation { call_propagation_handlers } end end end |
#event_propagation_step(current_step, propagation_info) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Propagate one step
current_step describes all pending emissions and calls.
This method calls ExecutionEngine.next_event to get the description of the next event to call. If there are signals going to this event, they are processed and the forwardings will be treated in the next step.
The method returns the next set of pending emissions and calls, adding the forwardings and signals that the propagation of the considered event have added.
1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 |
# File 'lib/roby/execution_engine.rb', line 1053 def event_propagation_step(current_step, propagation_info) signalled, step_id, forward_info, call_info = next_event(current_step) next_step = nil if !call_info.empty? source_events, source_generators, context = prepare_propagation(signalled, false, call_info) if source_events log(:generator_propagate_events, false, source_events, signalled) if signalled.self_owned? next_step = gather_propagation(current_step) do propagation_context(source_events | source_generators) do begin propagation_info.add_generator_call(signalled) signalled.call_without_propagation(context) rescue Roby::LocalizedError => e if signalled.command_emitted? add_error(e) else signalled.emit_failed(e) end rescue Exception => e if signalled.command_emitted? add_error(Roby::CommandFailed.new(e, signalled)) else signalled.emit_failed(Roby::CommandFailed.new(e, signalled)) end end end end end end if forward_info next_step ||= Hash.new target_info = (next_step[signalled] ||= [@propagation_step_id += 1, [], []]) target_info[PENDING_PROPAGATION_FORWARD].concat(forward_info) end elsif !forward_info.empty? source_events, source_generators, context = prepare_propagation(signalled, true, forward_info) if source_events log(:generator_propagate_events, true, source_events, signalled) # If the destination event is not owned, but if the peer is not # connected, the event is our responsibility now. if signalled.self_owned? || !signalled.owners.any? { |peer| peer != plan.local_owner && peer.connected? } next_step = gather_propagation(current_step) do propagation_context(source_events | source_generators) do begin if event = signalled.emit_without_propagation(context) propagation_info.add_event_emission(event) emitted_events << event end rescue Roby::LocalizedError => e Roby.warn "Internal Error: #emit_without_propagation emitted a LocalizedError exception. This is unsupported and will become a fatal error in the future. You should usually replace raise with engine.add_error" Roby.display_exception(Roby.logger.io(:warn), e, false) add_error(e) rescue Exception => e Roby.warn "Internal Error: #emit_without_propagation emitted an exception. This is unsupported and will become a fatal error in the future. You should create a proper localized error and replace raise with engine.add_error" Roby.display_exception(Roby.logger.io(:warn), e, false) add_error(Roby::EmissionFailed.new(e, signalled)) end end end end end end current_step.merge!(next_step) if next_step current_step end |
#every(duration, description: 'periodic handler', **options, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Call block every duration seconds. Note that duration is round up to the cycle size (time between calls is *at least* duration)
The returned value is the periodic handler ID. It can be passed to #remove_periodic_handler to undefine it.
2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 |
# File 'lib/roby/execution_engine.rb', line 2059 def every(duration, description: 'periodic handler', **, &block) handler = PollBlockDefinition.new(description, block, **) once do if handler.call(self, plan) process_every << [handler, cycle_start, duration] end end handler.id end |
#execute(catch: [], type: :external_events) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Block until the given block is executed by the execution thread, at the beginning of the event loop, in propagation context. If the block raises, the exception is raised back in the calling thread.
2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 |
# File 'lib/roby/execution_engine.rb', line 2405 def execute(catch: [], type: :external_events) if inside_control? return yield end capture_catch = lambda do |symbol, *other| caught = catch(symbol) do if other.empty? return [:ret, yield] else return capture_catch(block, *other) end end [:throw, [symbol, caught]] end ivar = Concurrent::IVar.new once(sync: ivar, type: type) do begin if !catch.empty? result = capture_catch.call(*catch) { yield } ivar.set(result) else ivar.set([:ret, yield]) end rescue ::Exception => e # rubocop:disable Lint/RescueException ivar.set([:raise, e]) end end mode, value = ivar.value! case mode when :ret return value when :throw throw *value else raise value end end |
#execute_delayed_events ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Adds the events in delayed_events whose time has passed into the propagation. This must be called in propagation context.
See #add_event_delay and #delayed_events
503 504 505 506 507 508 509 510 511 |
# File 'lib/roby/execution_engine.rb', line 503 def execute_delayed_events reftime = Time.now delayed_events.delete_if do |time, forward, source, signalled, context| if time <= reftime add_event_propagation(forward, [source], signalled, context, nil) true end end end |
#finalized_event(event) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called by #plan when an event has been finalized
524 525 526 527 528 529 530 |
# File 'lib/roby/execution_engine.rb', line 524 def finalized_event(event) if @propagation @propagation.delete(event) end event.unreachable!("finalized", plan) # since the event is already finalized, end |
#finalized_task(task) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called by #plan when a task has been finalized
519 520 521 |
# File 'lib/roby/execution_engine.rb', line 519 def finalized_task(task) @pending_exceptions.delete(task) end |
#force_quit ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Force quitting, without cleaning up
2388 |
# File 'lib/roby/execution_engine.rb', line 2388 def force_quit; @quit = 2 end |
#forced_exit? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
True if the control thread is currently quitting
2384 |
# File 'lib/roby/execution_engine.rb', line 2384 def forced_exit?; @quit > 1 end |
#garbage_collect(force_on = nil) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Kills and removes all unneeded tasks. force_on is a set of task whose garbage-collection must be performed, even though those tasks are actually useful for the system. This is used to properly kill tasks for which errors have been detected.
1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 |
# File 'lib/roby/execution_engine.rb', line 1861 def garbage_collect(force_on = nil) if force_on && !force_on.empty? info "GC: adding #{force_on.size} tasks in the force_gc set" mismatching_plan = force_on.find_all do |t| if t.plan == self.plan plan.force_gc << t false else true end end if !mismatching_plan.empty? raise ArgumentError, "#{mismatching_plan.map { |t| "#{t}(plan=#{t.plan})" }.join(", ")} have been given to #{self}.garbage_collect, but they are not tasks in #{plan}" end end unmark_finished_missions_and_permanent_tasks # The set of tasks for which we queued stop! at this cycle # #finishing? is false until the next event propagation cycle finishing = Set.new did_something = true while did_something did_something = false tasks = plan.unneeded_tasks | plan.force_gc local_tasks = plan.local_tasks & tasks remote_tasks = tasks - local_tasks # Remote tasks are simply removed, regardless of other concerns for t in remote_tasks debug { "GC: removing the remote task #{t}" } plan.garbage_task(t) end break if local_tasks.empty? debug do debug "#{local_tasks.size} tasks are unneeded in this plan" local_tasks.each do |t| debug " #{t} mission=#{plan.mission_task?(t)} permanent=#{plan.permanent_task?(t)}" end break end if local_tasks.all? { |t| t.pending? || t.finished? } local_tasks.each do |t| debug { "GC: #{t} is not running, removed" } if plan.garbage_task(t) did_something = true end end break end # Mark all root local_tasks as garbage. roots = local_tasks.dup plan.each_task_relation_graph do |g| next if !g.root_relation? || g.weak? roots.delete_if do |t| g.each_in_neighbour(t).any? { |p| !p.finished? } end break if roots.empty? end (roots.to_set - finishing).each do |local_task| if local_task.pending? info "GC: removing pending task #{local_task}" if plan.garbage_task(local_task) did_something = true end elsif local_task.failed_to_start? info "GC: removing task that failed to start #{local_task}" if plan.garbage_task(local_task) did_something = true end elsif local_task.starting? # wait for task to be started before killing it debug { "GC: #{local_task} is starting" } elsif local_task.finished? debug { "GC: #{local_task} is not running, removed" } if plan.garbage_task(local_task) did_something = true end elsif !local_task.finishing? if local_task.quarantined? warn "GC: #{local_task} is running but in quarantine" elsif local_task.event(:stop).controlable? debug { "GC: attempting to stop #{local_task}" } if !local_task.respond_to?(:stop!) warn "something fishy: #{local_task}/stop is controlable but there is no #stop! method, putting in quarantine" plan.quarantine_task(local_task) else finishing << local_task end else warn "GC: #{local_task} cannot be stopped, putting in quarantine" plan.quarantine_task(local_task) end elsif local_task.finishing? debug do debug "GC: waiting for #{local_task} to finish" local_task.history.each do |ev| debug "GC: #{ev}" end break end else warn "GC: ignored #{local_task}" end end end finishing.each do |task| task.stop! end plan.unneeded_events.each do |event| plan.garbage_event(event) end !finishing.empty? end |
#gather_errors ⇒ Array<ExecutionException>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Executes the given block while gathering errors, and returns the errors that have been declared with #add_error
801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 |
# File 'lib/roby/execution_engine.rb', line 801 def gather_errors if @propagation_exceptions raise InternalError, "recursive call to #gather_errors" end # The ensure clause must NOT apply to the recursive check above. # Otherwise, we end up resetting @propagation_exceptions to nil, # which wreaks havoc begin @propagation_exceptions = [] yield @propagation_exceptions ensure @propagation_exceptions = nil end end |
#gather_external_events ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Gather the events that come out of this plan manager
769 770 771 772 773 774 |
# File 'lib/roby/execution_engine.rb', line 769 def gather_external_events process_once_blocks gather_framework_errors('delayed events') { execute_delayed_events } call_poll_blocks(self.class.external_events_handlers) call_poll_blocks(self.external_events_handlers) end |
#gather_framework_errors(source, raise_caught_exceptions: true) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Yields to the block and registers any raised exception using #add_framework_error
If the method is called within an exception-gathering context (either #process_events or #gather_framework_errors itself), nothing else is done. Otherwise, #process_pending_application_exceptions is called to re-raise any caught exception
599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 |
# File 'lib/roby/execution_engine.rb', line 599 def gather_framework_errors(source, raise_caught_exceptions: true) if @application_exceptions recursive_error_gathering_context = true else @application_exceptions = [] end yield if !recursive_error_gathering_context && !raise_caught_exceptions clear_application_exceptions end rescue Exception => e add_framework_error(e, source) if !recursive_error_gathering_context && !raise_caught_exceptions clear_application_exceptions end ensure if !recursive_error_gathering_context && raise_caught_exceptions process_pending_application_exceptions end end |
#gather_propagation(initial_set = Hash.new) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Sets up a propagation context, yielding the block in it. During this propagation stage, all calls to #emit and #call are stored in an internal hash of the form:
target => [forward_sources, signal_sources]
where the two _sources are arrays of the form
[[source, context], ...]
The method returns the resulting hash. Use #in_propagation_context? to know if the current engine is in a propagation context, and #add_event_propagation to add a new entry to this set.
548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 |
# File 'lib/roby/execution_engine.rb', line 548 def gather_propagation(initial_set = Hash.new) raise InternalError, "nested call to #gather_propagation" if in_propagation_context? old_allow_propagation, @allow_propagation = @allow_propagation, true # The ensure clause must NOT apply to the recursive check above. # Otherwise, we end up resetting @propagation_exceptions to nil, # which wreaks havoc begin @propagation = initial_set @propagation_sources = nil @propagation_step_id = 0 before = @propagation propagation_context([]) do yield end result, @propagation = @propagation, nil return result ensure @propagation = nil @allow_propagation = old_allow_propagation end end |
#gathering? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
440 441 442 443 |
# File 'lib/roby/execution_engine.rb', line 440 def gathering? Roby.warn_deprecated "#gathering? is deprecated, use #in_propagation_context? instead" in_propagation_context? end |
#gathering_errors? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
793 794 795 |
# File 'lib/roby/execution_engine.rb', line 793 def gathering_errors? !!@propagation_exceptions end |
#has_pending_exception_matching?(e, object) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Tests whether there is an exception registered by #add_fatal_exceptions_for_inhibition for a given error and object
1821 1822 1823 |
# File 'lib/roby/execution_engine.rb', line 1821 def has_pending_exception_matching?(e, object) @pending_exceptions[object] && @pending_exceptions[object].include?([e.exception.class, e.origin]) end |
#has_pending_forward?(from, to, expected_context) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Whether a forward matching this signature is currently pending
720 721 722 723 724 725 726 |
# File 'lib/roby/execution_engine.rb', line 720 def has_pending_forward?(from, to, expected_context) if pending = @propagation[to] pending[PENDING_PROPAGATION_FORWARD].each_slice(3).any? do |event, context, timespec| (from === event.generator) && (expected_context === context) end end end |
#has_pending_signal?(from, to, expected_context) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Whether a signal matching this signature is currently pending
729 730 731 732 733 734 735 |
# File 'lib/roby/execution_engine.rb', line 729 def has_pending_signal?(from, to, expected_context) if pending = @propagation[to] pending[PENDING_PROPAGATION_SIGNAL].each_slice(3).any? do |event, context, timespec| (from === event.generator) && (expected_context === context) end end end |
#has_propagation_for?(target) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
674 675 676 |
# File 'lib/roby/execution_engine.rb', line 674 def has_propagation_for?(target) @propagation && @propagation.has_key?(target) end |
#has_queued_events? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns true if some events are queued
533 534 535 |
# File 'lib/roby/execution_engine.rb', line 533 def has_queued_events? !@propagation.empty? end |
#has_waiting_work? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Whether this EE has asynchronous waiting work waiting to be processed
1435 1436 1437 1438 1439 |
# File 'lib/roby/execution_engine.rb', line 1435 def has_waiting_work? # Filter out unscheduled promises (promises on which #execute was # not called). If they are unscheduled, we're not waiting on them waiting_work.any? { |w| !w.unscheduled? } end |
#in_propagation_context? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
True if we are within a propagation context (i.e. within event processing)
447 448 449 |
# File 'lib/roby/execution_engine.rb', line 447 def in_propagation_context? !!@propagation end |
#inhibited_exception?(exception) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Query whether the given exception is inhibited in this plan
1346 1347 1348 1349 |
# File 'lib/roby/execution_engine.rb', line 1346 def inhibited_exception?(exception) unhandled, _ = remove_inhibited_exceptions([exception.to_execution_exception]) unhandled.empty? end |
#inside_control? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
True if the current thread is the execution thread of this engine
See #outside_control? for a discussion of the use of #inside_control? and #outside_control? when testing the threading context
2096 2097 2098 2099 |
# File 'lib/roby/execution_engine.rb', line 2096 def inside_control? t = thread !t || t == Thread.current end |
#issue_quit_progression_warning(remaining) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 |
# File 'lib/roby/execution_engine.rb', line 2209 def issue_quit_progression_warning(remaining) info "Waiting for #{remaining.size} tasks to finish (#{plan.num_tasks} tasks still in plan) and #{waiting_work.size} async work jobs" remaining.each do |task| info " #{task}" end quarantined = remaining.find_all { |t| t.quarantined? } if quarantined.size != 0 info "#{quarantined.size} tasks in quarantine" end end |
#join_all_waiting_work(timeout: nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Waits for all obligations in #waiting_work to finish
389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 |
# File 'lib/roby/execution_engine.rb', line 389 def join_all_waiting_work(timeout: nil) return [], PropagationInfo.new if waiting_work.empty? deadline = if timeout Time.now + timeout end finished = Array.new propagation_info = PropagationInfo.new begin framework_errors = gather_framework_errors("#join_all_waiting_work", raise_caught_exceptions: false) do next_steps = nil event_errors = gather_errors do next_steps = gather_propagation do finished.concat(process_waiting_work) blocks = Array.new while !once_blocks.empty? blocks << once_blocks.pop.last end call_poll_blocks(blocks) end end this_propagation = propagate_events_and_errors(next_steps, event_errors, garbage_collect_pass: false) propagation_info.merge(this_propagation) end propagation_info.add_framework_errors(framework_errors) Thread.pass has_scheduled_promises = has_waiting_work? if deadline && (Time.now > deadline) && has_scheduled_promises raise JoinAllWaitingWorkTimeout.new(waiting_work) end end while has_waiting_work? return finished, propagation_info end |
#killall ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Kill all tasks that are currently running in the plan
2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 |
# File 'lib/roby/execution_engine.rb', line 2488 def killall scheduler_enabled = scheduler.enabled? plan.permanent_tasks.clear plan.permanent_events.clear plan.mission_tasks.clear scheduler.enabled = false quit start_new_cycle process_events cycle_end(Hash.new) plan.transactions.each do |trsc| trsc.discard_transaction! end start_new_cycle Thread.pass process_events cycle_end(Hash.new) ensure scheduler.enabled = scheduler_enabled end |
#next_event(pending) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
call-seq:
next_event(pending) => event, propagation_info
Determines the event in current_step which should be signalled now. Removes it from the set and returns the event and the associated propagation information.
See #gather_propagation for the format of the returned # propagation_info
954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 |
# File 'lib/roby/execution_engine.rb', line 954 def next_event(pending) # this variable is 2 if selected_event is being forwarded, 1 if it # is both forwarded and signalled and 0 if it is only signalled priority, step_id, selected_event = nil for propagation_step in pending target_event = propagation_step[0] target_step_id, forwards, signals = *propagation_step[1] target_priority = if forwards.empty? && signals.empty? then 2 elsif forwards.empty? then 0 else 1 end do_select = if selected_event if precedence_graph.reachable?(selected_event, target_event) false elsif precedence_graph.reachable?(target_event, selected_event) true elsif priority < target_priority true elsif priority == target_priority # If they are of the same priority, handle # earlier events first step_id > target_step_id else false end else true end if do_select selected_event = target_event priority = target_priority step_id = target_step_id end end [selected_event, *pending.delete(selected_event)] end |
#notify_about_error_handling_results(errors) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Issue the warning message and log notifications related to tasks being killed because of unhandled fatal exceptions
867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 |
# File 'lib/roby/execution_engine.rb', line 867 def notify_about_error_handling_results(errors) kill_tasks, fatal_errors, nonfatal_errors, free_events_errors, handled_errors = errors.kill_tasks, errors.fatal_errors, errors.nonfatal_errors, errors.free_events_errors, errors.handled_errors if !nonfatal_errors.empty? if display_exceptions? warn "#{nonfatal_errors.size} unhandled non-fatal exceptions" end nonfatal_errors.each do |exception, tasks| notify_exception(EXCEPTION_NONFATAL, exception, tasks) end end if !handled_errors.empty? if display_exceptions? warn "#{handled_errors.size} handled errors" end handled_errors.each do |exception, tasks| notify_exception(EXCEPTION_HANDLED, exception, tasks) end end if !free_events_errors.empty? if display_exceptions? warn "#{free_events_errors.size} free event exceptions" end free_events_errors.each do |exception, events| notify_exception(EXCEPTION_FREE_EVENT, exception, events) end end if !fatal_errors.empty? if display_exceptions? warn "#{fatal_errors.size} unhandled fatal exceptions, involving #{kill_tasks.size} tasks that will be forcefully killed" end fatal_errors.each do |exception, tasks| notify_exception(EXCEPTION_FATAL, exception, tasks) end if display_exceptions? kill_tasks.each do |task| log_pp :warn, task end end end end |
#notify_exception(kind, error, involved_objects) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Call to notify the listeners registered with #on_exception of the occurence of an exception
2592 2593 2594 2595 2596 2597 |
# File 'lib/roby/execution_engine.rb', line 2592 def notify_exception(kind, error, involved_objects) log(:exception_notification, plan.droby_id, kind, error, involved_objects) exception_listeners.each do |listener| listener.call(self, kind, error, involved_objects) end end |
#on_exception(description: 'exception listener', on_error: :disable) {|kind, error, tasks| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registers a callback that will be called when exceptions are propagated in the plan
2539 2540 2541 2542 2543 |
# File 'lib/roby/execution_engine.rb', line 2539 def on_exception(description: 'exception listener', on_error: :disable, &block) handler = PollBlockDefinition.new(description, block, on_error: on_error) exception_listeners << handler handler end |
#once(sync: nil, description: 'once block', type: :external_events, **options, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Schedules block to be called at the beginning of the next execution cycle, in propagation context.
1359 1360 1361 1362 |
# File 'lib/roby/execution_engine.rb', line 1359 def once(sync: nil, description: 'once block', type: :external_events, **, &block) waiting_work << sync if sync once_blocks << create_propagation_handler(description: description, type: type, once: true, **, &block) end |
#outside_control? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
True if the current thread is not the execution thread of this engine, or if there is not control thread. When you check the current thread context, always use a negated form. Do not do
if Roby.inside_control?
ERROR
end
Do instead
if !Roby.outside_control?
ERROR
end
Since the first form will fail if there is no control thread, while the second form will work. Use the first form only if you require that there actually IS a control thread.
2118 2119 2120 2121 |
# File 'lib/roby/execution_engine.rb', line 2118 def outside_control? t = thread !t || t != Thread.current end |
#prepare_propagation(target, is_forward, info) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
call-seq:
prepare_propagation(target, is_forward, info) => source_events, source_generators, context
prepare_propagation(target, is_forward, info) => nil
Parses the propagation information info in the context of a signalling if is_forward is true and a forwarding otherwise. target is the target event.
The method adds the appropriate delayed events using #add_event_delay, and returns either nil if no propagation is to be performed, or the propagation source events, generators and context.
The format of info is the same as the hash values described in #gather_propagation.
1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 |
# File 'lib/roby/execution_engine.rb', line 1007 def prepare_propagation(target, is_forward, info) timeref = Time.now source_events, source_generators, context = Set.new, Set.new, [] delayed = true info.each_slice(3) do |src, ctxt, time| if time && (delay = ExecutionEngine.make_delay(timeref, src, target, time)) add_event_delay(delay, is_forward, src, target, ctxt) next end delayed = false # Merge identical signals. Needed because two different event handlers # can both call #emit, and two signals are set up if src if src.respond_to?(:generator) source_events << src source_generators << src.generator else source_generators << src end end if ctxt context.concat ctxt end end unless delayed [source_events, source_generators, context] end end |
#process_events(raise_framework_errors: Roby.app.abort_on_application_exception?, garbage_collect_pass: true, &caller_block) ⇒ PropagationInfo
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The inside part of the event loop
It gathers initial events and errors and propagate them
1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 |
# File 'lib/roby/execution_engine.rb', line 1647 def process_events(raise_framework_errors: Roby.app.abort_on_application_exception?, garbage_collect_pass: true, &caller_block) if @application_exceptions raise RecursivePropagationContext, "recursive call to process_events" end passed_recursive_check = true # to avoid having a almost-method-global ensure block @application_exceptions = [] @emitted_events = Array.new @thread_pool.send :synchronize do @thread_pool.send(:ns_prune_pool) end # Gather new events and propagate them events_errors = nil next_steps = gather_propagation do events_errors = gather_errors do if caller_block yield caller_block = nil end if !quitting? || !garbage_collect([]) process_waiting_work log_timepoint 'workers' gather_external_events log_timepoint 'external_events' call_propagation_handlers log_timepoint 'propagation_handlers' end end end propagation_info = propagate_events_and_errors(next_steps, events_errors, garbage_collect_pass: garbage_collect_pass) if Roby.app.abort_on_exception? && !all_errors.fatal_errors.empty? raise Aborting.new(propagation_info.each_fatal_error.map(&:exception)) end propagation_info.framework_errors.concat(@application_exceptions) propagation_info ensure if passed_recursive_check process_pending_application_exceptions(raise_framework_errors: raise_framework_errors) end end |
#process_events_synchronous(seeds = Hash.new, initial_errors = Array.new, enable_scheduler: false, raise_errors: true) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Tests are using a special mode for propagation, in which everything is resolved when #emit or #call is called, including error handling. This mode is implemented using this method
When errors occur in this mode, the exceptions are raised directly. This is useful in tests as, this way, we are sure that the exception will not get overlooked
If multiple errors are raised in a single call (this is possible due to Roby’s error handling mechanisms), the method will raise SynchronousEventProcessingMultipleErrors to wrap all the exceptions into one.
1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 |
# File 'lib/roby/execution_engine.rb', line 1704 def process_events_synchronous(seeds = Hash.new, initial_errors = Array.new, enable_scheduler: false, raise_errors: true) Roby.warn_deprecated "#process_events_synchronous is deprecated, use the expect_execution harness instead" if @application_exceptions raise RecursivePropagationContext, "recursive call to process_events" end passed_recursive_check = true # to avoid having a almost-method-global ensure block @application_exceptions = [] # Save early for the benefit of the 'ensure' block current_scheduler_enabled = scheduler.enabled? if (!seeds.empty? || !initial_errors.empty?) && block_given? raise ArgumentError, "cannot provide both seeds/inital errors and a block" elsif block_given? seeds = gather_propagation do initial_errors = gather_errors do yield end end end scheduler.enabled = enable_scheduler propagation_info = propagate_events_and_errors(seeds, initial_errors, garbage_collect_pass: false) if !propagation_info.kill_tasks.empty? gc_initial_errors = nil gc_seeds = gather_propagation do gc_initial_errors = gather_errors do garbage_collect(propagation_info.kill_tasks) end end gc_errors = propagate_events_and_errors(gc_seeds, gc_initial_errors, garbage_collect_pass: false) propagation_info.merge(gc_errors) end if raise_errors propagation_info = propagation_info.exceptions if propagation_info.size == 1 raise propagation_info.first elsif !propagation_info.empty? raise SynchronousEventProcessingMultipleErrors.new(propagation_info.map(&:exception)) end else propagation_info end rescue SynchronousEventProcessingMultipleErrors => e raise SynchronousEventProcessingMultipleErrors.new(e.errors + clear_application_exceptions) rescue Exception => e if passed_recursive_check application_exceptions = clear_application_exceptions if !application_exceptions.empty? raise SynchronousEventProcessingMultipleErrors.new(application_exceptions.map(&:first) + [e]) else raise e end else raise e end ensure if passed_recursive_check && @application_exceptions process_pending_application_exceptions end scheduler.enabled = current_scheduler_enabled end |
#process_once_blocks ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Dispatch #once_blocks to the other handler sets for further processing
757 758 759 760 761 762 763 764 765 766 |
# File 'lib/roby/execution_engine.rb', line 757 def process_once_blocks while !once_blocks.empty? type, block = once_blocks.pop if type == :external_events external_events_handlers << block else propagation_handlers << block end end end |
#process_pending_application_exceptions(application_errors = clear_application_exceptions, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 |
# File 'lib/roby/execution_engine.rb', line 622 def process_pending_application_exceptions(application_errors = clear_application_exceptions, raise_framework_errors: Roby.app.abort_on_application_exception?) # We don't aggregate exceptions, so report them all and raise one if display_exceptions? application_errors.each do |error, source| if !error.kind_of?(Interrupt) fatal "Application error in #{source}" Roby.log_exception_with_backtrace(error, self, :fatal) end end end error, source = application_errors.find do |error, _| raise_framework_errors || error.kind_of?(SignalException) end if error raise error, "in #{source}: #{error.message}", error.backtrace end end |
#process_waiting_work ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Process asynchronous work registered in #waiting_work to clear completed work and/or handle errors that were not handled by the async object itself (e.g. a Promise without a Promise#on_error handler)
1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 |
# File 'lib/roby/execution_engine.rb', line 1444 def process_waiting_work finished, not_finished = waiting_work.partition do |work| work.complete? end finished.find_all do |work| work.rejected? && (work.respond_to?(:has_error_handler?) && !work.has_error_handler?) end.each do |work| e = work.reason e.set_backtrace(e.backtrace + caller) add_framework_error(e, work.to_s) end @waiting_work = not_finished finished end |
#promise(description: nil, executor: thread_pool, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create a promise to execute the given block in a separate thread
Note that the returned value is a Promise. This means that callbacks added with #on_success or #rescue will be executed in the execution engine thread by default.
2604 2605 2606 |
# File 'lib/roby/execution_engine.rb', line 2604 def promise(description: nil, executor: thread_pool, &block) Promise.new(self, executor: executor, description: description, &block) end |
#propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true) ⇒ PropagationInfo
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Propagate an initial set of event propagations and errors
1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 |
# File 'lib/roby/execution_engine.rb', line 1784 def propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true) propagation_info = PropagationInfo.new events_errors = initial_errors.dup begin log_timepoint_group 'event_propagation_phase' do events_errors.concat(event_propagation_phase(next_steps, propagation_info)) end next_steps = gather_propagation do exception_propagation_errors, error_phase_results = nil log_timepoint_group 'error_handling_phase' do exception_propagation_errors = gather_errors do error_phase_results = error_handling_phase(events_errors) end end add_exceptions_for_inhibition(error_phase_results.each_fatal_error) propagation_info.merge(error_phase_results) garbage_collection_errors = gather_errors do plan.generate_induced_errors(error_phase_results) if garbage_collect_pass garbage_collect(error_phase_results.kill_tasks) else [] end end events_errors = (exception_propagation_errors + garbage_collection_errors) log_timepoint 'garbage_collect' end end while !next_steps.empty? || !events_errors.empty? propagation_info end |
#propagate_exception_in_plan(exceptions) {|exception, handling_object| ... } ⇒ Array<(ExecutionException,Array<Task>)>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The core exception propagation algorithm
1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 |
# File 'lib/roby/execution_engine.rb', line 1181 def propagate_exception_in_plan(exceptions) propagation_graph = dependency_graph.reverse # Propagate the exceptions in the hierarchy handled_unhandled = Array.new exceptions.each do |exception, parents| origin = exception.origin if parents filtered_parents = parents.find_all { |t| t.depends_on?(origin) } if filtered_parents != parents warn "some parents specified for #{exception.exception}(#{exception.exception.class}) are actually not parents of #{origin}, they got filtered out" (parents - filtered_parents).each do |task| warn " #{task}" end if filtered_parents.empty? parents = propagation_graph.out_neighbours(origin) else parents = filtered_parents end end else parents = propagation_graph.out_neighbours(origin) end debug do debug "propagating exception " log_pp :debug, exception if !parents.empty? debug " constrained to parents" log_nest(2) do parents.each do |p| log_pp :debug, p end end end break end visitor = ExceptionPropagationVisitor.new(propagation_graph, exception, origin, parents) do |e, task| yield(e, task) end visitor.visit unhandled = visitor.unhandled_exceptions.inject { |a, b| a.merge(b) } handled = visitor.handled_exceptions.inject { |a, b| a.merge(b) } handled_unhandled << [handled, unhandled] end exceptions_handled_by = Array.new unhandled_exceptions = Array.new handled_unhandled.each do |handled, e| if e if e.handled = yield(e, plan) if handled handled_by = (handled.propagation_leafs.to_set << plan) exceptions_handled_by << [handled.merge(e), handled_by] else handled = e exceptions_handled_by << [e, [plan].to_set] end else affected_tasks = e.trace.vertices.to_set if handled affected_tasks -= handled.trace.vertices exceptions_handled_by << [handled, handled.propagation_leafs.to_set] end unhandled_exceptions << [e, affected_tasks] end else exceptions_handled_by << [handled, handled.propagation_leafs.to_set] end end debug do debug "#{unhandled_exceptions.size} unhandled exceptions remain" log_nest(2) do unhandled_exceptions.each do |e, affected_tasks| log_pp :debug, e debug "Affects #{affected_tasks.size} tasks" log_nest(2) do affected_tasks.each do |t| log_pp :debug, t end end end end break end return unhandled_exceptions, exceptions_handled_by end |
#propagate_exceptions(exceptions) ⇒ Array<(ExecutionException,Array<Task>)>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Propagation exception phase, checking if tasks and/or the main plan are handling the exceptions
1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 |
# File 'lib/roby/execution_engine.rb', line 1281 def propagate_exceptions(exceptions) if exceptions.empty? return Array.new, Array.new, Array.new end # Remove all exception that are not associated with a task exceptions, free_events_exceptions = exceptions.partition do |e, _| e.origin end # Normalize the free events exceptions free_events_exceptions = free_events_exceptions.map do |e, _| if e.exception.failed_generator.plan [e, Set[e.exception.failed_generator]] end end.compact debug "Filtering inhibited exceptions" exceptions = log_nest(2) do non_inhibited, _ = remove_inhibited_exceptions(exceptions) # Reset the trace for the real propagation non_inhibited.map do |e, _| _, propagate_through = exceptions.find { |original_e, _| original_e.exception == e.exception } e.reset_trace [e, propagate_through] end end debug "Propagating #{exceptions.size} non-inhibited exceptions" log_nest(2) do # Note that the first half of the method filtered the free # events exceptions out of 'exceptions' unhandled, handled = propagate_exception_in_plan(exceptions) do |e, object| object.handle_exception(e) end return unhandled, free_events_exceptions, handled end end |
#propagation_context(sources) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Sets the source_event and source_generator variables according to source. source is the from argument of #add_event_propagation
664 665 666 667 668 669 670 671 672 |
# File 'lib/roby/execution_engine.rb', line 664 def propagation_context(sources) current_sources = @propagation_sources raise InternalError, "not in a gathering context in #propagation_context" unless in_propagation_context? @propagation_sources = sources yield ensure @propagation_sources = current_sources end |
#propagation_source_events ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of events extracted from #sources
457 458 459 460 461 462 463 464 465 |
# File 'lib/roby/execution_engine.rb', line 457 def propagation_source_events result = Set.new for ev in @propagation_sources if ev.respond_to?(:generator) result << ev end end result end |
#propagation_source_generators ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of generators extracted from #sources
468 469 470 471 472 473 474 475 476 477 478 |
# File 'lib/roby/execution_engine.rb', line 468 def propagation_source_generators result = Set.new for ev in @propagation_sources result << if ev.respond_to?(:generator) ev.generator else ev end end result end |
#queue_forward(sources, target, context, timespec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Queue a forwarding to be propagated
684 685 686 |
# File 'lib/roby/execution_engine.rb', line 684 def queue_forward(sources, target, context, timespec) add_event_propagation(true, sources, target, context, timespec) end |
#queue_signal(sources, target, context, timespec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Queue a signal to be propagated
679 680 681 |
# File 'lib/roby/execution_engine.rb', line 679 def queue_signal(sources, target, context, timespec) add_event_propagation(false, sources, target, context, timespec) end |
#quit ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Make control quit properly
2386 |
# File 'lib/roby/execution_engine.rb', line 2386 def quit; @quit = 1 end |
#quitting? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
True if the control thread is currently quitting
2382 |
# File 'lib/roby/execution_engine.rb', line 2382 def quitting?; @quit > 0 end |
#refresh_relations ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Refresh the value of cached relations
Some often-used relations are cached at #initialize, such as #dependency_graph and #precedence_graph. Call this when the actual graph objects have changed on the plan
134 135 136 137 138 139 |
# File 'lib/roby/execution_engine.rb', line 134 def refresh_relations @dependency_graph = plan.task_relation_graph_for(TaskStructure::Dependency) @precedence_graph = plan.event_relation_graph_for(EventStructure::Precedence) @signal_graph = plan.event_relation_graph_for(EventStructure::Signal) @forward_graph = plan.event_relation_graph_for(EventStructure::Forwarding) end |
#remove_at_cycle_end(handler_id) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Removes a handler added by #at_cycle_end
2047 2048 2049 |
# File 'lib/roby/execution_engine.rb', line 2047 def remove_at_cycle_end(handler_id) at_cycle_end_handlers.delete_if { |h| h.object_id == handler_id } end |
#remove_exception_listener(handler) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Removes an exception listener registered with #on_exception
2586 2587 2588 |
# File 'lib/roby/execution_engine.rb', line 2586 def remove_exception_listener(handler) exception_listeners.delete(handler) end |
#remove_inhibited_exceptions(exceptions) ⇒ Array<ExecutionException>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Process the given exceptions to remove the ones that are currently filtered by the plan repairs
The returned exceptions are propagated, i.e. their #trace method contains all the tasks that are affected by the absence of a handling mechanism
1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 |
# File 'lib/roby/execution_engine.rb', line 1331 def remove_inhibited_exceptions(exceptions) exceptions = exceptions.find_all do |execution_exception, _| execution_exception.origin.plan end propagate_exception_in_plan(exceptions) do |e, object| if has_pending_exception_matching?(e, object) true elsif object.respond_to?(:handles_error?) object.handles_error?(e) end end end |
#remove_periodic_handler(id) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Removes a periodic handler defined by #every. id is the value returned by #every.
2072 2073 2074 2075 2076 |
# File 'lib/roby/execution_engine.rb', line 2072 def remove_periodic_handler(id) execute do process_every.delete_if { |spec| spec[0].id == id } end end |
#remove_propagation_handler(id) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
362 363 364 365 366 |
# File 'lib/roby/execution_engine.rb', line 362 def remove_propagation_handler(id) disabled_handlers.delete_if { |p| p.id == id } super nil end |
#reset ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Make a quit EE ready for reuse
2391 2392 2393 |
# File 'lib/roby/execution_engine.rb', line 2391 def reset @quit = 0 end |
#reset_thread_pool ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
2480 2481 2482 2483 2484 2485 |
# File 'lib/roby/execution_engine.rb', line 2480 def reset_thread_pool if @thread_pool @thread_pool.shutdown end @thread_pool = Concurrent::CachedThreadPool.new(idletime: 10) end |
#run(cycle: 0.1) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Main event loop. Valid options are
- cycle
-
the cycle duration in seconds (default: 0.1)
2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 |
# File 'lib/roby/execution_engine.rb', line 2127 def run(cycle: 0.1) if running? raise AlreadyRunning, "#run has already been called" end self.running = true @allow_propagation = false @waiting_work = Concurrent::Array.new @thread = Thread.current @thread.name = "MAIN" @cycle_length = cycle event_loop ensure self.running = false @thread = nil waiting_work.delete_if do |w| next(true) if w.complete? # rubocop:disable Lint/HandleExceptions begin w.fail ExecutionQuitError Roby.warn "forcefully terminated #{w} on quit" rescue Concurrent::MultipleAssignmentError # Race condition: something completed the promise while # we were trying to make it fail end # rubocop:enable Lint/HandleExceptions true end finalizers.each do |blk| begin blk.call rescue Exception => e Roby.warn "finalizer #{blk} failed" Roby.log_exception_with_backtrace(e, Roby, :warn) end end @quit = 0 @allow_propagation = true end |
#shutdown ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
2475 2476 2477 2478 |
# File 'lib/roby/execution_engine.rb', line 2475 def shutdown killall thread_pool.shutdown end |
#start_new_cycle(time = Time.now) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Set the cycle_start attribute and increment cycle_index
This is only used for testing purposes
2372 2373 2374 2375 |
# File 'lib/roby/execution_engine.rb', line 2372 def start_new_cycle(time = Time.now) @cycle_start = time @cycle_index += 1 end |
#unmark_finished_missions_and_permanent_tasks ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 |
# File 'lib/roby/execution_engine.rb', line 1836 def unmark_finished_missions_and_permanent_tasks to_unmark = plan.task_index.by_predicate[:finished?] | plan.task_index.by_predicate[:failed?] finished_missions = (plan.mission_tasks & to_unmark) # Remove all missions that are finished for finished_mission in finished_missions if !finished_mission.being_repaired? plan.unmark_mission_task(finished_mission) end end finished_permanent = (plan.permanent_tasks & to_unmark) for finished_permanent in (plan.permanent_tasks & to_unmark) if !finished_permanent.being_repaired? plan.unmark_permanent_task(finished_permanent) end end end |
#unreachable_event(event) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called by EventGenerator when an event became unreachable
514 515 516 |
# File 'lib/roby/execution_engine.rb', line 514 def unreachable_event(event) delayed_events.delete_if { |_, _, _, signalled, _| signalled == event } end |
#wait_one_cycle ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Blocks until at least once execution cycle has been done
1991 1992 1993 1994 1995 1996 1997 |
# File 'lib/roby/execution_engine.rb', line 1991 def wait_one_cycle current_cycle = execute { cycle_index } while current_cycle == execute { cycle_index } raise ExecutionQuitError if !running? sleep(cycle_length) end end |
#wait_until(ev) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Stops the current thread until the given even is emitted. If the event becomes unreachable, an UnreachableEvent exception is raised.
2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 |
# File 'lib/roby/execution_engine.rb', line 2448 def wait_until(ev) if inside_control? raise ThreadMismatch, "cannot use #wait_until in execution threads" end ivar = Concurrent::IVar.new result = nil once(sync: ivar) do if ev.unreachable? ivar.fail(UnreachableEvent.new(ev, ev.unreachability_reason)) else ev.if_unreachable(cancel_at_emission: true) do |reason, event| ivar.fail(UnreachableEvent.new(event, reason)) if !ivar.complete? end ev.once do |ev| ivar.set(result) if !ivar.complete? end begin result = yield if block_given? rescue Exception => e ivar.fail(e) end end end ivar.value! end |