Class: Dynflow::Director

Inherits:
Object
  • Object
show all
Includes:
Algebrick::TypeCheck
Defined in:
lib/dynflow/director.rb,
lib/dynflow/director/queue_hash.rb,
lib/dynflow/director/flow_manager.rb,
lib/dynflow/director/sequence_cursor.rb,
lib/dynflow/director/sequential_manager.rb,
lib/dynflow/director/running_steps_manager.rb,
lib/dynflow/director/execution_plan_manager.rb

Overview

Director is responsible for telling what to do next when:

* new execution starts
* an event occurs
* some work is finished

Its public methods (except terminate) return work items that the executor should understand.

Defined Under Namespace

Classes: EventWorkItem, ExecutionPlanManager, FinalizeWorkItem, FlowManager, PlanningWorkItem, QueueHash, RunningStepsManager, SequenceCursor, SequentialManager, StepWorkItem, WorkItem

Constant Summary collapse

# Algebrick value type for an event addressed to one step of an execution plan.
#
# Fields, as declared below:
# * request_id        - String
# * execution_plan_id - String; #handle_event uses it to look up the plan's manager
# * step_id           - Integer
# * event             - Object (arbitrary payload)
# * result            - resolvable future; #handle_event rejects it when the
#                       event cannot be delivered
# * optional          - Boolean; when true, #handle_event rejects the result and
#                       returns [] instead of raising if no manager is found
Event =
Algebrick.type do
  fields! request_id:        String,
          execution_plan_id: String,
          step_id:           Integer,
          event:             Object,
          result:            Concurrent::Promises::ResolvableFuture,
          optional:          Algebrick::Types::Boolean
end
# Error class derived from Dynflow::Error.
# NOTE(review): it is not raised anywhere in the code visible here; presumably it
# signals an event that cannot be processed - confirm against the callers.
UnprocessableEvent =
Class.new(Dynflow::Error)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(world) ⇒ Director

Returns a new instance of Director.



165
166
167
168
169
170
171
# File 'lib/dynflow/director.rb', line 165

# Builds a director bound to the given world, with empty bookkeeping.
#
# @param world [Object] must respond to +logger+; stored for later use
def initialize(world)
  # Collaborators taken from the world.
  @world, @logger = world, world.logger

  # Bookkeeping, all empty at start.
  @planning_plans = Set.new
  @rescued_steps = {}
  @execution_plan_managers = {}
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



163
164
165
# File 'lib/dynflow/director.rb', line 163

# Read-only accessor for the logger stored in the @logger instance variable.
#
# @return [Object] the value of @logger
def logger
  @logger
end

Instance Method Details

#current_execution_plan_ids ⇒ Object



173
174
175
# File 'lib/dynflow/director.rb', line 173

# Lists the ids of the execution plans that currently have a manager.
#
# @return [Array] a new array holding the keys of the manager registry
def current_execution_plan_ids
  @execution_plan_managers.each_key.to_a
end

#handle_event(event) ⇒ Object



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/dynflow/director.rb', line 190

def handle_event(event)
  Type! event, Event
  execution_plan_manager = @execution_plan_managers[event.execution_plan_id]
  if execution_plan_manager
    execution_plan_manager.event(event)
  elsif event.optional
    event.result.reject "no manager for #{event.inspect}"
    []
  else
    raise Dynflow::Error, "no manager for #{event.inspect}"
  end
rescue Dynflow::Error => e
  event.result.reject e.message
  raise e
end

#handle_planning(execution_plan_uuid) ⇒ Object



177
178
179
180
181
182
# File 'lib/dynflow/director.rb', line 177

# Requests planning of an execution plan, at most once while it is in flight.
#
# @param execution_plan_uuid [Object] id of the plan to be planned
# @return [Array] a single PlanningWorkItem, or +[]+ when planning of this
#   plan has already been requested and not yet reported as finished
def handle_planning(execution_plan_uuid)
  # Set#add? returns nil when the element was already a member.
  newly_registered = @planning_plans.add?(execution_plan_uuid)
  return [] unless newly_registered

  [PlanningWorkItem.new(execution_plan_uuid, :default, @world.id)]
end

#start_execution(execution_plan_id, finished) ⇒ Object



184
185
186
187
188
# File 'lib/dynflow/director.rb', line 184

# Starts tracking an execution plan and asks its manager for the first work.
#
# @param execution_plan_id [Object] id of the plan to run
# @param finished [Object] passed through to +track_execution_plan+
# @return [Object] +[]+ when no manager was produced, otherwise the result of
#   +unless_done+ applied to the manager's starting work
def start_execution(execution_plan_id, finished)
  plan_manager = track_execution_plan(execution_plan_id, finished)
  plan_manager ? unless_done(plan_manager, plan_manager.start) : []
end

#terminate ⇒ Object



232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/dynflow/director.rb', line 232

# Shuts down every execution plan manager that is still registered.
#
# Each manager is asked to terminate and is then handed to +finish_manager+.
# A persistence failure while terminating one manager is logged and does not
# prevent the remaining managers from being terminated.
#
# @return [Array, nil] the managers that were finished, or +nil+ when there
#   was nothing to clean up
def terminate
  return if @execution_plan_managers.empty?

  logger.error "... cleaning #{@execution_plan_managers.size} execution plans ..."
  @execution_plan_managers.values.each do |manager|
    begin
      manager.terminate
    rescue Errors::PersistenceError
      # Best effort: keep going so the other managers still get terminated.
      logger.error "could not to clean the data properly"
    end
  end
  # Iterate over a snapshot (+values+) since finishing may alter the registry.
  @execution_plan_managers.values.each do |manager|
    finish_manager(manager)
  end
end

#work_failed(work) ⇒ Object

Called when there was an unhandled exception during the execution of the work (such as a persistence issue). In this case we just clean up the runtime from the execution plan and let it go. A common cause is the execution plan being removed from the database by an external user.



223
224
225
226
227
228
229
230
# File 'lib/dynflow/director.rb', line 223

# Cleans up after a work item whose execution blew up unexpectedly.
#
# The plan's manager, if one is registered, is terminated and finished. The
# plan is looked up in persistence first so that nothing is stored for a plan
# that no longer exists there.
#
# @param work [Object] the failed work item; must respond to +execution_plan_id+
# @return [Object, nil] the result of +finish_manager+, or +nil+ when the plan
#   has no registered manager
def work_failed(work)
  manager = @execution_plan_managers[work.execution_plan_id]
  return unless manager

  manager.terminate
  # Don't try to store when the execution plan went missing
  stored_plans = @world.persistence.find_execution_plans(:filters => { uuid: work.execution_plan_id })
  finish_manager(manager, store: !stored_plans.empty?)
end

#work_finished(work) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/dynflow/director.rb', line 206

# Reacts to a completed work item and reports what should run next.
#
# A finished planning item clears the in-flight planning marker and removes
# the plan's delayed-plan records; it yields no follow-up work. Any other item
# is passed to the plan's manager, if the plan is still being tracked.
#
# @param work [Object] the finished work item; must respond to +execution_plan_id+
# @return [Object] +[]+ for planning items and for plans without a manager,
#   otherwise the result of +unless_done+ for the manager's next work
def work_finished(work)
  plan_id = work.execution_plan_id

  case work
  when PlanningWorkItem
    @planning_plans.delete(plan_id)
    @world.persistence.delete_delayed_plans(:execution_plan_uuid => plan_id)
    []
  else
    manager = @execution_plan_managers[plan_id]
    # No manager means the plan is not running anymore; nothing to schedule.
    manager ? unless_done(manager, manager.what_is_next(work)) : []
  end
end