Class: Dynflow::World

Inherits:
Object
  • Object
show all
Includes:
Algebrick::Matching, Algebrick::TypeCheck, Invalidation
Defined in:
lib/dynflow/world.rb,
lib/dynflow/world/invalidation.rb

Direct Known Subclasses

Testing::InThreadWorld

Defined Under Namespace

Modules: Invalidation, TriggerResult, Triggered

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Invalidation

#invalidate, #invalidate_execution_lock, #invalidate_planning_lock, #locks_validity_check, #perform_validity_checks, #with_valid_execution_plan_for_lock, #worlds_validity_check

Constructor Details

#initialize(config) ⇒ World

Returns a new instance of World.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/dynflow/world.rb', line 17

# Builds a fully wired world from +config+.
#
# In order: wraps the configuration, sets up telemetry, spawns the clock,
# calls @config.validate, creates persistence, the coordinator and (only when
# configured) the executor, spawns the dispatchers, registers the world in the
# coordinator, optionally runs the validity checks, installs the at_exit
# termination hook and finally calls #post_initialization.
#
# @param config raw configuration object; it is only consumed through
#   Config::ForWorld.new(config, self)
def initialize(config)
  @config = Config::ForWorld.new(config, self)

  # Set the telemetry instance as soon as possible
  Dynflow::Telemetry.set_adapter @config.telemetry_adapter
  Dynflow::Telemetry.register_metrics!

  @id                     = SecureRandom.uuid
  @logger_adapter         = @config.logger_adapter
  # #logger reads @logger_adapter, so the adapter has to be assigned first
  @clock                  = spawn_and_wait(Clock, 'clock', logger)
  @config.validate
  @transaction_adapter    = @config.transaction_adapter
  @persistence            = Persistence.new(self, @config.persistence_adapter,
                                            :backup_deleted_plans => @config.backup_deleted_plans,
                                            :backup_dir => @config.backup_dir)
  @coordinator            = Coordinator.new(@config.coordinator_adapter)
  # The executor is optional: without it @executor stays unset (nil) and
  # #registered_world reports this world as a client world
  if @config.executor
    @executor = Executors::Parallel.new(self,
                                        executor_class: @config.executor,
                                        heartbeat_interval: @config.executor_heartbeat_interval,
                                        queues_options: @config.queues)
  end
  @action_classes         = @config.action_classes
  @auto_rescue            = @config.auto_rescue
  @exit_on_terminate      = Concurrent::AtomicBoolean.new(@config.exit_on_terminate)
  @connector              = @config.connector
  @middleware             = Middleware::World.new
  # The transaction middleware is only installed when a transaction adapter is configured
  @middleware.use Middleware::Common::Transaction if @transaction_adapter
  @client_dispatcher      = spawn_and_wait(Dispatcher::ClientDispatcher, "client-dispatcher", self, @config.ping_cache_age)
  @dead_letter_handler    = spawn_and_wait(DeadLetterSilencer, 'default_dead_letter_handler', @config.silent_dead_letter_matchers)
  @auto_validity_check    = @config.auto_validity_check
  @validity_check_timeout = @config.validity_check_timeout
  @throttle_limiter       = @config.throttle_limiter
  @terminated             = Concurrent::Promises.resolvable_event
  @termination_timeout    = @config.termination_timeout
  calculate_subscription_index

  if executor
    @executor_dispatcher = spawn_and_wait(Dispatcher::ExecutorDispatcher, "executor-dispatcher", self, @config.executor_semaphore)
    # Block until the executor reports that it is initialized
    executor.initialized.wait
  end
  # First call registers this world in the coordinator (see #update_register)
  update_register
  perform_validity_checks if auto_validity_check

  # NOTE(review): these two are created only after registration and the
  # validity checks, so #before_termination cannot be used before this point
  @termination_barrier = Mutex.new
  @before_termination_hooks = Queue.new

  if @config.auto_terminate
    at_exit do
      @exit_on_terminate.make_false # make sure we don't terminate twice
      self.terminate.wait
    end
  end
  post_initialization
end

Instance Attribute Details

#action_classesObject (readonly)

Returns the value of attribute action_classes.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# Action classes known to this world: taken from @config.action_classes in
# #initialize and replaced with the re-resolved classes by #reload!.
def action_classes
  @action_classes
end

#auto_rescueObject (readonly)

Returns the value of attribute auto_rescue.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# Value of @config.auto_rescue captured in #initialize.
def auto_rescue
  @auto_rescue
end

#auto_validity_checkObject (readonly)

Returns the value of attribute auto_validity_check.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# Value of @config.auto_validity_check captured in #initialize; when truthy the
# constructor runs perform_validity_checks and #post_initialization may start
# the delayed executor.
def auto_validity_check
  @auto_validity_check
end

#client_dispatcherObject (readonly)

Returns the value of attribute client_dispatcher.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# The Dispatcher::ClientDispatcher spawned in #initialize; #publish_request
# sends every request through it.
def client_dispatcher
  @client_dispatcher
end

#clockObject (readonly)

Returns the value of attribute clock.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# The Clock spawned in #initialize via spawn_and_wait.
def clock
  @clock
end

#configObject (readonly)

Returns the value of attribute config.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# The Config::ForWorld wrapper built in #initialize around the configuration
# passed to the constructor.
def config
  @config
end

#connectorObject (readonly)

Returns the value of attribute connector.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# Value of @config.connector captured in #initialize.
def connector
  @connector
end

#coordinatorObject (readonly)

Returns the value of attribute coordinator.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# The Coordinator created in #initialize from @config.coordinator_adapter; used
# here for locks and for registering/updating this world's record.
def coordinator
  @coordinator
end

#dead_letter_handlerObject (readonly)

Returns the value of attribute dead_letter_handler.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# The DeadLetterSilencer spawned in #initialize with
# @config.silent_dead_letter_matchers.
def dead_letter_handler
  @dead_letter_handler
end

#delayed_executorObject (readonly)

Returns the value of attribute delayed_executor.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# Delayed executor obtained by #post_initialization through
# try_spawn(:delayed_executor, ...); nil when #try_spawn returned nil.
def delayed_executor
  @delayed_executor
end

#execution_plan_cleanerObject (readonly)

Returns the value of attribute execution_plan_cleaner.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# Execution plan cleaner obtained by #post_initialization through
# try_spawn(:execution_plan_cleaner, ...); nil when #try_spawn returned nil.
def execution_plan_cleaner
  @execution_plan_cleaner
end

#executorObject (readonly)

Returns the value of attribute executor.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# The Executors::Parallel instance created in #initialize when @config.executor
# is set; nil otherwise (the instance variable is then never assigned).
def executor
  @executor
end

#executor_dispatcherObject (readonly)

Returns the value of attribute executor_dispatcher.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# The Dispatcher::ExecutorDispatcher spawned in #initialize; only assigned when
# the world has an executor, nil otherwise.
def executor_dispatcher
  @executor_dispatcher
end

#idObject (readonly)

Returns the value of attribute id.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# Identifier of this world: a SecureRandom.uuid generated in #initialize.
def id
  @id
end

#logger_adapterObject (readonly)

Returns the value of attribute logger_adapter.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# Value of @config.logger_adapter captured in #initialize; #logger and
# #action_logger delegate to it.
def logger_adapter
  @logger_adapter
end

#metaObject (readonly)

Returns the value of attribute meta.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# Metadata hash maintained by #update_register: initialised from @config.meta
# and extended with 'queues', 'delayed_executor', 'execution_plan_cleaner' and
# 'last_seen' entries.
def meta
  @meta
end

#middlewareObject (readonly)

Returns the value of attribute middleware.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# The Middleware::World created in #initialize; Middleware::Common::Transaction
# is registered on it when a transaction adapter is configured.
def middleware
  @middleware
end

#persistenceObject (readonly)

Returns the value of attribute persistence.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# The Persistence instance created in #initialize from
# @config.persistence_adapter and the backup options.
def persistence
  @persistence
end

#subscription_indexObject (readonly)

Returns the value of attribute subscription_index.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# Index read by #subscribed_actions, keyed by action class.
# NOTE(review): the assignment is not visible in this file view; presumably
# calculate_subscription_index populates it - confirm.
def subscription_index
  @subscription_index
end

#terminatedObject (readonly)

Returns the value of attribute terminated.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# The Concurrent::Promises.resolvable_event created in #initialize.
# NOTE(review): presumably resolved once termination completes - confirm in
# the termination code, which is not visible here.
def terminated
  @terminated
end

#termination_timeoutObject (readonly)

Returns the value of attribute termination_timeout.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# Value of @config.termination_timeout captured in #initialize.
def termination_timeout
  @termination_timeout
end

#throttle_limiterObject (readonly)

Returns the value of attribute throttle_limiter.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# Value of @config.throttle_limiter captured in #initialize.
def throttle_limiter
  @throttle_limiter
end

#transaction_adapterObject (readonly)

Returns the value of attribute transaction_adapter.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# Value of @config.transaction_adapter captured in #initialize; when truthy the
# transaction middleware is enabled.
def transaction_adapter
  @transaction_adapter
end

#validity_check_timeoutObject (readonly)

Returns the value of attribute validity_check_timeout.



11
12
13
# File 'lib/dynflow/world.rb', line 11

# Value of @config.validity_check_timeout captured in #initialize.
def validity_check_timeout
  @validity_check_timeout
end

Instance Method Details

#action_loggerObject



114
115
116
# File 'lib/dynflow/world.rb', line 114

# Logger handed out by the logger adapter for actions
# (delegates to logger_adapter.action_logger).
def action_logger
  logger_adapter.action_logger
end

#auto_executeObject

24119 - ensure delayed executor is preserved after invalidation. Executes plans that are planned/paused and haven’t reported any error yet (usually when no executor was available by the time of planning or terminating).



278
279
280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/dynflow/world.rb', line 278

# Executes plans that are in the 'planned' or 'paused' state and whose result
# is anything but :error, skipping plans that already hold an execution lock.
#
# The whole scan runs while holding Coordinator::AutoExecuteLock. When that
# lock cannot be acquired (Coordinator::LockError) the situation is logged at
# info level and an empty array is returned.
#
# @return [Array] whatever the coordinator's #acquire yields back for the
#   block (the #execute results of the started plans), or [] on LockError
def auto_execute
  coordinator.acquire(Coordinator::AutoExecuteLock.new(self)) do
    filters = {
      'state'  => %w(planned paused),
      'result' => (ExecutionPlan.results - [:error]).map(&:to_s)
    }
    candidates = persistence.find_execution_plans(filters: filters)
    candidates.map do |ep|
      # a plan with an execution lock is already being run by some executor
      next unless coordinator.find_locks(Dynflow::Coordinator::ExecutionLock.unique_filter(ep.id)).empty?

      execute(ep.id)
    end.compact
  end
rescue Coordinator::LockError => e
  logger.info "auto-executor lock already acquired: #{e.message}"
  []
end

#before_termination(&block) ⇒ Object



84
85
86
# File 'lib/dynflow/world.rb', line 84

# Registers +block+ as a hook in the @before_termination_hooks queue.
# Returns the queue itself.
def before_termination(&block)
  @before_termination_hooks.push(block)
end

#delay(action_class, delay_options, *args) ⇒ Object



192
193
194
# File 'lib/dynflow/world.rb', line 192

# Positional-argument convenience wrapper around #delay_with_options: the
# remaining positional arguments are forwarded as +args+.
def delay(action_class, delay_options, *args)
  delay_with_options(delay_options: delay_options,
                     args: args,
                     action_class: action_class)
end

#delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil) ⇒ Object



196
197
198
199
200
201
# File 'lib/dynflow/world.rb', line 196

# Creates a new ExecutionPlan (with the optional explicit +id+), calls #delay
# on it with the given action class, options and arguments, and returns a
# Scheduled record carrying the plan id.
#
# Raises RuntimeError ('No action_class given') when +action_class+ is nil.
def delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil)
  raise 'No action_class given' if action_class.nil?

  delayed_plan = ExecutionPlan.new(self, id).tap do |ep|
    ep.delay(caller_action, action_class, delay_options, *args)
  end
  Scheduled[delayed_plan.id]
end

#event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object



230
231
232
# File 'lib/dynflow/world.rb', line 230

# Publishes a Dispatcher::Event for the given plan step with no scheduled time
# (the time slot is nil), without waiting for the request to be accepted.
# Returns whatever #publish_request returns.
def event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false)
  request = Dispatcher::Event[execution_plan_id, step_id, event, nil, optional]
  publish_request(request, done, false)
end

#execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Concurrent::Promises::ResolvableFuture

raises when ExecutionPlan is not accepted for execution

Returns:

  • (Concurrent::Promises::ResolvableFuture)

    containing execution_plan when finished



226
227
228
# File 'lib/dynflow/world.rb', line 226

# Publishes a Dispatcher::Execution request for the plan and waits until the
# request has been accepted (wait_for_accepted is true).
# Returns whatever #publish_request returns.
def execute(execution_plan_id, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Execution[execution_plan_id]
  publish_request(request, done, true)
end

#get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object



250
251
252
# File 'lib/dynflow/world.rb', line 250

# Publishes a Dispatcher::Status request for +execution_plan_id+ addressed to
# +world_id+, passing +timeout+ through and not waiting for acceptance.
# Returns whatever #publish_request returns.
def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Status[world_id, execution_plan_id]
  publish_request(request, done, false, timeout)
end

#loggerObject



110
111
112
# File 'lib/dynflow/world.rb', line 110

# Logger handed out by the logger adapter for Dynflow itself
# (delegates to logger_adapter.dynflow_logger).
def logger
  logger_adapter.dynflow_logger
end

#ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object



242
243
244
# File 'lib/dynflow/world.rb', line 242

# Publishes a Dispatcher::Ping for +world_id+ with the second Ping field set to
# true (the counterpart #ping_without_cache passes false), passing +timeout+
# through and not waiting for acceptance.
# Returns whatever #publish_request returns.
def ping(world_id, timeout, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Ping[world_id, true]
  publish_request(request, done, false, timeout)
end

#ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object



246
247
248
# File 'lib/dynflow/world.rb', line 246

# Same as #ping, but the second Dispatcher::Ping field is false.
# Returns whatever #publish_request returns.
def ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Ping[world_id, false]
  publish_request(request, done, false, timeout)
end

#plan(action_class, *args) ⇒ Object



211
212
213
# File 'lib/dynflow/world.rb', line 211

# Positional-argument convenience wrapper around #plan_with_options: the
# remaining positional arguments are forwarded as +args+.
def plan(action_class, *args)
  plan_with_options(args: args, action_class: action_class)
end

#plan_elsewhere(action_class, *args) ⇒ Object



203
204
205
206
207
208
209
# File 'lib/dynflow/world.rb', line 203

# Creates a new ExecutionPlan, calls #delay on it with empty delay options and
# no caller action, publishes a planning request for it (see #plan_request)
# and returns a Scheduled record carrying the plan id.
def plan_elsewhere(action_class, *args)
  remote_plan = ExecutionPlan.new(self, nil).tap do |ep|
    ep.delay(nil, action_class, {}, *args)
  end
  plan_request(remote_plan.id)

  Scheduled[remote_plan.id]
end

#plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object



234
235
236
# File 'lib/dynflow/world.rb', line 234

# Like #event, but the Dispatcher::Event carries +time+ in its time slot.
# Does not wait for the request to be accepted; returns whatever
# #publish_request returns.
def plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false)
  request = Dispatcher::Event[execution_plan_id, step_id, event, time, optional]
  publish_request(request, accepted, false)
end

#plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Object



238
239
240
# File 'lib/dynflow/world.rb', line 238

# Publishes a Dispatcher::Planning request for the plan without waiting for it
# to be accepted. Returns whatever #publish_request returns.
def plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Planning[execution_plan_id]
  publish_request(request, done, false)
end

#plan_with_options(action_class:, args:, id: nil, caller_action: nil) ⇒ Object



215
216
217
218
219
220
221
222
# File 'lib/dynflow/world.rb', line 215

# Creates a new ExecutionPlan (with the optional explicit +id+) and, while
# holding a Coordinator::PlanningLock for it, prepares it for +action_class+
# and plans it with +args+. Returns the execution plan.
def plan_with_options(action_class:, args:, id: nil, caller_action: nil)
  new_plan = ExecutionPlan.new(self, id)
  planning_lock = Coordinator::PlanningLock.new(self, new_plan.id)
  coordinator.acquire(planning_lock) do
    new_plan.prepare(action_class, caller_action: caller_action)
    new_plan.plan(*args)
  end
  new_plan
end

#post_initializationObject

Performs steps once the executor is ready and invalidation of previous worlds is finished. Needs to be idempotent, as it can be called several times (especially when auto_validity_check is false, as it should be called after the `perform_validity_checks` method).



76
77
78
79
80
81
82
# File 'lib/dynflow/world.rb', line 76

# Final start-up steps; safe to call repeatedly because both spawned
# components are memoized with ||= and the delayed executor is only started
# when it has not been started yet.
#
# Spawns the delayed executor and the execution plan cleaner (see #try_spawn),
# refreshes the registration record, starts the delayed executor when
# auto_validity_check is enabled, and runs #auto_execute when configured.
def post_initialization
  @delayed_executor ||= try_spawn(:delayed_executor, Coordinator::DelayedExecutorLock)
  @execution_plan_cleaner ||= try_spawn(:execution_plan_cleaner, Coordinator::ExecutionPlanCleanerLock)
  update_register

  delayed = @delayed_executor
  delayed.start if auto_validity_check && delayed && !delayed.started?

  auto_execute if @config.auto_execute
end

#publish_request(request, done, wait_for_accepted, timeout = nil) ⇒ Object



254
255
256
257
258
259
260
261
262
263
264
# File 'lib/dynflow/world.rb', line 254

# Sends +request+ to the client dispatcher and returns the +done+ future.
#
# An internal +accepted+ future tracks whether the dispatcher took the
# request; when it is rejected with a reason, +done+ is rejected with the
# same reason.
#
# @param request the dispatcher request to publish
# @param done future handed to the dispatcher together with the request
# @param wait_for_accepted when truthy, waits on the +accepted+ future before
#   returning
# @param timeout passed through to the dispatcher (nil by default)
# @return the +done+ future, on the success path and on the error path alike
def publish_request(request, done, wait_for_accepted, timeout = nil)
  accepted = Concurrent::Promises.resolvable_future
  accepted.rescue do |reason|
    done.reject reason if reason
  end
  client_dispatcher.ask([:publish_request, done, request, timeout], accepted)
  accepted.wait if wait_for_accepted
  done
rescue => e
  # Rejecting +accepted+ propagates the failure to +done+ through the rescue
  # callback registered above; still hand +done+ back so callers always get
  # the same future regardless of which path was taken.
  accepted.reject e
  done
end

#registered_worldObject



102
103
104
105
106
107
108
# File 'lib/dynflow/world.rb', line 102

# Builds the coordinator record describing this world: an ExecutorWorld when
# the world has an executor, a ClientWorld otherwise.
def registered_world
  record_class = executor ? Coordinator::ExecutorWorld : Coordinator::ClientWorld
  record_class.new(self)
end

#reload!Object

Reloads the action classes; intended only for development.



123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/dynflow/world.rb', line 123

# Re-resolves every known action class by name through Utils.constantize,
# dropping the ones whose constant can no longer be found, then clears the
# middleware cache and recalculates the subscription index.
def reload!
  # TODO what happens with newly loaded classes
  @action_classes = @action_classes.each_with_object([]) do |klass, reloaded|
    resolved = begin
      Utils.constantize(klass.to_s)
    rescue NameError
      nil # ignore missing classes
    end
    reloaded << resolved unless resolved.nil?
  end
  middleware.clear_cache!
  calculate_subscription_index
end

#subscribed_actions(action_class) ⇒ Object



118
119
120
# File 'lib/dynflow/world.rb', line 118

# Looks +action_class+ up in the subscription index; returns the stored entry,
# or a new empty array when the class has no entry.
def subscribed_actions(action_class)
  @subscription_index.fetch(action_class, [])
end

#terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object



266
267
268
269
# File 'lib/dynflow/world.rb', line 266

# Starts termination and tangles its result into +future+ (the argument is
# passed to start_termination.tangle). Returns +future+.
def terminate(future = Concurrent::Promises.resolvable_future)
  future.tap { |result| start_termination.tangle(result) }
end

#terminating?Boolean

Returns:

  • (Boolean)


271
272
273
# File 'lib/dynflow/world.rb', line 271

# Tells whether termination has been started, judged by whether the
# @terminating instance variable has been set at all (its value is not
# inspected).
#
# @return [Boolean] strictly true or false
def terminating?
  instance_variable_defined?(:@terminating)
end

#trigger(action_class = nil, *args, &block) ⇒ TriggerResult

Blocks until action_class is planned. If no arguments are given, the plan is expected to be returned by a block.

Returns:



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/dynflow/world.rb', line 175

# Plans +action_class+ with +args+ (or, when no action class is given, takes
# the execution plan returned by the block, which is called with this world)
# and, if the plan reached the :planned state, starts executing it.
#
# Returns Triggered[plan id, future from #execute] when the plan was planned,
# otherwise PlaningFailed[plan id, exception of the plan's first error].
# Raises RuntimeError when neither an action class nor a block is given.
def trigger(action_class = nil, *args, &block)
  execution_plan =
    if !action_class.nil?
      plan(action_class, *args)
    elsif block
      block.call(self)
    else
      raise 'Neither action_class nor a block given'
    end

  unless execution_plan.state == :planned
    return PlaningFailed[execution_plan.id, execution_plan.errors.first.exception]
  end

  finished = execute(execution_plan.id, Concurrent::Promises.resolvable_future)
  Triggered[execution_plan.id, finished]
end

#try_spawn(what, lock_class = nil) ⇒ Object



293
294
295
296
297
298
299
300
301
302
# File 'lib/dynflow/world.rb', line 293

# Spawns the optional component that the configuration exposes under the
# reader named +what+.
#
# Returns nil when the world has no executor, when the configuration returns
# nil for +what+, or when acquiring the lock raises Coordinator::LockError.
# Otherwise acquires the lock (only if +lock_class+ is given), spawns the
# component, waits for the spawn to finish and returns the component.
def try_spawn(what, lock_class = nil)
  return nil unless executor

  component = @config.public_send(what)
  return nil if component.nil?

  coordinator.acquire(lock_class.new(self)) if lock_class
  component.spawn.wait
  component
rescue Coordinator::LockError
  nil
end

#update_registerObject



88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/dynflow/world.rb', line 88

# Refreshes this world's metadata and pushes it to the coordinator.
#
# @meta starts as @config.meta and is updated in place: 'queues' when the
# world has an executor, a true flag for each optional component that exists,
# and 'last_seen' with the current PingCache.format_time value. The first call
# registers the world; later calls update the existing record.
def update_register
  @meta ||= @config.meta
  @meta['queues'] = @config.queues if @executor
  optional_components = {
    'delayed_executor'       => @delayed_executor,
    'execution_plan_cleaner' => @execution_plan_cleaner
  }
  optional_components.each do |key, component|
    @meta[key] = true if component
  end
  @meta['last_seen'] = Dynflow::Dispatcher::ClientDispatcher::PingCache.format_time

  return coordinator.update_record(registered_world) if @already_registered

  coordinator.register_world(registered_world)
  @already_registered = true
end