Class: Dynflow::World

Inherits:
Object
  • Object
show all
Includes:
Algebrick::Matching, Algebrick::TypeCheck, Invalidation
Defined in:
lib/dynflow/world.rb,
lib/dynflow/world/invalidation.rb

Direct Known Subclasses

Testing::InThreadWorld

Defined Under Namespace

Modules: Invalidation, TriggerResult, Triggered

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Invalidation

#invalidate, #invalidate_execution_lock, #invalidate_planning_lock, #locks_validity_check, #perform_validity_checks, #with_valid_execution_plan_for_lock, #worlds_validity_check

Constructor Details

#initialize(config) ⇒ World

Returns a new instance of World.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/dynflow/world.rb', line 18

# Builds a world from the given configuration: sets up telemetry, logging,
# persistence, coordination, the optional executor and the dispatchers, then
# registers the world with the coordinator and runs post-initialization.
#
# @param config [Object] raw configuration; it is wrapped in a
#   Config::ForWorld together with this world
def initialize(config)
  @config = Config::ForWorld.new(config, self)

  # Set the telemetry instance as soon as possible
  Dynflow::Telemetry.set_adapter @config.telemetry_adapter
  Dynflow::Telemetry.register_metrics!

  @id                     = SecureRandom.uuid
  @logger_adapter         = @config.logger_adapter
  @clock                  = spawn_and_wait(Clock, 'clock', logger)
  @config.validate
  @transaction_adapter    = @config.transaction_adapter
  @persistence            = Persistence.new(self, @config.persistence_adapter,
    :backup_deleted_plans => @config.backup_deleted_plans,
    :backup_dir => @config.backup_dir)
  @coordinator            = Coordinator.new(@config.coordinator_adapter)
  # The executor is only created when the config provides one; otherwise
  # @executor is never assigned and #executor returns nil.
  if @config.executor
    @executor = Executors::Parallel.new(self,
      executor_class: @config.executor,
      heartbeat_interval: @config.executor_heartbeat_interval,
      queues_options: @config.queues)
  end
  @action_classes         = @config.action_classes
  @auto_rescue            = @config.auto_rescue
  @exit_on_terminate      = Concurrent::AtomicBoolean.new(@config.exit_on_terminate)
  @connector              = @config.connector
  @middleware             = Middleware::World.new
  # Transaction middleware is added only when a transaction adapter is configured
  @middleware.use Middleware::Common::Transaction if @transaction_adapter
  @client_dispatcher      = spawn_and_wait(Dispatcher::ClientDispatcher, "client-dispatcher", self, @config.ping_cache_age)
  @dead_letter_handler    = spawn_and_wait(DeadLetterSilencer, 'default_dead_letter_handler', @config.silent_dead_letter_matchers)
  @auto_validity_check    = @config.auto_validity_check
  @validity_check_timeout = @config.validity_check_timeout
  @throttle_limiter       = @config.throttle_limiter
  @terminated             = Concurrent::Promises.resolvable_event
  @termination_timeout    = @config.termination_timeout
  calculate_subscription_index

  # Worlds with an executor also get an executor dispatcher; block here
  # until the executor reports that it is initialized.
  if executor
    @executor_dispatcher = spawn_and_wait(Dispatcher::ExecutorDispatcher, "executor-dispatcher", self, @config.executor_semaphore)
    executor.initialized.wait
  end
  # Register this world with the coordinator, then optionally run the
  # validity checks provided by the Invalidation module.
  update_register
  perform_validity_checks if auto_validity_check

  # Termination state; blocks passed to #before_termination are queued in
  # @before_termination_hooks.
  @termination_barrier = Mutex.new
  @before_termination_hooks = Queue.new

  # With auto_terminate enabled, terminate the world at process exit and
  # wait for the termination to finish.
  if @config.auto_terminate
    at_exit do
      @exit_on_terminate.make_false # make sure we don't terminate twice
      self.terminate.wait
    end
  end
  post_initialization
end

Instance Attribute Details

#action_classesObject (readonly)

Returns the value of attribute action_classes.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @action_classes, which #initialize takes from the config's
# action_classes and #reload! replaces with freshly constantized classes.
def action_classes
  @action_classes
end

#auto_rescueObject (readonly)

Returns the value of attribute auto_rescue.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @auto_rescue, copied from the config's auto_rescue in #initialize.
def auto_rescue
  @auto_rescue
end

#auto_validity_checkObject (readonly)

Returns the value of attribute auto_validity_check.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @auto_validity_check, copied from the config in #initialize.
# #initialize runs perform_validity_checks only when this is truthy, and
# #post_initialization starts the delayed executor only when it is truthy.
def auto_validity_check
  @auto_validity_check
end

#client_dispatcherObject (readonly)

Returns the value of attribute client_dispatcher.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @client_dispatcher, the Dispatcher::ClientDispatcher spawned in
# #initialize; #publish_request sends its requests through it.
def client_dispatcher
  @client_dispatcher
end

#clockObject (readonly)

Returns the value of attribute clock.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @clock, the Clock spawned in #initialize under the name 'clock'.
def clock
  @clock
end

#configObject (readonly)

Returns the value of attribute config.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @config, the Config::ForWorld wrapper that #initialize builds
# around the configuration passed to the constructor.
def config
  @config
end

#connectorObject (readonly)

Returns the value of attribute connector.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @connector, copied from the config's connector in #initialize.
def connector
  @connector
end

#coordinatorObject (readonly)

Returns the value of attribute coordinator.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @coordinator, the Coordinator that #initialize creates from the
# config's coordinator_adapter.
def coordinator
  @coordinator
end

#dead_letter_handlerObject (readonly)

Returns the value of attribute dead_letter_handler.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @dead_letter_handler, the DeadLetterSilencer spawned in
# #initialize with the config's silent_dead_letter_matchers.
def dead_letter_handler
  @dead_letter_handler
end

#delayed_executorObject (readonly)

Returns the value of attribute delayed_executor.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @delayed_executor, which #post_initialization assigns from
# #try_spawn; it is nil when #try_spawn returned nil.
def delayed_executor
  @delayed_executor
end

#execution_plan_cleanerObject (readonly)

Returns the value of attribute execution_plan_cleaner.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @execution_plan_cleaner, which #post_initialization assigns from
# #try_spawn; it is nil when #try_spawn returned nil.
def execution_plan_cleaner
  @execution_plan_cleaner
end

#executorObject (readonly)

Returns the value of attribute executor.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @executor. #initialize assigns an Executors::Parallel only when
# the config provides an executor, so this is nil for worlds without one.
def executor
  @executor
end

#executor_dispatcherObject (readonly)

Returns the value of attribute executor_dispatcher.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @executor_dispatcher, the Dispatcher::ExecutorDispatcher that
# #initialize spawns only when the world has an executor (nil otherwise).
def executor_dispatcher
  @executor_dispatcher
end

#idObject (readonly)

Returns the value of attribute id.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @id, a UUID generated with SecureRandom.uuid in #initialize.
def id
  @id
end

#logger_adapterObject (readonly)

Returns the value of attribute logger_adapter.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @logger_adapter, copied from the config in #initialize; #logger
# and #action_logger both delegate to it.
def logger_adapter
  @logger_adapter
end

#metaObject (readonly)

Returns the value of attribute meta.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @meta, the metadata hash that #update_register initializes from
# the config's meta and then fills in with string keys such as 'last_seen'.
def meta
  @meta
end

#middlewareObject (readonly)

Returns the value of attribute middleware.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @middleware, the Middleware::World created in #initialize.
def middleware
  @middleware
end

#persistenceObject (readonly)

Returns the value of attribute persistence.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @persistence, the Persistence object that #initialize builds on
# top of the config's persistence_adapter.
def persistence
  @persistence
end

#subscription_indexObject (readonly)

Returns the value of attribute subscription_index.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @subscription_index, which #subscribed_actions looks action
# classes up in.
# NOTE(review): presumably populated by calculate_subscription_index, whose
# body is not shown here — confirm.
def subscription_index
  @subscription_index
end

#terminatedObject (readonly)

Returns the value of attribute terminated.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @terminated, the resolvable event created with
# Concurrent::Promises.resolvable_event in #initialize.
def terminated
  @terminated
end

#termination_timeoutObject (readonly)

Returns the value of attribute termination_timeout.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @termination_timeout, copied from the config in #initialize.
def termination_timeout
  @termination_timeout
end

#throttle_limiterObject (readonly)

Returns the value of attribute throttle_limiter.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @throttle_limiter, copied from the config in #initialize.
def throttle_limiter
  @throttle_limiter
end

#transaction_adapterObject (readonly)

Returns the value of attribute transaction_adapter.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @transaction_adapter, copied from the config in #initialize;
# the transaction middleware is only installed when it is set.
def transaction_adapter
  @transaction_adapter
end

#validity_check_timeoutObject (readonly)

Returns the value of attribute validity_check_timeout.



12
13
14
# File 'lib/dynflow/world.rb', line 12

# Reader for @validity_check_timeout, copied from the config in #initialize.
def validity_check_timeout
  @validity_check_timeout
end

Instance Method Details

#action_loggerObject



115
116
117
# File 'lib/dynflow/world.rb', line 115

# Returns the action logger provided by this world's logger adapter.
def action_logger
  logger_adapter.action_logger
end

#auto_executeObject

Executes plans that are planned/paused and haven't reported any error yet (usually because no executor was available at the time of planning or terminating). (Stray upstream note carried over from the source, issue 24119: ensure the delayed executor is preserved after invalidation.)



279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/dynflow/world.rb', line 279

# Executes every planned/paused execution plan whose result is not :error and
# which has no execution lock, all while holding the auto-execute lock.
#
# @return [Array] the values returned by #execute for the started plans;
#   an empty array when the auto-execute lock could not be acquired
def auto_execute
  coordinator.acquire(Coordinator::AutoExecuteLock.new(self)) do
    non_error_results = (ExecutionPlan.results - [:error]).map(&:to_s)
    filters = { 'state' => %w(planned paused), 'result' => non_error_results }
    candidates = persistence.find_execution_plans(filters: filters)

    started = candidates.map do |plan|
      lock_filter = Dynflow::Coordinator::ExecutionLock.unique_filter(plan.id)
      # Plans that already hold an execution lock are left alone.
      next unless coordinator.find_locks(lock_filter).empty?

      execute(plan.id)
    end
    started.compact
  end
rescue Coordinator::LockError => e
  logger.info "auto-executor lock already aquired: #{e.message}"
  []
end

#before_termination(&block) ⇒ Object



85
86
87
# File 'lib/dynflow/world.rb', line 85

# Queues a block in @before_termination_hooks.
#
# @yield the hook to store
# @return the hooks queue
def before_termination(&block)
  @before_termination_hooks.push(block)
end

#delay(action_class, delay_options, *args) ⇒ Object



193
194
195
# File 'lib/dynflow/world.rb', line 193

# Positional-argument convenience wrapper around #delay_with_options.
#
# @param action_class the action to delay
# @param delay_options options forwarded as +delay_options:+
# @param args remaining arguments, forwarded as +args:+
# @return whatever #delay_with_options returns
def delay(action_class, delay_options, *args)
  delay_with_options(action_class: action_class, args: args, delay_options: delay_options)
end

#delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil) ⇒ Object



197
198
199
200
201
202
# File 'lib/dynflow/world.rb', line 197

# Creates an execution plan, delays the given action on it and returns a
# Scheduled record holding the plan's id.
#
# @param action_class the action to delay; must not be nil
# @param args [Array] arguments splatted into ExecutionPlan#delay
# @param delay_options options handed to ExecutionPlan#delay
# @param id optional id for the new execution plan
# @param caller_action optional action passed first to ExecutionPlan#delay
# @raise [RuntimeError] when action_class is nil
def delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil)
  raise 'No action_class given' if action_class.nil?

  delayed_plan = ExecutionPlan.new(self, id).tap do |new_plan|
    new_plan.delay(caller_action, action_class, delay_options, *args)
  end
  Scheduled[delayed_plan.id]
end

#event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object



231
232
233
# File 'lib/dynflow/world.rb', line 231

# Publishes a Dispatcher::Event for the given plan step without a time
# (the time field is nil) and without waiting for acceptance.
#
# @return whatever #publish_request returns
def event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false)
  request = Dispatcher::Event[execution_plan_id, step_id, event, nil, optional]
  publish_request(request, done, false)
end

#execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Concurrent::Promises::ResolvableFuture

raises when ExecutionPlan is not accepted for execution

Returns:

  • (Concurrent::Promises::ResolvableFuture)

    containing execution_plan when finished



227
228
229
# File 'lib/dynflow/world.rb', line 227

# Publishes a Dispatcher::Execution request for the plan and waits until the
# request has been accepted.
#
# @param execution_plan_id id of the plan to execute
# @param done future handed to #publish_request
# @return whatever #publish_request returns
def execute(execution_plan_id, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Execution[execution_plan_id]
  publish_request(request, done, true)
end

#get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object



251
252
253
# File 'lib/dynflow/world.rb', line 251

# Publishes a Dispatcher::Status request for the given world and execution
# plan, passing the timeout on, without waiting for acceptance.
#
# @return whatever #publish_request returns
def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Status[world_id, execution_plan_id]
  publish_request(request, done, false, timeout)
end

#loggerObject



111
112
113
# File 'lib/dynflow/world.rb', line 111

# Returns the dynflow logger provided by this world's logger adapter.
def logger
  logger_adapter.dynflow_logger
end

#ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object



243
244
245
# File 'lib/dynflow/world.rb', line 243

# Publishes a Dispatcher::Ping for the given world with its second field set
# to true (presumably "use the ping cache" — compare #ping_without_cache).
#
# @return whatever #publish_request returns
def ping(world_id, timeout, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Ping[world_id, true]
  publish_request(request, done, false, timeout)
end

#ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object



247
248
249
# File 'lib/dynflow/world.rb', line 247

# Publishes a Dispatcher::Ping for the given world with its second field set
# to false (presumably "bypass the ping cache" — compare #ping).
#
# @return whatever #publish_request returns
def ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Ping[world_id, false]
  publish_request(request, done, false, timeout)
end

#plan(action_class, *args) ⇒ Object



212
213
214
# File 'lib/dynflow/world.rb', line 212

# Positional-argument convenience wrapper around #plan_with_options.
#
# @param action_class the action to plan
# @param args remaining arguments, forwarded as +args:+
# @return whatever #plan_with_options returns
def plan(action_class, *args)
  plan_with_options(action_class: action_class, args: args)
end

#plan_elsewhere(action_class, *args) ⇒ Object



204
205
206
207
208
209
210
# File 'lib/dynflow/world.rb', line 204

# Creates an execution plan, delays the action on it with empty delay
# options, publishes a planning request for it and returns a Scheduled
# record holding the plan's id.
def plan_elsewhere(action_class, *args)
  delayed_plan = ExecutionPlan.new(self, nil).tap do |new_plan|
    new_plan.delay(nil, action_class, {}, *args)
  end
  plan_request(delayed_plan.id)

  Scheduled[delayed_plan.id]
end

#plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object



235
236
237
# File 'lib/dynflow/world.rb', line 235

# Publishes a Dispatcher::Event for the given plan step carrying +time+
# (unlike #event, which passes nil), without waiting for acceptance.
#
# @return whatever #publish_request returns
def plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false)
  request = Dispatcher::Event[execution_plan_id, step_id, event, time, optional]
  publish_request(request, accepted, false)
end

#plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Object



239
240
241
# File 'lib/dynflow/world.rb', line 239

# Publishes a Dispatcher::Planning request for the given execution plan
# without waiting for acceptance.
#
# @return whatever #publish_request returns
def plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Planning[execution_plan_id]
  publish_request(request, done, false)
end

#plan_with_options(action_class:, args:, id: nil, caller_action: nil) ⇒ Object



216
217
218
219
220
221
222
223
# File 'lib/dynflow/world.rb', line 216

# Creates an execution plan and, while holding a planning lock for it,
# prepares it for the action class and plans it with the given arguments.
#
# @param action_class the action to plan
# @param args [Array] arguments splatted into ExecutionPlan#plan
# @param id optional id for the new execution plan
# @param caller_action optional action forwarded to ExecutionPlan#prepare
# @return the execution plan
def plan_with_options(action_class:, args:, id: nil, caller_action: nil)
  new_plan = ExecutionPlan.new(self, id)
  planning_lock = Coordinator::PlanningLock.new(self, new_plan.id)
  coordinator.acquire(planning_lock) do
    new_plan.prepare(action_class, caller_action: caller_action)
    new_plan.plan(*args)
  end
  new_plan
end

#post_initializationObject

Performs steps once the executor is ready and invalidation of previous worlds is finished. Needs to be idempotent, as it can be called several times (especially when auto_validity_check is false, as it should then be called after the `perform_validity_checks` method).



77
78
79
80
81
82
83
# File 'lib/dynflow/world.rb', line 77

# Spawns the delayed executor and the execution plan cleaner unless they are
# already set, refreshes the world's registration, starts the delayed
# executor when validity checks are automatic, and auto-executes plans when
# the config asks for it.
def post_initialization
  @delayed_executor ||= try_spawn(:delayed_executor, Coordinator::DelayedExecutorLock)
  @execution_plan_cleaner ||= try_spawn(:execution_plan_cleaner, Coordinator::ExecutionPlanCleanerLock)
  update_register

  if auto_validity_check && @delayed_executor
    # Do not start a delayed executor that is already running.
    @delayed_executor.start unless @delayed_executor.started?
  end

  auto_execute if @config.auto_execute
end

#publish_request(request, done, wait_for_accepted, timeout = nil) ⇒ Object



255
256
257
258
259
260
261
262
263
264
265
# File 'lib/dynflow/world.rb', line 255

# Hands a request to the client dispatcher.
#
# @param request the dispatcher request to publish
# @param done a resolvable future; it is rejected when +accepted+ is rejected
# @param wait_for_accepted when truthy, block until +accepted+ is resolved
# @param timeout forwarded to the client dispatcher (nil by default)
# @return +done+ on the normal path
def publish_request(request, done, wait_for_accepted, timeout = nil)
  accepted = Concurrent::Promises.resolvable_future
  # Propagate a rejection of +accepted+ to +done+.
  accepted.rescue do |reason|
    done.reject reason if reason
  end
  client_dispatcher.ask([:publish_request, done, request, timeout], accepted)
  accepted.wait if wait_for_accepted
  done
rescue => e
  # NOTE(review): on this path the method evaluates to the result of
  # accepted.reject rather than +done+ — confirm callers are fine with that.
  accepted.reject e
end

#registered_worldObject



103
104
105
106
107
108
109
# File 'lib/dynflow/world.rb', line 103

# Builds the coordinator record describing this world: an ExecutorWorld when
# the world has an executor, a ClientWorld otherwise.
def registered_world
  record_class = executor ? Coordinator::ExecutorWorld : Coordinator::ClientWorld
  record_class.new(self)
end

#reload!Object

Reloads the action classes; intended only for development.



124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/dynflow/world.rb', line 124

# Re-resolves every known action class by name, dropping the ones that can no
# longer be found, then clears the middleware cache and recalculates the
# subscription index.
def reload!
  # TODO what happens with newly loaded classes
  refreshed = []
  @action_classes.each do |klass|
    begin
      constant = Utils.constantize(klass.to_s)
    rescue NameError
      next # ignore missing classes
    end
    refreshed << constant unless constant.nil?
  end
  @action_classes = refreshed
  middleware.clear_cache!
  calculate_subscription_index
end

#subscribed_actions(action_class) ⇒ Object



119
120
121
# File 'lib/dynflow/world.rb', line 119

# Looks the action class up in the subscription index.
#
# @param action_class the key to look up
# @return the value stored for the class, or an empty array when the index
#   has no entry for it
def subscribed_actions(action_class)
  @subscription_index.fetch(action_class) { [] }
end

#terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object



267
268
269
270
# File 'lib/dynflow/world.rb', line 267

# Starts termination and tangles its result into the given future.
#
# @param future resolvable future to tangle the termination into
# @return the same future
def terminate(future = Concurrent::Promises.resolvable_future)
  future.tap { |result| start_termination.tangle(result) }
end

#terminating?Boolean

Returns:

  • (Boolean)


272
273
274
# File 'lib/dynflow/world.rb', line 272

# Tells whether termination has been started, i.e. whether @terminating has
# been assigned on this world.
#
# Returns a strict true/false; `defined?(@terminating)` would leak the string
# "instance-variable" or nil out of a predicate documented as Boolean.
#
# @return [Boolean]
def terminating?
  instance_variable_defined?(:@terminating)
end

#trigger(action_class = nil, *args, &block) ⇒ TriggerResult

Blocks until action_class is planned. If no arguments are given, the plan is expected to be returned by a block.

Returns:



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/dynflow/world.rb', line 176

# Plans the action (or takes the plan returned by the block when no
# action_class is given) and, if planning succeeded, starts its execution.
#
# @param action_class the action to plan; may be nil when a block is given
# @param args arguments forwarded to #plan
# @yield [world] called with this world when action_class is nil; expected
#   to return the execution plan
# @return Triggered with the plan id and the execution future when the plan
#   is in the :planned state, PlaningFailed with the plan id and the first
#   error's exception otherwise
# @raise [RuntimeError] when neither action_class nor a block is given
def trigger(action_class = nil, *args, &block)
  execution_plan =
    if action_class.nil?
      raise 'Neither action_class nor a block given' if block.nil?
      block.call(self)
    else
      plan(action_class, *args)
    end

  unless execution_plan.state == :planned
    return PlaningFailed[execution_plan.id, execution_plan.errors.first.exception]
  end

  finished = execute(execution_plan.id, Concurrent::Promises.resolvable_future)
  Triggered[execution_plan.id, finished]
end

#try_spawn(what, lock_class = nil) ⇒ Object



294
295
296
297
298
299
300
301
302
303
# File 'lib/dynflow/world.rb', line 294

# Spawns an optional component taken from the config, but only on worlds
# with an executor.
#
# @param what [Symbol] name of the config method returning the component
# @param lock_class optional coordinator lock class; when given, a lock of
#   that class is acquired for this world before spawning
# @return the spawned component, or nil when the world has no executor, the
#   config returns nil for +what+, or a Coordinator::LockError is raised
def try_spawn(what, lock_class = nil)
  return nil unless executor

  component = @config.public_send(what)
  return nil if component.nil?

  coordinator.acquire(lock_class.new(self)) if lock_class
  component.spawn.wait
  component
rescue Coordinator::LockError
  nil
end

#update_registerObject



89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/dynflow/world.rb', line 89

# Refreshes this world's metadata and writes it to the coordinator: the world
# is registered on the first call and its record is updated on later calls.
def update_register
  @meta ||= @config.meta
  @meta['queues'] = @config.queues if @executor
  @meta['delayed_executor'] = true if @delayed_executor
  @meta['execution_plan_cleaner'] = true if @execution_plan_cleaner
  @meta['last_seen'] = Dynflow::Dispatcher::ClientDispatcher::PingCache.format_time

  world_record = registered_world
  if @already_registered
    coordinator.update_record(world_record)
  else
    coordinator.register_world(world_record)
    @already_registered = true
  end
end