Class: Dynflow::World

Inherits:
Object
  • Object
show all
Includes:
Algebrick::TypeCheck
Defined in:
lib/dynflow/world.rb

Direct Known Subclasses

SimpleWorld

Defined Under Namespace

Modules: TriggerResult, Triggered

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options_hash = {}) ⇒ World

Returns a new instance of World.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/dynflow/world.rb', line 8

# Builds a world from +options_hash+ merged over #default_options.
#
# The adapters and the executor are read through option_val and passed
# through Type! together with their expected abstract class. After the
# subscription index is calculated the constructor waits on
# executor.initialized, creates the mutexes guarding termination and clock
# creation, and finally asks the transaction adapter to check this world.
#
# @param options_hash [Hash] values overriding #default_options
def initialize(options_hash = {})
  @options             = default_options.merge(options_hash)
  @logger_adapter      = Type!(option_val(:logger_adapter), LoggerAdapters::Abstract)
  @transaction_adapter = Type!(option_val(:transaction_adapter), TransactionAdapters::Abstract)
  persistence_adapter  = Type!(option_val(:persistence_adapter), PersistenceAdapters::Abstract)
  @persistence         = Persistence.new(self, persistence_adapter)
  @executor            = Type!(option_val(:executor), Executors::Abstract)
  @action_classes      = option_val(:action_classes)
  @auto_rescue         = option_val(:auto_rescue)
  @exit_on_terminate   = option_val(:exit_on_terminate)
  @middleware          = Middleware::World.new
  calculate_subscription_index

  # Block until the executor signals that it is initialized.
  executor.initialized.wait
  @termination_barrier = Mutex.new
  @clock_barrier       = Mutex.new

  transaction_adapter.check(self)
end

Instance Attribute Details

#action_classes ⇒ Object (readonly)

Returns the value of attribute action_classes.



5
6
7
# File 'lib/dynflow/world.rb', line 5

# Reader for @action_classes, which #initialize fills from the
# :action_classes option.
#
# @return [Object] the current value of @action_classes
def action_classes
  @action_classes
end

#auto_rescue ⇒ Object (readonly)

Returns the value of attribute auto_rescue.



5
6
7
# File 'lib/dynflow/world.rb', line 5

# Reader for @auto_rescue, which #initialize fills from the :auto_rescue
# option.
#
# @return [Object] the current value of @auto_rescue
def auto_rescue
  @auto_rescue
end

#executor ⇒ Object (readonly)

Returns the value of attribute executor.



5
6
7
# File 'lib/dynflow/world.rb', line 5

# Reader for @executor, which #initialize fills from the :executor option
# after passing it through Type! with Executors::Abstract.
#
# @return [Object] the current value of @executor
def executor
  @executor
end

#logger_adapter ⇒ Object (readonly)

Returns the value of attribute logger_adapter.



5
6
7
# File 'lib/dynflow/world.rb', line 5

# Reader for @logger_adapter, which #initialize fills from the
# :logger_adapter option after passing it through Type! with
# LoggerAdapters::Abstract.
#
# @return [Object] the current value of @logger_adapter
def logger_adapter
  @logger_adapter
end

#middleware ⇒ Object (readonly)

Returns the value of attribute middleware.



5
6
7
# File 'lib/dynflow/world.rb', line 5

# Reader for @middleware, which #initialize sets to a new
# Middleware::World instance.
#
# @return [Object] the current value of @middleware
def middleware
  @middleware
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



5
6
7
# File 'lib/dynflow/world.rb', line 5

# Reader for @options, which #initialize sets to #default_options merged
# with the options hash given to the constructor.
#
# @return [Object] the current value of @options
def options
  @options
end

#persistence ⇒ Object (readonly)

Returns the value of attribute persistence.



5
6
7
# File 'lib/dynflow/world.rb', line 5

# Reader for @persistence, which #initialize sets to a Persistence built
# from this world and the :persistence_adapter option.
#
# @return [Object] the current value of @persistence
def persistence
  @persistence
end

#subscription_index ⇒ Object (readonly)

Returns the value of attribute subscription_index.



5
6
7
# File 'lib/dynflow/world.rb', line 5

# Reader for @subscription_index.
# NOTE(review): the assignment is not visible here; presumably it is done by
# calculate_subscription_index, which #initialize and #reload! call — confirm.
#
# @return [Object] the current value of @subscription_index
def subscription_index
  @subscription_index
end

#transaction_adapter ⇒ Object (readonly)

Returns the value of attribute transaction_adapter.



5
6
7
# File 'lib/dynflow/world.rb', line 5

# Reader for @transaction_adapter, which #initialize fills from the
# :transaction_adapter option after passing it through Type! with
# TransactionAdapters::Abstract.
#
# @return [Object] the current value of @transaction_adapter
def transaction_adapter
  @transaction_adapter
end

Instance Method Details

#action_logger ⇒ Object



45
46
47
# File 'lib/dynflow/world.rb', line 45

# Returns the action logger exposed by the configured logger adapter.
# NOTE(review): presumably this is the logger meant for use by actions —
# confirm against the logger adapter implementation.
#
# @return [Object] whatever logger_adapter.action_logger returns
def action_logger
  logger_adapter.action_logger
end

#clock ⇒ Object



37
38
39
# File 'lib/dynflow/world.rb', line 37

# Returns this world's Clock, creating it with #logger on first use.
#
# Creation and lookup both happen while holding @clock_barrier, and the
# instance is memoized in @clock.
#
# @return [Clock] the memoized clock
def clock
  @clock_barrier.synchronize do
    @clock ||= Clock.new(logger)
  end
end

#consistency_check ⇒ Object

Detects execution plans that are marked as running but no executor handles them (probably result of non-standard executor termination)

The current implementation expects that no execution plan is actually being run by the executor.

TODO: persist the running executors in the system, so that we can detect the orphaned execution plans. The register should be manageable from the console, so that the administrator can unregister dead executors when needed. After an executor is unregistered, the consistency check should be performed to fix the orphaned plans as well.



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/dynflow/world.rb', line 152

# Detects execution plans that are persisted in the 'planning' or 'running'
# state (probably the result of a non-standard executor termination) and
# moves them to a consistent state.
#
# The current implementation expects that the executor is not actually
# running any execution plan when this method is called.
#
# For every plan found: a :planning plan is updated to :stopped and a
# :running plan to :paused; each of its steps in the :suspended or :running
# state gets an ExecutionPlan::Steps::Error, is switched to :error and saved.
# A summary of the affected plans is logged through logger.error. When
# nothing is found, 'Clean start.' is logged through logger.info.
#
# @return [Object] the result of logger.info when there is nothing to fix,
#   otherwise the collection of abnormal execution plans
# @raise [RuntimeError] when a returned plan is in neither the :planning nor
#   the :running state; the message names the plan and its state
def consistency_check
  abnormal_execution_plans =
      persistence.find_execution_plans filters: { 'state' => %w(planning running) }
  return logger.info('Clean start.') if abnormal_execution_plans.empty?

  format_str = '%36s %10s %10s'
  report     = ['Abnormal execution plans, process was probably killed.',
                'Following ExecutionPlans will be set to paused, ',
                'it should be fixed manually by administrator.',
                format(format_str, 'ExecutionPlan', 'state', 'result')]
  report.concat(abnormal_execution_plans.map { |ep| format(format_str, ep.id, ep.state, ep.result) })

  logger.error report.join("\n")

  abnormal_execution_plans.each do |ep|
    new_state = case ep.state
                when :planning
                  :stopped
                when :running
                  :paused
                else
                  # Fail with context instead of a bare "unhandled exception".
                  raise "Unexpected state #{ep.state.inspect} of abnormal execution plan #{ep.id}"
                end
    ep.update_state new_state

    ep.steps.values.each do |step|
      next unless [:suspended, :running].include?(step.state)

      # Build the error first so its message still carries the previous state.
      step.error = ExecutionPlan::Steps::Error.new("Abnormal termination (previous state: #{step.state})")
      step.state = :error
      step.save
    end
  end
end

#default_options ⇒ Object



28
29
30
31
32
33
34
35
# File 'lib/dynflow/world.rb', line 28

# Default option values, built on first call and memoized in
# @default_options.
#
# The :executor entry is a lambda that receives a world and builds an
# Executors::Parallel with options[:pool_size]; options is read when the
# lambda is called, not when the defaults are built.
#
# @return [Hash] the memoized defaults
def default_options
  return @default_options if @default_options

  build_executor = lambda do |world|
    Executors::Parallel.new(world, options[:pool_size])
  end

  @default_options = {
    action_classes:    Action.all_children,
    logger_adapter:    LoggerAdapters::Simple.new,
    executor:          build_executor,
    exit_on_terminate: true,
    auto_rescue:       true
  }
end

#event(execution_plan_id, step_id, event, future = Future.new) ⇒ Object



109
110
111
# File 'lib/dynflow/world.rb', line 109

# Hands an event addressed to a step of an execution plan over to the
# executor.
#
# @param execution_plan_id id of the execution plan
# @param step_id id of the step within that plan
# @param event the event object, forwarded unchanged
# @param future [Future] forwarded to the executor as the last argument
# @return [Object] whatever executor.event returns
def event(execution_plan_id, step_id, event, future = Future.new)
  executor.event(execution_plan_id, step_id, event, future)
end

#execute(execution_plan_id, finished = Future.new) ⇒ Future

raises when ExecutionPlan is not accepted for execution

Returns:

  • (Future)

    containing execution_plan when finished



122
123
124
# File 'lib/dynflow/world.rb', line 122

# Asks the executor to run the execution plan with the given id.
#
# Raises when the ExecutionPlan is not accepted for execution.
#
# @param execution_plan_id id of the execution plan to run
# @param finished [Future] forwarded to the executor
# @return [Future] containing execution_plan when finished
def execute(execution_plan_id, finished = Future.new)
  executor.execute(execution_plan_id, finished)
end

#execute_planned_execution_plans ⇒ Object

Should be called after the World is initialized; SimpleWorld does it automatically.



190
191
192
193
194
# File 'lib/dynflow/world.rb', line 190

# Looks up every persisted execution plan whose state is 'planned' and
# passes its id to #execute.
#
# Should be called after the World is initialized.
#
# @return [Object] the collection returned by persistence.find_execution_plans
def execute_planned_execution_plans
  persistence.
      find_execution_plans(filters: { 'state' => ['planned'] }).
      each { |execution_plan| execute(execution_plan.id) }
end

#logger ⇒ Object



41
42
43
# File 'lib/dynflow/world.rb', line 41

# Returns the logger obtained from the configured logger adapter through
# dynflow_logger. Within this class it receives the consistency-check
# messages and is handed to Clock.new.
#
# @return [Object] whatever logger_adapter.dynflow_logger returns
def logger
  logger_adapter.dynflow_logger
end

#plan(action_class, *args) ⇒ Object



113
114
115
116
117
118
# File 'lib/dynflow/world.rb', line 113

# Creates an ExecutionPlan bound to this world, prepares it for
# +action_class+ and plans it with +args+.
#
# @param action_class the action class handed to ExecutionPlan#prepare
# @param args arguments handed to ExecutionPlan#plan
# @return [ExecutionPlan] the newly created plan, regardless of what
#   prepare and plan return
def plan(action_class, *args)
  execution_plan = ExecutionPlan.new(self)
  execution_plan.prepare(action_class)
  execution_plan.plan(*args)
  execution_plan
end

#reload! ⇒ Object

Reloads action classes; intended only for development.



54
55
56
57
58
# File 'lib/dynflow/world.rb', line 54

# Reloads the action classes; intended only for development.
#
# Each entry of @action_classes is replaced in place by the constant its
# name currently resolves to, then the middleware cache is cleared and the
# subscription index is recalculated.
#
# @return [Object] whatever calculate_subscription_index returns
def reload!
  @action_classes.map! do |action_class|
    action_class.to_s.constantize
  end
  middleware.clear_cache!
  calculate_subscription_index
end

#subscribed_actions(action_class) ⇒ Object



49
50
51
# File 'lib/dynflow/world.rb', line 49

# Looks up the entry stored in @subscription_index for +action_class+.
#
# @param action_class the key to look up
# @return [Object] the stored entry, or a new empty Array when the index
#   has no such key
def subscribed_actions(action_class)
  return [] unless @subscription_index.has_key?(action_class)

  @subscription_index[action_class]
end

#terminate(future = Future.new) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/dynflow/world.rb', line 126

# Starts termination of the executor and, once that is done, of the clock.
#
# Only the first call (while @executor_terminated is still nil) creates the
# two termination futures and asks the executor to terminate; this happens
# under @termination_barrier. If @exit_on_terminate is set, that first call
# also registers Kernel.exit on +future+. Every call then joins the two
# termination futures into +future+.
#
# @param future [Future] joined with the executor and clock termination futures
# @return [Object] whatever Future.join returns
def terminate(future = Future.new)
  @termination_barrier.synchronize do
    # Termination was already started by an earlier call.
    next unless @executor_terminated.nil?

    @executor_terminated = Future.new
    @clock_terminated    = Future.new

    executor_done = executor.terminate(@executor_terminated)
    executor_done.do_then { clock.ask(MicroActor::Terminate, @clock_terminated) }

    future.do_then { Kernel.exit } if @exit_on_terminate
  end

  Future.join([@executor_terminated, @clock_terminated], future)
end

#trigger(action_class, *args) ⇒ TriggerResult

blocks until action_class is planned

Returns:



94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/dynflow/world.rb', line 94

# Plans +action_class+ with +args+ and, when planning succeeded, hands the
# plan over for execution. Blocks until action_class is planned.
#
# @param action_class the action class to plan
# @param args arguments passed on to #plan
# @return [TriggerResult] Triggered with the plan id and the value returned
#   by #execute; ExecutionFailed with the plan id and the rescued exception
#   when #execute raises; PlaningFailed with the plan id and the exception of
#   the first plan error when the plan did not reach the :planned state
def trigger(action_class, *args)
  execution_plan = plan(action_class, *args)

  unless execution_plan.state == :planned
    return PlaningFailed[execution_plan.id, execution_plan.errors.first.exception]
  end

  begin
    Triggered[execution_plan.id, execute(execution_plan.id)]
  rescue => exception
    ExecutionFailed[execution_plan.id, exception]
  end
end