Class: ChefWorkflow::Scheduler

Inherits:
Object
  • Object
show all
Extended by:
AttrSupport
Includes:
DebugSupport
Defined in:
lib/chef-workflow/support/scheduler.rb

Overview

This is a scheduler for provisioners. It can run in parallel or serial mode and is dependency-based: it only schedules an item for execution once all of its dependencies are satisfied, and items whose dependencies are not yet satisfied wait until they are.

Constant Summary

Constants included from DebugSupport

DebugSupport::CHEF_WORKFLOW_DEBUG_DEFAULT

Instance Method Summary collapse

Methods included from AttrSupport

fancy_attr

Methods included from DebugSupport

#if_debug

Constructor Details

#initialize ⇒ Scheduler

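Constructor. Sets up the scheduler's initial state. Note: this description previously said that passing true as the first argument installs an `at_exit` hook to write out the VM and IP databases, but the constructor shown below takes no arguments and installs no such hook.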



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/chef-workflow/support/scheduler.rb', line 41

# Builds an idle scheduler: parallel mode, forced deprovisioning off, no
# solver thread, and empty bookkeeping structures.
def initialize
  # behaviour switches, both declared with fancy_attr
  @serial = false
  @force_deprovision = false

  # locks used around updates to the solved set and the waiter set
  @solved_mutex = Mutex.new
  @waiters_mutex = Mutex.new

  # runtime state
  @solver_thread = nil # set by #run in parallel mode
  @working = {}        # group name => thread currently provisioning it
  @waiters = ChefWorkflow::DatabaseSupport::Set.new('vm_scheduler', 'waiters')
  @queue = Queue.new   # finished group names; nil asks the runner to stop
  @vm = VM.new
end

Instance Method Details

#deprovision_group(group_name, clean_state = true) ⇒ Object

Performs the deprovision of a group by replaying its provision strategy backwards and applying the #shutdown method instead of the #startup method. Removes it from the various state tables if true is set as the second argument, which is the default.



335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
# File 'lib/chef-workflow/support/scheduler.rb', line 335

# Deprovisions a group by walking its provisioners in reverse order and
# calling #shutdown on each of them.
#
# group_name  - name of the group to deprovision.
# clean_state - when true (the default) the group is also removed from the
#               solved, waiters, working, dependencies and groups tables.
#
# Without @force_deprovision a falsy #shutdown result raises a RuntimeError
# and the state tables are left untouched. With @force_deprovision set,
# falsy results and exceptions are only reported in debug mode and the walk
# continues with the next provisioner.
def deprovision_group(group_name, clean_state=true)
  provisioner = vm_groups[group_name]

  # No provisioner most likely means we were asked to clean up something that
  # was never scheduled; the shutdown pass is simply skipped in that case.
  if provisioner && ((solved.to_set + vm_working.to_set).include?(group_name) || @force_deprovision)
    if_debug { $stderr.puts "Attempting to deprovision group #{group_name}" }

    # Shuts one provisioner down and reports a falsy result in debug mode.
    shut_down = lambda do |prov|
      outcome = prov.shutdown
      if_debug { $stderr.puts "Could not deprovision group #{group_name}." } unless outcome
      outcome
    end

    provisioner.reverse_each do |this_prov|
      unless @force_deprovision
        next if shut_down.call(this_prov)
        raise "Could not deprovision #{group_name}/#{this_prov.inspect}"
      end

      # forced mode: best effort, nothing raised by a shutdown escapes
      begin
        shut_down.call(this_prov)
      rescue Exception => e
        if_debug do
          $stderr.puts "Deprovision #{this_prov.class.name}/#{group_name} had errors:"
          $stderr.puts "#{e.message}"
        end
      end
    end
  end

  return nil unless clean_state

  solved.delete(group_name)
  @waiters_mutex.synchronize { @waiters.delete(group_name) }
  vm_working.delete(group_name)
  vm_dependencies.delete(group_name)
  vm_groups.delete(group_name)
end

#force_deprovision ⇒ Object

:attr:

Ignore exceptions while deprovisioning. Default is false.



35
# File 'lib/chef-workflow/support/scheduler.rb', line 35

# Declares the force_deprovision attribute. When it is set, deprovisioning
# ignores exceptions instead of raising; the constructor starts it at false.
fancy_attr :force_deprovision

#run ⇒ Object

Start the scheduler. In serial mode this call will block until the whole dependency graph is satisfied, or one of the provisions fails, at which point an exception will be raised. In parallel mode, this call completes immediately, and you should use #wait_for to control main thread flow.

This call also installs a SIGINFO (Ctrl+T in the terminal on macs) and SIGUSR2 handler which can be used to get information on the status of what’s solved and what’s working.

Immediately returns if in threaded mode and the solver is already running.



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/chef-workflow/support/scheduler.rb', line 151

# Starts the scheduler.
#
# Serial mode: services the waiters and drains the queue on the calling
# thread, returning once the queue is empty.
# Parallel mode: spawns the solver thread plus a watcher thread and returns
# right away (the last expression evaluated is the watcher thread). Returns
# early, doing nothing, if a solver thread already exists.
#
# Also installs USR2/INFO signal handlers (where the platform defines those
# signals) that print the solved, working and waiting groups.
def run
  # short circuit if we're not serial and already running
  return if @solver_thread and !@serial

  # status dump used as the signal handler; `p` writes to standard output
  handler = lambda do |*args|
    p ["solved:", solved.to_a]
    p ["working:", vm_working.to_a]
    p ["waiting:", @waiters.to_a]
  end

  # only trap signals that exist on this platform
  %w[USR2 INFO].each { |sig| trap(sig, &handler) if Signal.list[sig] }

  # Main loop: start whatever can be started, then collect finished group
  # names from @queue and mark them solved. A nil in the queue ends the loop.
  queue_runner = lambda do
    run = true

    while run
      service_resolved_waiters

      ready = []

      if @queue.empty?
        if @serial
          # nothing left to collect; `return` leaves this lambda only
          return
        else
          with_timeout do
            # this is where most of the execution time is spent, so ensure
            # waiters get considered here.
            service_resolved_waiters
            ready << @queue.shift
          end
        end
      end

      # drain everything else that has finished in the meantime
      while !@queue.empty?
        ready << @queue.shift
      end

      ready.each do |r|
        if r
          @solved_mutex.synchronize do
            solved.add(r)
            @working.delete(r)
            vm_working.delete(r)
          end
        else
          # nil is the stop marker pushed by #stop
          run = false
        end
      end
    end
  end

  if @serial
    service_resolved_waiters
    queue_runner.call
  else
    @solver_thread = Thread.new do
      with_timeout(false) { service_resolved_waiters }
      queue_runner.call
    end

    # we depend on at_exit hooks being fired, and Thread#abort_on_exception
    # doesn't fire them. This solution bubbles up the exceptions in a similar
    # fashion without actually sacrificing the at_exit functionality.
    Thread.new do
      begin
        @solver_thread.join
      rescue Exception => e
        $stderr.puts "Solver thread encountered an exception:"
        $stderr.puts "#{e.class.name}: #{e.message}"
        $stderr.puts e.backtrace.join("\n")
        Kernel.exit 1
      end
    end
  end
end

#schedule_provision(group_name, provisioner, dependencies = []) ⇒ Object

Schedule a group of VMs for provision. This takes a group name, which is a string, an array of provisioner objects, and a list of string dependencies. If anything in the dependencies list hasn’t been pre-declared, it refuses to continue.

This method will return nil if the server group is already provisioned.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/chef-workflow/support/scheduler.rb', line 89

# Schedules a group of VMs for provisioning.
#
# group_name   - name of the group (a string).
# provisioner  - a provisioner object or an array of them; a single object
#                is wrapped in an array. Each one has its #name set to
#                group_name.
# dependencies - names of groups that must be solved first; every one of
#                them must already be declared.
#
# Returns nil if the group is already declared, otherwise the result of
# adding the group to the waiters set.
# Raises RuntimeError if a dependency has not been pre-declared. The check
# runs before any state is written, so a rejected call leaves no
# half-registered group behind (which would make a corrected retry silently
# return nil) and a group cannot satisfy the check by depending on itself.
def schedule_provision(group_name, provisioner, dependencies=[])
  return nil if vm_groups[group_name]

  unless dependencies.all? { |x| vm_groups.has_key?(x) }
    raise "One of your dependencies for #{group_name} has not been pre-declared. Cannot continue"
  end

  provisioner = [provisioner] unless provisioner.kind_of?(Array)
  provisioner.each { |x| x.name = group_name }
  vm_groups[group_name] = provisioner

  vm_dependencies[group_name] = dependencies.to_set
  @waiters_mutex.synchronize do
    @waiters.add(group_name)
  end
end

#serial ⇒ Object

:attr:

Turn serial mode on (off by default). This forces the scheduler to execute every provision in order, even if it could handle multiple provisions at the same time.



27
# File 'lib/chef-workflow/support/scheduler.rb', line 27

# Declares the serial attribute. When it is set, provisions run one after
# another on the calling thread; the constructor starts it at false.
fancy_attr :serial

#service_resolved_waiters ⇒ Object

This method determines what ‘waiters’, or provisioners that cannot provision yet because of unresolved dependencies, can be executed.



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/chef-workflow/support/scheduler.rb', line 245

# Starts every waiting group whose dependencies have all been solved.
#
# First drops anything already working or solved from the waiters set. Then,
# for each remaining waiter that is ready, records it in vm_working and runs
# its provisioners in order; each #startup receives the previous
# provisioner's return value (nil for the first). On success the group name
# is pushed onto @queue.
#
# Serial mode runs the provisioners inline on the calling thread. Parallel
# mode starts one thread per group and walks the waiters while holding
# @waiters_mutex.
#
# Raises RuntimeError when a provisioner's #startup returns a falsy value
# (inside the worker thread when running in parallel).
def service_resolved_waiters
  @waiters_mutex.synchronize do
    @waiters.replace(@waiters.to_set - (@working.keys.to_set + solved))
  end

  waiter_iteration = lambda do
    @waiters.each do |group_name|
      # Ready means every dependency is solved. This is a membership test
      # rather than an Array == stored-value comparison because
      # schedule_provision stores the dependencies as a Set, and an Array
      # never equals a Set; the test is also independent of ordering.
      solved_now = solved.to_set
      dependencies = vm_dependencies[group_name]

      if dependencies.all? { |dep| solved_now.include?(dep) }
        if_debug do
          $stderr.puts "Provisioning #{group_name}"
        end

        provisioner = vm_groups[group_name]

        provision_block = lambda do
          # FIXME maybe a way to specify initial args?
          args = nil
          provisioner.each do |this_prov|
            vm_groups[group_name] = provisioner # force a write to the db
            unless args = this_prov.startup(args)
              $stderr.puts "Could not provision #{group_name} with provisioner #{this_prov.class.name}"
              raise "Could not provision #{group_name} with provisioner #{this_prov.class.name}"
            end
          end
          @queue << group_name
        end

        vm_working.add(group_name)

        if @serial
          # HACK: just give the working check something that will always work.
          #       Probably should just mock it.
          @working[group_name] = Thread.new { sleep }
          provision_block.call
        else
          @working[group_name] = Thread.new(&provision_block)
        end
      end
    end
  end

  if @serial
    waiter_iteration.call
  else
    @waiters_mutex.synchronize(&waiter_iteration)
  end
end

#solved ⇒ Object

Helper to assist with dealing with a VM object



56
57
58
# File 'lib/chef-workflow/support/scheduler.rb', line 56

# Returns the VM object's `provisioned` collection, which the scheduler
# treats as the set of solved (fully provisioned) groups.
def solved
  @vm.provisioned
end

#stop ⇒ Object

Instructs the scheduler to stop. Note that this is not an interrupt, and the queue will still be exhausted before terminating.



231
232
233
234
235
236
237
238
239
# File 'lib/chef-workflow/support/scheduler.rb', line 231

# Asks the scheduler to stop by pushing the nil stop marker onto the queue.
#
# Serial mode only pushes the marker. Parallel mode first waits for every
# worker thread (errors raised by a worker are ignored here), pushes the
# marker, and then waits for the solver thread to finish.
def stop
  return @queue << nil if @serial

  # iterate over a snapshot: @working may change while we are joining
  @working.values.each { |worker| worker.join rescue nil }
  @queue << nil
  @solver_thread.join rescue nil
end

#teardown(exceptions = []) ⇒ Object

Instruct all provisioners except ones in the exception list to tear down. Calls #stop as its first action.

This is always done serially. For sanity.



390
391
392
393
394
395
396
# File 'lib/chef-workflow/support/scheduler.rb', line 390

# Stops the scheduler, then deprovisions every known group except those
# named in +exceptions+, one after another on the calling thread.
#
# exceptions - group names to leave alone (defaults to none).
#
# Returns the set of group names that were deprovisioned.
def teardown(exceptions=[])
  stop

  spared = exceptions.to_set
  doomed = vm_groups.keys.to_set - spared

  # clean this after everything finishes
  doomed.each { |group_name| deprovision_group(group_name) }
end

#teardown_group(group_name, wait = true) ⇒ Object

Teardown a single group – modifies the solved formula. Be careful to resupply dependencies if you use this, as nothing will resolve until you resupply it.

This takes an optional argument to wait for the group to be solved before attempting to tear it down. Setting this to false effectively says, “I know what I’m doing”, and you should feel bad if you file an issue because you supplied it.



304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# File 'lib/chef-workflow/support/scheduler.rb', line 304

# Deprovisions a single group.
#
# group_name - the group to tear down.
# wait       - when true (the default) wait_for(group_name) is called first.
#
# Groups that list group_name as a dependency are looked up; if any of them
# is currently working a warning is written to $stderr, but the teardown
# proceeds regardless. The deprovision itself runs while holding
# @solved_mutex. Returns whatever deprovision_group returns.
def teardown_group(group_name, wait=true)
  wait_for(group_name) if wait

  dependent_items = vm_dependencies.select { |_name, deps| deps.include?(group_name) }.map(&:first)

  if_debug do
    unless dependent_items.empty?
      $stderr.puts "Trying to terminate #{group_name}, found #{dependent_items.inspect} depending on it"
    end
  end

  @solved_mutex.synchronize do
    dependent_and_working = @working.keys & dependent_items

    unless dependent_and_working.empty?
      [
        "#{dependent_and_working.inspect} are depending on #{group_name}, which you are trying to deprovision.",
        "We can't resolve this problem for you, and future converges may fail during this run that would otherwise work.",
        "Consider using wait_for to better control the dependencies, or turning serial provisioning on."
      ].each { |line| $stderr.puts line }
    end

    deprovision_group(group_name)
  end
end

#vm_dependencies ⇒ Object

Helper to assist with dealing with a VM object



70
71
72
# File 'lib/chef-workflow/support/scheduler.rb', line 70

# Returns the VM object's `dependencies` collection, which the scheduler
# indexes by group name to find the dependencies declared for that group.
def vm_dependencies
  @vm.dependencies
end

#vm_groups ⇒ Object

Helper to assist with dealing with a VM object



63
64
65
# File 'lib/chef-workflow/support/scheduler.rb', line 63

# Returns the VM object's `groups` collection, which the scheduler indexes by
# group name to find the provisioners registered for that group.
def vm_groups
  @vm.groups
end

#vm_working ⇒ Object

Helper to assist with dealing with a VM object



77
78
79
# File 'lib/chef-workflow/support/scheduler.rb', line 77

# Returns the VM object's `working` collection: the group names the scheduler
# has started provisioning but not yet marked as solved.
def vm_working
  @vm.working
end

#wait_for(*dependencies) ⇒ Object

Sleep until every dependency in this list is resolved. In parallel mode, will raise if an exception occurred while waiting for these resources. In serial mode, wait_for just returns nil.



110
111
112
113
114
115
116
117
118
119
# File 'lib/chef-workflow/support/scheduler.rb', line 110

# Blocks until every named dependency is in the solved set, polling once a
# second.
#
# Returns nil immediately in serial mode or when no dependencies are given.
# While polling, a solver thread that is no longer alive is joined, so an
# exception that ended it is re-raised in the caller.
def wait_for(*dependencies)
  return nil if @serial || dependencies.empty?

  wanted = dependencies.to_set
  while (wanted & solved) != wanted
    sleep 1
    @solver_thread.join unless @solver_thread.alive?
  end
end

#with_timeout(do_loop = true) ⇒ Object

Helper method for scheduling. Wraps items in a timeout and immediately checks all running workers for exceptions, which are immediately bubbled up if there are any. If do_loop is true, it will retry the timeout.



126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/chef-workflow/support/scheduler.rb', line 126

# Runs the given block under a 10 second timeout.
#
# Before yielding, every worker thread in @working that is no longer alive
# is joined; joining a thread that died from an exception re-raises that
# exception here, which is how worker failures reach the caller.
#
# do_loop - when true (the default) the whole check-and-yield sequence is
#           retried each time the timeout expires; when false a timeout
#           simply makes the method return nil.
#
# Returns the block's value when it completes in time.
#
# The rescue names Timeout::Error rather than the legacy top-level
# TimeoutError alias, which current Ruby versions no longer define (an
# expired timeout would otherwise surface as a NameError).
def with_timeout(do_loop=true)
  Timeout.timeout(10) do
    @working.values.reject(&:alive?).each(&:join)

    yield
  end
rescue Timeout::Error
  retry if do_loop
end