Class: Dynflow::World
- Inherits:
-
Object
- Object
- Dynflow::World
- Includes:
- Algebrick::Matching, Algebrick::TypeCheck, Invalidation
- Defined in:
- lib/dynflow/world.rb,
lib/dynflow/world/invalidation.rb
Direct Known Subclasses
Defined Under Namespace
Modules: Invalidation, TriggerResult, Triggered
Instance Attribute Summary collapse
-
#action_classes ⇒ Object
readonly
Returns the value of attribute action_classes.
-
#auto_rescue ⇒ Object
readonly
Returns the value of attribute auto_rescue.
-
#auto_validity_check ⇒ Object
readonly
Returns the value of attribute auto_validity_check.
-
#client_dispatcher ⇒ Object
readonly
Returns the value of attribute client_dispatcher.
-
#clock ⇒ Object
readonly
Returns the value of attribute clock.
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#connector ⇒ Object
readonly
Returns the value of attribute connector.
-
#coordinator ⇒ Object
readonly
Returns the value of attribute coordinator.
-
#dead_letter_handler ⇒ Object
readonly
Returns the value of attribute dead_letter_handler.
-
#delayed_executor ⇒ Object
readonly
Returns the value of attribute delayed_executor.
-
#execution_plan_cleaner ⇒ Object
readonly
Returns the value of attribute execution_plan_cleaner.
-
#executor ⇒ Object
readonly
Returns the value of attribute executor.
-
#executor_dispatcher ⇒ Object
readonly
Returns the value of attribute executor_dispatcher.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#logger_adapter ⇒ Object
readonly
Returns the value of attribute logger_adapter.
-
#meta ⇒ Object
readonly
Returns the value of attribute meta.
-
#middleware ⇒ Object
readonly
Returns the value of attribute middleware.
-
#persistence ⇒ Object
readonly
Returns the value of attribute persistence.
-
#subscription_index ⇒ Object
readonly
Returns the value of attribute subscription_index.
-
#terminated ⇒ Object
readonly
Returns the value of attribute terminated.
-
#termination_timeout ⇒ Object
readonly
Returns the value of attribute termination_timeout.
-
#throttle_limiter ⇒ Object
readonly
Returns the value of attribute throttle_limiter.
-
#transaction_adapter ⇒ Object
readonly
Returns the value of attribute transaction_adapter.
-
#validity_check_timeout ⇒ Object
readonly
Returns the value of attribute validity_check_timeout.
Instance Method Summary collapse
- #action_logger ⇒ Object
-
#auto_execute ⇒ Object
24119 - ensure delayed executor is preserved after invalidation executes plans that are planned/paused and haven’t reported any error yet (usually when no executor was available by the time of planning or terminating).
- #before_termination(&block) ⇒ Object
- #delay(action_class, delay_options, *args) ⇒ Object
- #delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil) ⇒ Object
- #event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
-
#execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Concurrent::Promises::ResolvableFuture
raises when ExecutionPlan is not accepted for execution.
- #get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
-
#initialize(config) ⇒ World
constructor
A new instance of World.
- #logger ⇒ Object
- #ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
- #ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
- #plan(action_class, *args) ⇒ Object
- #plan_elsewhere(action_class, *args) ⇒ Object
- #plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
- #plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Object
- #plan_with_options(action_class:, args:, id: nil, caller_action: nil) ⇒ Object
-
#post_initialization ⇒ Object
performs steps once the executor is ready and invalidation of previous worls is finished.
- #publish_request(request, done, wait_for_accepted, timeout = nil) ⇒ Object
- #registered_world ⇒ Object
-
#reload! ⇒ Object
reload actions classes, intended only for devel.
- #subscribed_actions(action_class) ⇒ Object
- #terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object
- #terminating? ⇒ Boolean
-
#trigger(action_class = nil, *args, &block) ⇒ TriggerResult
blocks until action_class is planned if no arguments given, the plan is expected to be returned by a block.
- #try_spawn(what, lock_class = nil) ⇒ Object
- #update_register ⇒ Object
Methods included from Invalidation
#invalidate, #invalidate_execution_lock, #invalidate_planning_lock, #locks_validity_check, #perform_validity_checks, #with_valid_execution_plan_for_lock, #worlds_validity_check
Constructor Details
#initialize(config) ⇒ World
Returns a new instance of World.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/dynflow/world.rb', line 17 def initialize(config) @config = Config::ForWorld.new(config, self) # Set the telemetry instance as soon as possible Dynflow::Telemetry.set_adapter @config.telemetry_adapter Dynflow::Telemetry.register_metrics! @id = SecureRandom.uuid @logger_adapter = @config.logger_adapter @clock = spawn_and_wait(Clock, 'clock', logger) @config.validate @transaction_adapter = @config.transaction_adapter @persistence = Persistence.new(self, @config.persistence_adapter, :backup_deleted_plans => @config.backup_deleted_plans, :backup_dir => @config.backup_dir) @coordinator = Coordinator.new(@config.coordinator_adapter) if @config.executor @executor = Executors::Parallel.new(self, executor_class: @config.executor, heartbeat_interval: @config.executor_heartbeat_interval, queues_options: @config.queues) end @action_classes = @config.action_classes @auto_rescue = @config.auto_rescue @exit_on_terminate = Concurrent::AtomicBoolean.new(@config.exit_on_terminate) @connector = @config.connector @middleware = Middleware::World.new @middleware.use Middleware::Common::Transaction if @transaction_adapter @client_dispatcher = spawn_and_wait(Dispatcher::ClientDispatcher, "client-dispatcher", self, @config.ping_cache_age) @dead_letter_handler = spawn_and_wait(DeadLetterSilencer, 'default_dead_letter_handler', @config.silent_dead_letter_matchers) @auto_validity_check = @config.auto_validity_check @validity_check_timeout = @config.validity_check_timeout @throttle_limiter = @config.throttle_limiter @terminated = Concurrent::Promises.resolvable_event @termination_timeout = @config.termination_timeout calculate_subscription_index if executor @executor_dispatcher = spawn_and_wait(Dispatcher::ExecutorDispatcher, "executor-dispatcher", self, @config.executor_semaphore) executor.initialized.wait end update_register perform_validity_checks if auto_validity_check = Mutex.new @before_termination_hooks = Queue.new if @config.auto_terminate at_exit do @exit_on_terminate.make_false # make sure we don't terminate twice self.terminate.wait end end post_initialization end |
Instance Attribute Details
#action_classes ⇒ Object (readonly)
Returns the value of attribute action_classes.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def action_classes @action_classes end |
#auto_rescue ⇒ Object (readonly)
Returns the value of attribute auto_rescue.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def auto_rescue @auto_rescue end |
#auto_validity_check ⇒ Object (readonly)
Returns the value of attribute auto_validity_check.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def auto_validity_check @auto_validity_check end |
#client_dispatcher ⇒ Object (readonly)
Returns the value of attribute client_dispatcher.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def client_dispatcher @client_dispatcher end |
#clock ⇒ Object (readonly)
Returns the value of attribute clock.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def clock @clock end |
#config ⇒ Object (readonly)
Returns the value of attribute config.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def config @config end |
#connector ⇒ Object (readonly)
Returns the value of attribute connector.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def connector @connector end |
#coordinator ⇒ Object (readonly)
Returns the value of attribute coordinator.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def coordinator @coordinator end |
#dead_letter_handler ⇒ Object (readonly)
Returns the value of attribute dead_letter_handler.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def dead_letter_handler @dead_letter_handler end |
#delayed_executor ⇒ Object (readonly)
Returns the value of attribute delayed_executor.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def delayed_executor @delayed_executor end |
#execution_plan_cleaner ⇒ Object (readonly)
Returns the value of attribute execution_plan_cleaner.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def execution_plan_cleaner @execution_plan_cleaner end |
#executor ⇒ Object (readonly)
Returns the value of attribute executor.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def executor @executor end |
#executor_dispatcher ⇒ Object (readonly)
Returns the value of attribute executor_dispatcher.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def executor_dispatcher @executor_dispatcher end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def id @id end |
#logger_adapter ⇒ Object (readonly)
Returns the value of attribute logger_adapter.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def logger_adapter @logger_adapter end |
#meta ⇒ Object (readonly)
Returns the value of attribute meta.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def end |
#middleware ⇒ Object (readonly)
Returns the value of attribute middleware.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def middleware @middleware end |
#persistence ⇒ Object (readonly)
Returns the value of attribute persistence.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def persistence @persistence end |
#subscription_index ⇒ Object (readonly)
Returns the value of attribute subscription_index.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def subscription_index @subscription_index end |
#terminated ⇒ Object (readonly)
Returns the value of attribute terminated.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def terminated @terminated end |
#termination_timeout ⇒ Object (readonly)
Returns the value of attribute termination_timeout.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def termination_timeout @termination_timeout end |
#throttle_limiter ⇒ Object (readonly)
Returns the value of attribute throttle_limiter.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def throttle_limiter @throttle_limiter end |
#transaction_adapter ⇒ Object (readonly)
Returns the value of attribute transaction_adapter.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def transaction_adapter @transaction_adapter end |
#validity_check_timeout ⇒ Object (readonly)
Returns the value of attribute validity_check_timeout.
11 12 13 |
# File 'lib/dynflow/world.rb', line 11 def validity_check_timeout @validity_check_timeout end |
Instance Method Details
#action_logger ⇒ Object
114 115 116 |
# File 'lib/dynflow/world.rb', line 114 def action_logger logger_adapter.action_logger end |
#auto_execute ⇒ Object
24119 - ensure delayed executor is preserved after invalidation executes plans that are planned/paused and haven’t reported any error yet (usually when no executor was available by the time of planning or terminating)
278 279 280 281 282 283 284 285 286 287 288 289 290 291 |
# File 'lib/dynflow/world.rb', line 278 def auto_execute coordinator.acquire(Coordinator::AutoExecuteLock.new(self)) do planned_execution_plans = self.persistence.find_execution_plans filters: { 'state' => %w(planned paused), 'result' => (ExecutionPlan.results - [:error]).map(&:to_s) } planned_execution_plans.map do |ep| if coordinator.find_locks(Dynflow::Coordinator::ExecutionLock.unique_filter(ep.id)).empty? execute(ep.id) end end.compact end rescue Coordinator::LockError => e logger.info "auto-executor lock already aquired: #{e.message}" [] end |
#before_termination(&block) ⇒ Object
84 85 86 |
# File 'lib/dynflow/world.rb', line 84 def before_termination(&block) @before_termination_hooks << block end |
#delay(action_class, delay_options, *args) ⇒ Object
192 193 194 |
# File 'lib/dynflow/world.rb', line 192 def delay(action_class, , *args) (action_class: action_class, args: args, delay_options: ) end |
#delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil) ⇒ Object
196 197 198 199 200 201 |
# File 'lib/dynflow/world.rb', line 196 def (action_class:, args:, delay_options:, id: nil, caller_action: nil) raise 'No action_class given' if action_class.nil? execution_plan = ExecutionPlan.new(self, id) execution_plan.delay(caller_action, action_class, , *args) Scheduled[execution_plan.id] end |
#event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
230 231 232 |
# File 'lib/dynflow/world.rb', line 230 def event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) publish_request(Dispatcher::Event[execution_plan_id, step_id, event, nil, optional], done, false) end |
#execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Concurrent::Promises::ResolvableFuture
raises when ExecutionPlan is not accepted for execution
226 227 228 |
# File 'lib/dynflow/world.rb', line 226 def execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Execution[execution_plan_id], done, true) end |
#get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
250 251 252 |
# File 'lib/dynflow/world.rb', line 250 def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Status[world_id, execution_plan_id], done, false, timeout) end |
#logger ⇒ Object
110 111 112 |
# File 'lib/dynflow/world.rb', line 110 def logger logger_adapter.dynflow_logger end |
#ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
242 243 244 |
# File 'lib/dynflow/world.rb', line 242 def ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Ping[world_id, true], done, false, timeout) end |
#ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
246 247 248 |
# File 'lib/dynflow/world.rb', line 246 def ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Ping[world_id, false], done, false, timeout) end |
#plan(action_class, *args) ⇒ Object
211 212 213 |
# File 'lib/dynflow/world.rb', line 211 def plan(action_class, *args) (action_class: action_class, args: args) end |
#plan_elsewhere(action_class, *args) ⇒ Object
203 204 205 206 207 208 209 |
# File 'lib/dynflow/world.rb', line 203 def plan_elsewhere(action_class, *args) execution_plan = ExecutionPlan.new(self, nil) execution_plan.delay(nil, action_class, {}, *args) plan_request(execution_plan.id) Scheduled[execution_plan.id] end |
#plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
234 235 236 |
# File 'lib/dynflow/world.rb', line 234 def plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) publish_request(Dispatcher::Event[execution_plan_id, step_id, event, time, optional], accepted, false) end |
#plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Object
238 239 240 |
# File 'lib/dynflow/world.rb', line 238 def plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Planning[execution_plan_id], done, false) end |
#plan_with_options(action_class:, args:, id: nil, caller_action: nil) ⇒ Object
215 216 217 218 219 220 221 222 |
# File 'lib/dynflow/world.rb', line 215 def (action_class:, args:, id: nil, caller_action: nil) ExecutionPlan.new(self, id).tap do |execution_plan| coordinator.acquire(Coordinator::PlanningLock.new(self, execution_plan.id)) do execution_plan.prepare(action_class, caller_action: caller_action) execution_plan.plan(*args) end end end |
#post_initialization ⇒ Object
performs steps once the executor is ready and invalidation of previous worls is finished. Needs to be indempotent, as it can be called several times (expecially when auto_validity_check if false, as it should be called after ‘perform_validity_checks` method)
76 77 78 79 80 81 82 |
# File 'lib/dynflow/world.rb', line 76 def post_initialization @delayed_executor ||= try_spawn(:delayed_executor, Coordinator::DelayedExecutorLock) @execution_plan_cleaner ||= try_spawn(:execution_plan_cleaner, Coordinator::ExecutionPlanCleanerLock) update_register @delayed_executor.start if auto_validity_check && @delayed_executor && !@delayed_executor.started? self.auto_execute if @config.auto_execute end |
#publish_request(request, done, wait_for_accepted, timeout = nil) ⇒ Object
254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/dynflow/world.rb', line 254 def publish_request(request, done, wait_for_accepted, timeout = nil) accepted = Concurrent::Promises.resolvable_future accepted.rescue do |reason| done.reject reason if reason end client_dispatcher.ask([:publish_request, done, request, timeout], accepted) accepted.wait if wait_for_accepted done rescue => e accepted.reject e end |
#registered_world ⇒ Object
102 103 104 105 106 107 108 |
# File 'lib/dynflow/world.rb', line 102 def registered_world if executor Coordinator::ExecutorWorld.new(self) else Coordinator::ClientWorld.new(self) end end |
#reload! ⇒ Object
reload actions classes, intended only for devel
123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/dynflow/world.rb', line 123 def reload! # TODO what happens with newly loaded classes @action_classes = @action_classes.map do |klass| begin Utils.constantize(klass.to_s) rescue NameError nil # ignore missing classes end end.compact middleware.clear_cache! calculate_subscription_index end |
#subscribed_actions(action_class) ⇒ Object
118 119 120 |
# File 'lib/dynflow/world.rb', line 118 def subscribed_actions(action_class) @subscription_index.has_key?(action_class) ? @subscription_index[action_class] : [] end |
#terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object
266 267 268 269 |
# File 'lib/dynflow/world.rb', line 266 def terminate(future = Concurrent::Promises.resolvable_future) start_termination.tangle(future) future end |
#terminating? ⇒ Boolean
271 272 273 |
# File 'lib/dynflow/world.rb', line 271 def terminating? defined?(@terminating) end |
#trigger(action_class = nil, *args, &block) ⇒ TriggerResult
blocks until action_class is planned if no arguments given, the plan is expected to be returned by a block
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/dynflow/world.rb', line 175 def trigger(action_class = nil, *args, &block) if action_class.nil? raise 'Neither action_class nor a block given' if block.nil? execution_plan = block.call(self) else execution_plan = plan(action_class, *args) end planned = execution_plan.state == :planned if planned done = execute(execution_plan.id, Concurrent::Promises.resolvable_future) Triggered[execution_plan.id, done] else PlaningFailed[execution_plan.id, execution_plan.errors.first.exception] end end |
#try_spawn(what, lock_class = nil) ⇒ Object
293 294 295 296 297 298 299 300 301 302 |
# File 'lib/dynflow/world.rb', line 293 def try_spawn(what, lock_class = nil) object = nil return nil if !executor || (object = @config.public_send(what)).nil? coordinator.acquire(lock_class.new(self)) if lock_class object.spawn.wait object rescue Coordinator::LockError => e nil end |
#update_register ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/dynflow/world.rb', line 88 def update_register ||= @config. ['queues'] = @config.queues if @executor ['delayed_executor'] = true if @delayed_executor ['execution_plan_cleaner'] = true if @execution_plan_cleaner ['last_seen'] = Dynflow::Dispatcher::ClientDispatcher::PingCache.format_time if @already_registered coordinator.update_record(registered_world) else coordinator.register_world(registered_world) @already_registered = true end end |