Class: Dynflow::Coordinator

Inherits:
Object
  • Object
show all
Includes:
Algebrick::TypeCheck
Defined in:
lib/dynflow/coordinator.rb

Defined Under Namespace

Classes: AutoExecuteLock, ClientWorld, DelayedExecutorLock, DuplicateRecordError, ExecutionLock, ExecutionPlanCleanerLock, ExecutorWorld, Lock, LockByWorld, LockError, PlanningLock, Record, SingletonActionLock, WorldInvalidationLock, WorldRecord

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(coordinator_adapter) ⇒ Coordinator

Returns a new instance of Coordinator.



316
317
318
# File 'lib/dynflow/coordinator.rb', line 316

# Builds a coordinator that delegates record storage to the given adapter.
#
# @param coordinator_adapter the object kept in @adapter; the other methods
#   shown for this class call create_record, update_record, delete_record
#   and find_records on it
def initialize(coordinator_adapter)
  @adapter = coordinator_adapter
end

Instance Attribute Details

#adapter ⇒ Object (readonly)

Returns the value of attribute adapter.



314
315
316
# File 'lib/dynflow/coordinator.rb', line 314

# Reader for the adapter passed to the constructor.
#
# @return the object stored in @adapter
def adapter
  @adapter
end

Instance Method Details

#acquire(lock, &block) ⇒ Object



320
321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/dynflow/coordinator.rb', line 320

# Acquires +lock+ by creating its record through the adapter.
#
# When a block is given, it is run while the lock is held and the lock is
# released afterwards, even if the block raises. Without a block the lock
# stays held and the caller is responsible for calling #release.
#
# Only a DuplicateRecordError raised while acquiring the lock is translated
# into a LockError. A DuplicateRecordError raised by the block itself is
# propagated unchanged, so it is not mistaken for a failure to take the lock.
#
# @param lock [Lock] the lock to acquire (checked with Type!)
# @yield optional work to run while holding the lock
# @return the block's value, or nil when no block is given
# @raise [LockError] when the adapter reports a duplicate record for the lock
def acquire(lock, &block)
  Type! lock, Lock
  begin
    lock.validate!
    adapter.create_record(lock)
  rescue DuplicateRecordError => e
    raise LockError.new(e.record)
  end
  return unless block

  begin
    block.call
  ensure
    release(lock)
  end
end

#clean_orphaned_locks ⇒ Object



398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
# File 'lib/dynflow/coordinator.rb', line 398

# Releases locks whose owner is no longer among the valid owners.
#
# LockByWorld and SingletonActionLock are processed in that order. For each,
# the locks of its valid classes whose owner is outside the valid owner ids
# are looked up, and every one that is still not owned by a valid owner is
# released.
#
# @return [Array] the locks that were released
def clean_orphaned_locks
  [LockByWorld, SingletonActionLock].flat_map do |lock_class|
    known_owners = lock_class.valid_owner_ids(self)
    class_names = lock_class.valid_classes.map(&:name)
    candidates = find_locks(class: class_names, exclude_owner_id: known_owners)
    # Read the valid owner ids again after the lookup to avoid race conditions.
    current_owners = lock_class.valid_owner_ids(self)
    candidates
      .reject { |lock| current_owners.include?(lock.owner_id) }
      .each { |lock| release(lock) }
  end
end

#create_record(record) ⇒ Object



350
351
352
353
# File 'lib/dynflow/coordinator.rb', line 350

# Stores +record+ through the adapter.
#
# @param record [Record] the record to create (checked with Type!)
# @return whatever the adapter's create_record returns
def create_record(record)
  Type!(record, Record)
  adapter.create_record(record)
end

#deactivate_world(world) ⇒ Object



392
393
394
395
396
# File 'lib/dynflow/coordinator.rb', line 392

# Marks an executor world as inactive and saves the change.
#
# @param world [Coordinator::ExecutorWorld] the world to deactivate
#   (checked with Type!)
# @return whatever #update_record returns
def deactivate_world(world)
  Type!(world, Coordinator::ExecutorWorld)
  world.active = false
  update_record(world)
end

#delete_record(record) ⇒ Object



360
361
362
363
# File 'lib/dynflow/coordinator.rb', line 360

# Removes +record+ through the adapter.
#
# @param record [Record] the record to delete (checked with Type!)
# @return whatever the adapter's delete_record returns
def delete_record(record)
  Type!(record, Record)
  adapter.delete_record(record)
end

#delete_world(world) ⇒ Object



386
387
388
389
390
# File 'lib/dynflow/coordinator.rb', line 386

# Removes a world: first releases the locks owned by "world:<id>", then
# deletes the world's record.
#
# @param world [Coordinator::ClientWorld, Coordinator::ExecutorWorld]
#   the world to remove (checked with Type!)
# @return whatever #delete_record returns
def delete_world(world)
  Type!(world, Coordinator::ClientWorld, Coordinator::ExecutorWorld)
  owner = "world:#{world.id}"
  release_by_owner(owner)
  delete_record(world)
end

#find_locks(filter_options) ⇒ Object



344
345
346
347
348
# File 'lib/dynflow/coordinator.rb', line 344

# Looks up records through the adapter and builds a lock from each one.
#
# @param filter_options passed unchanged to the adapter's find_records
# @return [Array] one Lock.from_hash result per record found
def find_locks(filter_options)
  adapter.find_records(filter_options).map { |attrs| Lock.from_hash(attrs) }
end

#find_records(filter) ⇒ Object



365
366
367
368
369
# File 'lib/dynflow/coordinator.rb', line 365

# Looks up records through the adapter and builds a record from each one.
#
# @param filter passed unchanged to the adapter's find_records
# @return [Array] one Record.from_hash result per record found
def find_records(filter)
  adapter.find_records(filter).map { |attrs| Record.from_hash(attrs) }
end

#find_worlds(active_executor_only = false, filters = {}) ⇒ Object



371
372
373
374
375
376
377
378
379
# File 'lib/dynflow/coordinator.rb', line 371

# Finds registered worlds.
#
# Executor worlds are always looked up. With +active_executor_only+ set,
# only the executor worlds answering true to active? are returned;
# otherwise the client worlds are looked up as well and appended.
#
# @param active_executor_only [Boolean] restrict the result to active executors
# @param filters [Hash] extra filter options; the :class key is overridden
# @return [Array] the matching world records
def find_worlds(active_executor_only = false, filters = {})
  executors = find_records(filters.merge(class: Coordinator::ExecutorWorld.name))
  return executors.select(&:active?) if active_executor_only

  clients = find_records(filters.merge(class: Coordinator::ClientWorld.name))
  executors.concat(clients)
end

#register_world(world) ⇒ Object



381
382
383
384
# File 'lib/dynflow/coordinator.rb', line 381

# Registers a world by creating its record.
#
# @param world [Coordinator::ClientWorld, Coordinator::ExecutorWorld]
#   the world to register (checked with Type!)
# @return whatever #create_record returns
def register_world(world)
  Type!(world, Coordinator::ClientWorld, Coordinator::ExecutorWorld)
  create_record(world)
end

#release(lock) ⇒ Object



335
336
337
338
# File 'lib/dynflow/coordinator.rb', line 335

# Releases +lock+ by deleting its record through the adapter.
#
# @param lock [Lock] the lock to release (checked with Type!)
# @return whatever the adapter's delete_record returns
def release(lock)
  Type!(lock, Lock)
  adapter.delete_record(lock)
end

#release_by_owner(owner_id) ⇒ Object



340
341
342
# File 'lib/dynflow/coordinator.rb', line 340

# Releases every lock found for the given owner.
#
# @param owner_id the owner id used as the :owner_id filter for #find_locks
# @return [Array] the result of #release for each lock found
def release_by_owner(owner_id)
  owned = find_locks(owner_id: owner_id)
  owned.map { |held| release(held) }
end

#update_record(record) ⇒ Object



355
356
357
358
# File 'lib/dynflow/coordinator.rb', line 355

# Saves changes to +record+ through the adapter.
#
# @param record [Record] the record to update (checked with Type!)
# @return whatever the adapter's update_record returns
def update_record(record)
  Type!(record, Record)
  adapter.update_record(record)
end