Class: Dynflow::Testing::InThreadExecutor
Instance Attribute Summary
#logger, #world
Instance Method Summary
collapse
#execution_status, #initialized
Constructor Details
Returns a new instance of InThreadExecutor.
4
5
6
7
8
|
# File 'lib/dynflow/testing/in_thread_executor.rb', line 4
# Sets up the executor's state for the given world.
#
# Keeps a reference to +world+, builds a Director for it and starts with an
# empty queue of pending work items.
#
# @param world the world this executor runs in; also handed to Director.new
def initialize(world)
  @world = world
  @work_items = Queue.new
  @director = Director.new(world)
end
|
Instance Method Details
#clock_tick ⇒ Object
36
37
38
|
# File 'lib/dynflow/testing/in_thread_executor.rb', line 36
# Asks the world's clock to progress everything it is tracking.
#
# Delegates to +progress_all+ on +@world.clock+, passing
# +[:periodic_check_inbox]+.
# NOTE(review): presumably that list names timers the clock should not
# fire — confirm against the clock implementation.
#
# @return whatever +progress_all+ returns
def clock_tick
  clock = @world.clock
  clock.progress_all(%i[periodic_check_inbox])
end
|
#event(execution_plan_id, step_id, event, future = Concurrent::Promises.resolvable_future) ⇒ Object
28
29
30
31
32
33
34
|
# File 'lib/dynflow/testing/in_thread_executor.rb', line 28
# Hands an event to the director and queues the work items it produces.
#
# The arguments are wrapped into a Director::Event, which is passed to
# +@director.handle_event+. Every work item returned is appended to the
# internal queue. The items are only enqueued here; this method does not
# process them.
#
# @param execution_plan_id id of the execution plan the event is meant for
# @param step_id id of the step the event is meant for
# @param event the event payload
# @param future stored in the Director::Event and returned to the caller
# @return the +future+ argument (or the default created for it)
def event(execution_plan_id, step_id, event, future = Concurrent::Promises.resolvable_future)
  director_event = Director::Event[execution_plan_id, step_id, event, future]
  produced_items = @director.handle_event(director_event)
  produced_items.each { |item| @work_items.push(item) }
  future
end
|
#execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, _wait_for_acceptance = true) ⇒ Object
10
11
12
13
14
|
# File 'lib/dynflow/testing/in_thread_executor.rb', line 10
# Starts an execution plan and works through the queue before returning.
#
# The director is asked to start the execution; the work items it returns
# are queued via +feed_queue+, and +process_work_items+ then runs until the
# queue is empty.
#
# @param execution_plan_id id of the execution plan to start
# @param finished passed to the director's +start_execution+ and returned
# @param _wait_for_acceptance accepted but not used by this method
# @return the +finished+ argument (or the default created for it)
def execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, _wait_for_acceptance = true)
  initial_items = @director.start_execution(execution_plan_id, finished)
  feed_queue(initial_items)
  process_work_items
  finished
end
|
#feed_queue(work_items) ⇒ Object
40
41
42
|
# File 'lib/dynflow/testing/in_thread_executor.rb', line 40
# Appends every given work item to the executor's internal queue, in order.
#
# @param work_items a collection responding to +each+
# @return the result of +work_items.each+
def feed_queue(work_items)
  work_items.each do |item|
    @work_items << item
  end
end
|
#handle_work(work_item) ⇒ Object
23
24
25
26
|
# File 'lib/dynflow/testing/in_thread_executor.rb', line 23
# Runs a single work item and reports it to the director as finished.
#
# @param work_item an object responding to +execute+
# @return whatever +@director.work_finished+ returns for the item
def handle_work(work_item)
  # tap calls +execute+ first, then passes the same item on to the director.
  @director.work_finished(work_item.tap(&:execute))
end
|
#process_work_items ⇒ Object
16
17
18
19
20
21
|
# File 'lib/dynflow/testing/in_thread_executor.rb', line 16
# Drains the work queue, one item at a time, until nothing is left.
#
# Each iteration pops one item, runs it through +handle_work+, queues
# whatever that call returns via +feed_queue+, and then calls +clock_tick+.
# Items queued during the loop are processed in the same call.
#
# @return [nil]
def process_work_items
  until @work_items.empty?
    pending = @work_items.pop
    follow_ups = handle_work(pending)
    feed_queue(follow_ups)
    clock_tick
  end
end
|
#terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object
44
45
46
47
48
49
|
# File 'lib/dynflow/testing/in_thread_executor.rb', line 44
# Terminates the director and records the outcome on +future+.
#
# On success the future is fulfilled with +true+. If a StandardError is
# raised along the way, the future is rejected with that error instead of
# the error propagating to the caller.
#
# @param future an object responding to +fulfill+ and +reject+
# @return the result of +future.fulfill+ or, on error, of +future.reject+
def terminate(future = Concurrent::Promises.resolvable_future)
  begin
    @director.terminate
    future.fulfill(true)
  rescue StandardError => error
    future.reject(error)
  end
end
|