Class: Dynflow::Coordinator

Inherits:
Object
  • Object
show all
Includes:
Algebrick::TypeCheck
Defined in:
lib/dynflow/coordinator.rb

Defined Under Namespace

Classes: AutoExecuteLock, ClientWorld, DelayedExecutorLock, DuplicateRecordError, ExecutionLock, ExecutionPlanCleanerLock, ExecutorWorld, Lock, LockByWorld, LockError, PlanningLock, Record, SingletonActionLock, WorldInvalidationLock, WorldRecord

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(coordinator_adapter) ⇒ Coordinator

Returns a new instance of Coordinator.



312
313
314
# File 'lib/dynflow/coordinator.rb', line 312

# Builds a coordinator around the given adapter.
#
# @param coordinator_adapter [Object] stored as-is in +@adapter+; the other
#   methods of this class send their record operations to it
def initialize(coordinator_adapter)
  @adapter = coordinator_adapter
end

Instance Attribute Details

#adapter ⇒ Object (readonly)

Returns the value of attribute adapter.



310
311
312
# File 'lib/dynflow/coordinator.rb', line 310

# Read-only accessor for the adapter this coordinator was constructed with.
#
# @return [Object] the current value of +@adapter+
def adapter
  @adapter
end

Instance Method Details

#acquire(lock, &block) ⇒ Object



316
317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/dynflow/coordinator.rb', line 316

# Acquires +lock+ by creating its record through the adapter.
#
# When a block is given, it runs while the lock is held and the lock is
# released afterwards, even if the block raises. Without a block the lock
# record is left in place and nothing is released here.
#
# @param lock [Lock] the lock to acquire; it is type-checked and validated
#   before the adapter is touched
# @yield runs while the lock is held
# @return [Object, nil] the block's result, or nil when no block was given
# @raise [LockError] when the adapter raises DuplicateRecordError while
#   creating the lock record
def acquire(lock, &block)
  Type! lock, Lock
  lock.validate!
  begin
    adapter.create_record(lock)
  rescue DuplicateRecordError => e
    # Only a conflict while creating the lock record means the lock is taken.
    # The rescue is limited to this call so that a DuplicateRecordError raised
    # from inside the caller's block is not misreported as a LockError.
    raise LockError.new(e.record)
  end
  return unless block

  begin
    block.call
  ensure
    release(lock)
  end
end

#clean_orphaned_locks ⇒ Object



394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
# File 'lib/dynflow/coordinator.rb', line 394

# Releases locks whose owner is not among the valid owners.
#
# LockByWorld and SingletonActionLock are processed in that order. For each,
# the locks of its valid classes whose owner is outside the valid owner ids
# are looked up and released.
#
# @return [Array] every lock that was released, in release order
def clean_orphaned_locks
  [LockByWorld, SingletonActionLock].flat_map do |lock_family|
    known_owner_ids = lock_family.valid_owner_ids(self)
    class_names = lock_family.valid_classes.map(&:name)
    candidates = find_locks(class: class_names, exclude_owner_id: known_owner_ids)
    # Re-read the valid owner ids after the query to avoid race conditions;
    # locks whose owner shows up in the refreshed list are left alone.
    current_owner_ids = lock_family.valid_owner_ids(self)
    candidates
      .reject { |lock| current_owner_ids.include?(lock.owner_id) }
      .each { |lock| release(lock) }
  end
end

#create_record(record) ⇒ Object



346
347
348
349
# File 'lib/dynflow/coordinator.rb', line 346

# Type-checks +record+ and asks the adapter to create it.
#
# @param record [Record] the record to create
# @return [Object] whatever the adapter's +create_record+ returns
def create_record(record)
  Type!(record, Record)
  adapter.create_record(record)
end

#deactivate_world(world) ⇒ Object



388
389
390
391
392
# File 'lib/dynflow/coordinator.rb', line 388

# Marks an executor world as inactive and saves the change.
#
# @param world [Coordinator::ExecutorWorld] the world to deactivate; its
#   +active+ attribute is set to false before it is passed to {#update_record}
# @return [Object] whatever {#update_record} returns
def deactivate_world(world)
  Type!(world, Coordinator::ExecutorWorld)
  world.active = false
  update_record(world)
end

#delete_record(record) ⇒ Object



356
357
358
359
# File 'lib/dynflow/coordinator.rb', line 356

# Type-checks +record+ and asks the adapter to delete it.
#
# @param record [Record] the record to delete
# @return [Object] whatever the adapter's +delete_record+ returns
def delete_record(record)
  Type!(record, Record)
  adapter.delete_record(record)
end

#delete_world(world) ⇒ Object



382
383
384
385
386
# File 'lib/dynflow/coordinator.rb', line 382

# Removes a world: first releases the locks owned by it, then deletes its
# record.
#
# @param world [Coordinator::ClientWorld, Coordinator::ExecutorWorld] the
#   world to remove
# @return [Object] whatever {#delete_record} returns
def delete_world(world)
  Type!(world, Coordinator::ClientWorld, Coordinator::ExecutorWorld)
  # Locks held by a world are looked up under the owner id "world:<id>".
  world_owner_id = "world:#{world.id}"
  release_by_owner(world_owner_id)
  delete_record(world)
end

#find_locks(filter_options) ⇒ Object



340
341
342
343
344
# File 'lib/dynflow/coordinator.rb', line 340

# Finds records through the adapter and builds a lock from each one with
# +Lock.from_hash+.
#
# @param filter_options [Hash] passed unchanged to the adapter's +find_records+
# @return [Array] one +Lock.from_hash+ result per record the adapter returned
def find_locks(filter_options)
  raw_records = adapter.find_records(filter_options)
  raw_records.map { |attributes| Lock.from_hash(attributes) }
end

#find_records(filter) ⇒ Object



361
362
363
364
365
# File 'lib/dynflow/coordinator.rb', line 361

# Finds records through the adapter and builds a record object from each one
# with +Record.from_hash+.
#
# @param filter [Hash] passed unchanged to the adapter's +find_records+
# @return [Array] one +Record.from_hash+ result per record the adapter returned
def find_records(filter)
  raw_records = adapter.find_records(filter)
  raw_records.map { |attributes| Record.from_hash(attributes) }
end

#find_worlds(active_executor_only = false, filters = {}) ⇒ Object



367
368
369
370
371
372
373
374
375
# File 'lib/dynflow/coordinator.rb', line 367

# Looks up registered worlds.
#
# @param active_executor_only [Boolean] when truthy, only executor worlds
#   answering true to +active?+ are returned and client worlds are not queried
# @param filters [Hash] extra filter entries; +:class+ is overridden per query
# @return [Array] executor worlds, followed by client worlds unless
#   +active_executor_only+ is set
def find_worlds(active_executor_only = false, filters = {})
  executors = find_records(filters.merge(class: Coordinator::ExecutorWorld.name))
  return executors.select(&:active?) if active_executor_only

  clients = find_records(filters.merge(class: Coordinator::ClientWorld.name))
  executors.concat(clients)
end

#register_world(world) ⇒ Object



377
378
379
380
# File 'lib/dynflow/coordinator.rb', line 377

# Registers a world by creating its record.
#
# @param world [Coordinator::ClientWorld, Coordinator::ExecutorWorld] the
#   world to register
# @return [Object] whatever {#create_record} returns
def register_world(world)
  Type!(world, Coordinator::ClientWorld, Coordinator::ExecutorWorld)
  create_record(world)
end

#release(lock) ⇒ Object



331
332
333
334
# File 'lib/dynflow/coordinator.rb', line 331

# Releases +lock+ by deleting its record through the adapter.
#
# @param lock [Lock] the lock to release
# @return [Object] whatever the adapter's +delete_record+ returns
def release(lock)
  Type!(lock, Lock)
  adapter.delete_record(lock)
end

#release_by_owner(owner_id) ⇒ Object



336
337
338
# File 'lib/dynflow/coordinator.rb', line 336

# Releases every lock found for the given owner.
#
# @param owner_id [Object] used as the +:owner_id+ filter for {#find_locks}
# @return [Array] the result of {#release} for each lock found, in order
def release_by_owner(owner_id)
  owned_locks = find_locks(owner_id: owner_id)
  owned_locks.map do |owned_lock|
    release(owned_lock)
  end
end

#update_record(record) ⇒ Object



351
352
353
354
# File 'lib/dynflow/coordinator.rb', line 351

# Type-checks +record+ and asks the adapter to update it.
#
# @param record [Record] the record to update
# @return [Object] whatever the adapter's +update_record+ returns
def update_record(record)
  Type!(record, Record)
  adapter.update_record(record)
end