Class: Dynflow::Coordinator
- Inherits:
-
Object
- Object
- Dynflow::Coordinator
show all
- Includes:
- Algebrick::TypeCheck
- Defined in:
- lib/dynflow/coordinator.rb
Defined Under Namespace
Classes: AutoExecuteLock, ClientWorld, DelayedExecutorLock, DuplicateRecordError, ExecutionLock, ExecutionPlanCleanerLock, ExecutorWorld, Lock, LockByWorld, LockError, PlanningLock, Record, SingletonActionLock, WorldInvalidationLock, WorldRecord
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(coordinator_adapter) ⇒ Coordinator
Returns a new instance of Coordinator.
316
317
318
|
# File 'lib/dynflow/coordinator.rb', line 316
def initialize(coordinator_adapter)
@adapter = coordinator_adapter
end
|
Instance Attribute Details
#adapter ⇒ Object
Returns the value of attribute adapter.
314
315
316
|
# File 'lib/dynflow/coordinator.rb', line 314
def adapter
@adapter
end
|
Instance Method Details
#acquire(lock, &block) ⇒ Object
320
321
322
323
324
325
326
327
328
329
330
331
332
333
|
# File 'lib/dynflow/coordinator.rb', line 320
def acquire(lock, &block)
Type! lock, Lock
lock.validate!
adapter.create_record(lock)
if block
begin
block.call
ensure
release(lock)
end
end
rescue DuplicateRecordError => e
raise LockError.new(e.record)
end
|
#clean_orphaned_locks ⇒ Object
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
|
# File 'lib/dynflow/coordinator.rb', line 398
def clean_orphaned_locks
cleanup_classes = [LockByWorld, SingletonActionLock]
ret = []
cleanup_classes.each do |cleanup_class|
valid_owner_ids = cleanup_class.valid_owner_ids(self)
valid_classes = cleanup_class.valid_classes.map(&:name)
orphaned_locks = find_locks(class: valid_classes, exclude_owner_id: valid_owner_ids)
valid_owner_ids = cleanup_class.valid_owner_ids(self)
orphaned_locks.each do |lock|
unless valid_owner_ids.include?(lock.owner_id)
release(lock)
ret << lock
end
end
end
return ret
end
|
#create_record(record) ⇒ Object
350
351
352
353
|
# File 'lib/dynflow/coordinator.rb', line 350
def create_record(record)
Type! record, Record
adapter.create_record(record)
end
|
#deactivate_world(world) ⇒ Object
392
393
394
395
396
|
# File 'lib/dynflow/coordinator.rb', line 392
def deactivate_world(world)
Type! world, Coordinator::ExecutorWorld
world.active = false
update_record(world)
end
|
#delete_record(record) ⇒ Object
360
361
362
363
|
# File 'lib/dynflow/coordinator.rb', line 360
def delete_record(record)
Type! record, Record
adapter.delete_record(record)
end
|
#delete_world(world) ⇒ Object
#find_locks(filter_options) ⇒ Object
344
345
346
347
348
|
# File 'lib/dynflow/coordinator.rb', line 344
def find_locks(filter_options)
adapter.find_records(filter_options).map do |lock_data|
Lock.from_hash(lock_data)
end
end
|
#find_records(filter) ⇒ Object
365
366
367
368
369
|
# File 'lib/dynflow/coordinator.rb', line 365
def find_records(filter)
adapter.find_records(filter).map do |record_data|
Record.from_hash(record_data)
end
end
|
#find_worlds(active_executor_only = false, filters = {}) ⇒ Object
371
372
373
374
375
376
377
378
379
|
# File 'lib/dynflow/coordinator.rb', line 371
def find_worlds(active_executor_only = false, filters = {})
ret = find_records(filters.merge(class: Coordinator::ExecutorWorld.name))
if active_executor_only
ret = ret.select(&:active?)
else
ret.concat(find_records(filters.merge(class: Coordinator::ClientWorld.name)))
end
ret
end
|
#register_world(world) ⇒ Object
#release(lock) ⇒ Object
335
336
337
338
|
# File 'lib/dynflow/coordinator.rb', line 335
def release(lock)
Type! lock, Lock
adapter.delete_record(lock)
end
|
#release_by_owner(owner_id) ⇒ Object
340
341
342
|
# File 'lib/dynflow/coordinator.rb', line 340
def release_by_owner(owner_id)
find_locks(owner_id: owner_id).map { |lock| release(lock) }
end
|
#update_record(record) ⇒ Object
355
356
357
358
|
# File 'lib/dynflow/coordinator.rb', line 355
def update_record(record)
Type! record, Record
adapter.update_record(record)
end
|