Class: Dynflow::World
- Inherits:
-
Object
- Object
- Dynflow::World
- Includes:
- Algebrick::Matching, Algebrick::TypeCheck, Invalidation
- Defined in:
- lib/dynflow/world.rb,
lib/dynflow/world/invalidation.rb
Direct Known Subclasses
Defined Under Namespace
Modules: Invalidation, TriggerResult, Triggered
Instance Attribute Summary collapse
-
#action_classes ⇒ Object
readonly
Returns the value of attribute action_classes.
-
#auto_rescue ⇒ Object
readonly
Returns the value of attribute auto_rescue.
-
#auto_validity_check ⇒ Object
readonly
Returns the value of attribute auto_validity_check.
-
#client_dispatcher ⇒ Object
readonly
Returns the value of attribute client_dispatcher.
-
#clock ⇒ Object
readonly
Returns the value of attribute clock.
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#connector ⇒ Object
readonly
Returns the value of attribute connector.
-
#coordinator ⇒ Object
readonly
Returns the value of attribute coordinator.
-
#dead_letter_handler ⇒ Object
readonly
Returns the value of attribute dead_letter_handler.
-
#delayed_executor ⇒ Object
readonly
Returns the value of attribute delayed_executor.
-
#execution_plan_cleaner ⇒ Object
readonly
Returns the value of attribute execution_plan_cleaner.
-
#executor ⇒ Object
readonly
Returns the value of attribute executor.
-
#executor_dispatcher ⇒ Object
readonly
Returns the value of attribute executor_dispatcher.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#logger_adapter ⇒ Object
readonly
Returns the value of attribute logger_adapter.
-
#meta ⇒ Object
readonly
Returns the value of attribute meta.
-
#middleware ⇒ Object
readonly
Returns the value of attribute middleware.
-
#persistence ⇒ Object
readonly
Returns the value of attribute persistence.
-
#subscription_index ⇒ Object
readonly
Returns the value of attribute subscription_index.
-
#terminated ⇒ Object
readonly
Returns the value of attribute terminated.
-
#termination_timeout ⇒ Object
readonly
Returns the value of attribute termination_timeout.
-
#throttle_limiter ⇒ Object
readonly
Returns the value of attribute throttle_limiter.
-
#transaction_adapter ⇒ Object
readonly
Returns the value of attribute transaction_adapter.
-
#validity_check_timeout ⇒ Object
readonly
Returns the value of attribute validity_check_timeout.
Instance Method Summary collapse
- #action_logger ⇒ Object
-
#auto_execute ⇒ Object
24119 - ensure delayed executor is preserved after invalidation executes plans that are planned/paused and haven’t reported any error yet (usually when no executor was available by the time of planning or terminating).
- #before_termination(&block) ⇒ Object
- #delay(action_class, delay_options, *args) ⇒ Object
- #delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil) ⇒ Object
- #event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
-
#execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Concurrent::Promises::ResolvableFuture
raises when ExecutionPlan is not accepted for execution.
- #get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
-
#initialize(config) ⇒ World
constructor
A new instance of World.
- #logger ⇒ Object
- #ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
- #ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
- #plan(action_class, *args) ⇒ Object
- #plan_elsewhere(action_class, *args) ⇒ Object
- #plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
- #plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Object
- #plan_with_options(action_class:, args:, id: nil, caller_action: nil) ⇒ Object
-
#post_initialization ⇒ Object
performs steps once the executor is ready and invalidation of previous worls is finished.
- #publish_request(request, done, wait_for_accepted, timeout = nil) ⇒ Object
- #registered_world ⇒ Object
-
#reload! ⇒ Object
reload actions classes, intended only for devel.
- #subscribed_actions(action_class) ⇒ Object
- #terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object
- #terminating? ⇒ Boolean
-
#trigger(action_class = nil, *args, &block) ⇒ TriggerResult
blocks until action_class is planned if no arguments given, the plan is expected to be returned by a block.
- #try_spawn(what, lock_class = nil) ⇒ Object
- #update_register ⇒ Object
Methods included from Invalidation
#invalidate, #invalidate_execution_lock, #invalidate_planning_lock, #locks_validity_check, #perform_validity_checks, #with_valid_execution_plan_for_lock, #worlds_validity_check
Constructor Details
#initialize(config) ⇒ World
Returns a new instance of World.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/dynflow/world.rb', line 18 def initialize(config) @config = Config::ForWorld.new(config, self) # Set the telemetry instance as soon as possible Dynflow::Telemetry.set_adapter @config.telemetry_adapter Dynflow::Telemetry.register_metrics! @id = SecureRandom.uuid @logger_adapter = @config.logger_adapter @clock = spawn_and_wait(Clock, 'clock', logger) @config.validate @transaction_adapter = @config.transaction_adapter @persistence = Persistence.new(self, @config.persistence_adapter, :backup_deleted_plans => @config.backup_deleted_plans, :backup_dir => @config.backup_dir) @coordinator = Coordinator.new(@config.coordinator_adapter) if @config.executor @executor = Executors::Parallel.new(self, executor_class: @config.executor, heartbeat_interval: @config.executor_heartbeat_interval, queues_options: @config.queues) end @action_classes = @config.action_classes @auto_rescue = @config.auto_rescue @exit_on_terminate = Concurrent::AtomicBoolean.new(@config.exit_on_terminate) @connector = @config.connector @middleware = Middleware::World.new @middleware.use Middleware::Common::Transaction if @transaction_adapter @client_dispatcher = spawn_and_wait(Dispatcher::ClientDispatcher, "client-dispatcher", self, @config.ping_cache_age) @dead_letter_handler = spawn_and_wait(DeadLetterSilencer, 'default_dead_letter_handler', @config.silent_dead_letter_matchers) @auto_validity_check = @config.auto_validity_check @validity_check_timeout = @config.validity_check_timeout @throttle_limiter = @config.throttle_limiter @terminated = Concurrent::Promises.resolvable_event @termination_timeout = @config.termination_timeout calculate_subscription_index if executor @executor_dispatcher = spawn_and_wait(Dispatcher::ExecutorDispatcher, "executor-dispatcher", self, @config.executor_semaphore) executor.initialized.wait end update_register perform_validity_checks if auto_validity_check @termination_barrier = Mutex.new @before_termination_hooks = Queue.new if @config.auto_terminate at_exit do @exit_on_terminate.make_false # make sure we don't terminate twice self.terminate.wait end end post_initialization end |
Instance Attribute Details
#action_classes ⇒ Object (readonly)
Returns the value of attribute action_classes.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def action_classes @action_classes end |
#auto_rescue ⇒ Object (readonly)
Returns the value of attribute auto_rescue.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def auto_rescue @auto_rescue end |
#auto_validity_check ⇒ Object (readonly)
Returns the value of attribute auto_validity_check.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def auto_validity_check @auto_validity_check end |
#client_dispatcher ⇒ Object (readonly)
Returns the value of attribute client_dispatcher.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def client_dispatcher @client_dispatcher end |
#clock ⇒ Object (readonly)
Returns the value of attribute clock.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def clock @clock end |
#config ⇒ Object (readonly)
Returns the value of attribute config.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def config @config end |
#connector ⇒ Object (readonly)
Returns the value of attribute connector.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def connector @connector end |
#coordinator ⇒ Object (readonly)
Returns the value of attribute coordinator.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def coordinator @coordinator end |
#dead_letter_handler ⇒ Object (readonly)
Returns the value of attribute dead_letter_handler.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def dead_letter_handler @dead_letter_handler end |
#delayed_executor ⇒ Object (readonly)
Returns the value of attribute delayed_executor.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def delayed_executor @delayed_executor end |
#execution_plan_cleaner ⇒ Object (readonly)
Returns the value of attribute execution_plan_cleaner.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def execution_plan_cleaner @execution_plan_cleaner end |
#executor ⇒ Object (readonly)
Returns the value of attribute executor.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def executor @executor end |
#executor_dispatcher ⇒ Object (readonly)
Returns the value of attribute executor_dispatcher.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def executor_dispatcher @executor_dispatcher end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def id @id end |
#logger_adapter ⇒ Object (readonly)
Returns the value of attribute logger_adapter.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def logger_adapter @logger_adapter end |
#meta ⇒ Object (readonly)
Returns the value of attribute meta.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def @meta end |
#middleware ⇒ Object (readonly)
Returns the value of attribute middleware.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def middleware @middleware end |
#persistence ⇒ Object (readonly)
Returns the value of attribute persistence.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def persistence @persistence end |
#subscription_index ⇒ Object (readonly)
Returns the value of attribute subscription_index.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def subscription_index @subscription_index end |
#terminated ⇒ Object (readonly)
Returns the value of attribute terminated.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def terminated @terminated end |
#termination_timeout ⇒ Object (readonly)
Returns the value of attribute termination_timeout.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def termination_timeout @termination_timeout end |
#throttle_limiter ⇒ Object (readonly)
Returns the value of attribute throttle_limiter.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def throttle_limiter @throttle_limiter end |
#transaction_adapter ⇒ Object (readonly)
Returns the value of attribute transaction_adapter.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def transaction_adapter @transaction_adapter end |
#validity_check_timeout ⇒ Object (readonly)
Returns the value of attribute validity_check_timeout.
12 13 14 |
# File 'lib/dynflow/world.rb', line 12 def validity_check_timeout @validity_check_timeout end |
Instance Method Details
#action_logger ⇒ Object
115 116 117 |
# File 'lib/dynflow/world.rb', line 115 def action_logger logger_adapter.action_logger end |
#auto_execute ⇒ Object
24119 - ensure delayed executor is preserved after invalidation executes plans that are planned/paused and haven’t reported any error yet (usually when no executor was available by the time of planning or terminating)
279 280 281 282 283 284 285 286 287 288 289 290 291 292 |
# File 'lib/dynflow/world.rb', line 279 def auto_execute coordinator.acquire(Coordinator::AutoExecuteLock.new(self)) do planned_execution_plans = self.persistence.find_execution_plans filters: { 'state' => %w(planned paused), 'result' => (ExecutionPlan.results - [:error]).map(&:to_s) } planned_execution_plans.map do |ep| if coordinator.find_locks(Dynflow::Coordinator::ExecutionLock.unique_filter(ep.id)).empty? execute(ep.id) end end.compact end rescue Coordinator::LockError => e logger.info "auto-executor lock already aquired: #{e.}" [] end |
#before_termination(&block) ⇒ Object
85 86 87 |
# File 'lib/dynflow/world.rb', line 85 def before_termination(&block) @before_termination_hooks << block end |
#delay(action_class, delay_options, *args) ⇒ Object
193 194 195 |
# File 'lib/dynflow/world.rb', line 193 def delay(action_class, , *args) (action_class: action_class, args: args, delay_options: ) end |
#delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil) ⇒ Object
197 198 199 200 201 202 |
# File 'lib/dynflow/world.rb', line 197 def (action_class:, args:, delay_options:, id: nil, caller_action: nil) raise 'No action_class given' if action_class.nil? execution_plan = ExecutionPlan.new(self, id) execution_plan.delay(caller_action, action_class, , *args) Scheduled[execution_plan.id] end |
#event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
231 232 233 |
# File 'lib/dynflow/world.rb', line 231 def event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) publish_request(Dispatcher::Event[execution_plan_id, step_id, event, nil, optional], done, false) end |
#execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Concurrent::Promises::ResolvableFuture
raises when ExecutionPlan is not accepted for execution
227 228 229 |
# File 'lib/dynflow/world.rb', line 227 def execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Execution[execution_plan_id], done, true) end |
#get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
251 252 253 |
# File 'lib/dynflow/world.rb', line 251 def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Status[world_id, execution_plan_id], done, false, timeout) end |
#logger ⇒ Object
111 112 113 |
# File 'lib/dynflow/world.rb', line 111 def logger logger_adapter.dynflow_logger end |
#ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
243 244 245 |
# File 'lib/dynflow/world.rb', line 243 def ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Ping[world_id, true], done, false, timeout) end |
#ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
247 248 249 |
# File 'lib/dynflow/world.rb', line 247 def ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Ping[world_id, false], done, false, timeout) end |
#plan(action_class, *args) ⇒ Object
212 213 214 |
# File 'lib/dynflow/world.rb', line 212 def plan(action_class, *args) (action_class: action_class, args: args) end |
#plan_elsewhere(action_class, *args) ⇒ Object
204 205 206 207 208 209 210 |
# File 'lib/dynflow/world.rb', line 204 def plan_elsewhere(action_class, *args) execution_plan = ExecutionPlan.new(self, nil) execution_plan.delay(nil, action_class, {}, *args) plan_request(execution_plan.id) Scheduled[execution_plan.id] end |
#plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
235 236 237 |
# File 'lib/dynflow/world.rb', line 235 def plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) publish_request(Dispatcher::Event[execution_plan_id, step_id, event, time, optional], accepted, false) end |
#plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Object
239 240 241 |
# File 'lib/dynflow/world.rb', line 239 def plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Planning[execution_plan_id], done, false) end |
#plan_with_options(action_class:, args:, id: nil, caller_action: nil) ⇒ Object
216 217 218 219 220 221 222 223 |
# File 'lib/dynflow/world.rb', line 216 def (action_class:, args:, id: nil, caller_action: nil) ExecutionPlan.new(self, id).tap do |execution_plan| coordinator.acquire(Coordinator::PlanningLock.new(self, execution_plan.id)) do execution_plan.prepare(action_class, caller_action: caller_action) execution_plan.plan(*args) end end end |
#post_initialization ⇒ Object
performs steps once the executor is ready and invalidation of previous worls is finished. Needs to be indempotent, as it can be called several times (expecially when auto_validity_check if false, as it should be called after ‘perform_validity_checks` method)
77 78 79 80 81 82 83 |
# File 'lib/dynflow/world.rb', line 77 def post_initialization @delayed_executor ||= try_spawn(:delayed_executor, Coordinator::DelayedExecutorLock) @execution_plan_cleaner ||= try_spawn(:execution_plan_cleaner, Coordinator::ExecutionPlanCleanerLock) update_register @delayed_executor.start if auto_validity_check && @delayed_executor && !@delayed_executor.started? self.auto_execute if @config.auto_execute end |
#publish_request(request, done, wait_for_accepted, timeout = nil) ⇒ Object
255 256 257 258 259 260 261 262 263 264 265 |
# File 'lib/dynflow/world.rb', line 255 def publish_request(request, done, wait_for_accepted, timeout = nil) accepted = Concurrent::Promises.resolvable_future accepted.rescue do |reason| done.reject reason if reason end client_dispatcher.ask([:publish_request, done, request, timeout], accepted) accepted.wait if wait_for_accepted done rescue => e accepted.reject e end |
#registered_world ⇒ Object
103 104 105 106 107 108 109 |
# File 'lib/dynflow/world.rb', line 103 def registered_world if executor Coordinator::ExecutorWorld.new(self) else Coordinator::ClientWorld.new(self) end end |
#reload! ⇒ Object
reload actions classes, intended only for devel
124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/dynflow/world.rb', line 124 def reload! # TODO what happens with newly loaded classes @action_classes = @action_classes.map do |klass| begin Utils.constantize(klass.to_s) rescue NameError nil # ignore missing classes end end.compact middleware.clear_cache! calculate_subscription_index end |
#subscribed_actions(action_class) ⇒ Object
119 120 121 |
# File 'lib/dynflow/world.rb', line 119 def subscribed_actions(action_class) @subscription_index.has_key?(action_class) ? @subscription_index[action_class] : [] end |
#terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object
267 268 269 270 |
# File 'lib/dynflow/world.rb', line 267 def terminate(future = Concurrent::Promises.resolvable_future) start_termination.tangle(future) future end |
#terminating? ⇒ Boolean
272 273 274 |
# File 'lib/dynflow/world.rb', line 272 def terminating? defined?(@terminating) end |
#trigger(action_class = nil, *args, &block) ⇒ TriggerResult
blocks until action_class is planned if no arguments given, the plan is expected to be returned by a block
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/dynflow/world.rb', line 176 def trigger(action_class = nil, *args, &block) if action_class.nil? raise 'Neither action_class nor a block given' if block.nil? execution_plan = block.call(self) else execution_plan = plan(action_class, *args) end planned = execution_plan.state == :planned if planned done = execute(execution_plan.id, Concurrent::Promises.resolvable_future) Triggered[execution_plan.id, done] else PlaningFailed[execution_plan.id, execution_plan.errors.first.exception] end end |
#try_spawn(what, lock_class = nil) ⇒ Object
294 295 296 297 298 299 300 301 302 303 |
# File 'lib/dynflow/world.rb', line 294 def try_spawn(what, lock_class = nil) object = nil return nil if !executor || (object = @config.public_send(what)).nil? coordinator.acquire(lock_class.new(self)) if lock_class object.spawn.wait object rescue Coordinator::LockError nil end |
#update_register ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/dynflow/world.rb', line 89 def update_register @meta ||= @config. @meta['queues'] = @config.queues if @executor @meta['delayed_executor'] = true if @delayed_executor @meta['execution_plan_cleaner'] = true if @execution_plan_cleaner @meta['last_seen'] = Dynflow::Dispatcher::ClientDispatcher::PingCache.format_time if @already_registered coordinator.update_record(registered_world) else coordinator.register_world(registered_world) @already_registered = true end end |