Class: Dynflow::Executors::Parallel::Core
Instance Attribute Summary collapse
Instance Method Summary
collapse
#dead_letter_routing, #handle_event, #handle_execution, #handle_persistence_error, #handle_planning, #heartbeat, #plan_events, #work_finished
Methods inherited from Actor
#behaviour_definition, #terminating?
#on_message
#log
Constructor Details
#initialize(world, heartbeat_interval, queues_options) ⇒ Core
Returns a new instance of Core.
11
12
13
14
15
|
# File 'lib/dynflow/executors/parallel/core.rb', line 11
def initialize(world, heartbeat_interval, queues_options)
super
@pools = {}
initialize_queues
end
|
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
9
10
11
|
# File 'lib/dynflow/executors/parallel/core.rb', line 9
def logger
@logger
end
|
Instance Method Details
#execution_status(execution_plan_id = nil) ⇒ Object
39
40
41
42
43
|
# File 'lib/dynflow/executors/parallel/core.rb', line 39
def execution_status(execution_plan_id = nil)
@pools.each_with_object({}) do |(pool_name, pool), hash|
hash[pool_name] = pool.ask!([:execution_status, execution_plan_id])
end
end
|
#feed_pool(work_items) ⇒ Object
45
46
47
48
49
50
|
# File 'lib/dynflow/executors/parallel/core.rb', line 45
def feed_pool(work_items)
work_items.each do |new_work|
new_work.world = @world
@pools.fetch(suggest_queue(new_work)).tell([:schedule_work, new_work])
end
end
|
#finish_termination(pool_name) ⇒ Object
32
33
34
35
36
37
|
# File 'lib/dynflow/executors/parallel/core.rb', line 32
def finish_termination(pool_name)
@pools.delete(pool_name)
return unless @pools.empty?
super()
end
|
#initialize_queues ⇒ Object
17
18
19
20
21
22
23
24
25
|
# File 'lib/dynflow/executors/parallel/core.rb', line 17
def initialize_queues
default_pool_size = @queues_options[:default][:pool_size]
@queues_options.each do |(queue_name, queue_options)|
queue_pool_size = queue_options.fetch(:pool_size, default_pool_size)
@pools[queue_name] = Pool.spawn("pool #{queue_name}", @world,
reference, queue_name, queue_pool_size,
@world.transaction_adapter)
end
end
|
#start_termination(*args) ⇒ Object
27
28
29
30
|
# File 'lib/dynflow/executors/parallel/core.rb', line 27
def start_termination(*args)
super
@pools.values.each { |pool| pool.tell([:start_termination, Concurrent::Promises.resolvable_future]) }
end
|