Class: TestBench::Parallel::Session
- Inherits:
-
Object
- Object
- TestBench::Parallel::Session
show all
- Includes:
- ImportConstants
- Defined in:
- lib/test_bench/parallel/session.rb
Defined Under Namespace
Modules: Defaults
Classes: TelemetrySink
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(session, processes) ⇒ Session
Returns a new instance of Session.
20
21
22
23
|
# File 'lib/test_bench/parallel/session.rb', line 20
def initialize(session, processes)
@session = session
@processes = processes
end
|
Instance Attribute Details
#file_path_queue ⇒ Object
Returns the value of attribute file_path_queue.
12
13
14
|
# File 'lib/test_bench/parallel/session.rb', line 12
def file_path_queue
@file_path_queue
end
|
#pending_file_count ⇒ Object
15
16
17
|
# File 'lib/test_bench/parallel/session.rb', line 15
def pending_file_count
@pending_file_count ||= 0
end
|
#processes ⇒ Object
Returns the value of attribute processes.
9
10
11
|
# File 'lib/test_bench/parallel/session.rb', line 9
def processes
@processes
end
|
#session ⇒ Object
Returns the value of attribute session.
8
9
10
|
# File 'lib/test_bench/parallel/session.rb', line 8
def session
@session
end
|
#telemetry_queue ⇒ Object
Returns the value of attribute telemetry_queue.
13
14
15
|
# File 'lib/test_bench/parallel/session.rb', line 13
def telemetry_queue
@telemetry_queue
end
|
#threads ⇒ Object
Returns the value of attribute threads.
11
12
13
|
# File 'lib/test_bench/parallel/session.rb', line 11
def threads
@threads
end
|
Class Method Details
.build(processes: nil) ⇒ Object
25
26
27
28
29
30
31
|
# File 'lib/test_bench/parallel/session.rb', line 25
def self.build(processes: nil)
processes ||= Defaults.processes
session = self.establish_session
new(session, processes)
end
|
.establish_session ⇒ Object
33
34
35
|
# File 'lib/test_bench/parallel/session.rb', line 33
def self.establish_session
TestBench::Run.establish_session
end
|
Instance Method Details
#close ⇒ Object
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
# File 'lib/test_bench/parallel/session.rb', line 57
def close
if file_path_queue.nil?
return
end
file_path_queue.close
until threads.empty?
update
threads.delete_if do |thread|
!thread.alive?
end
end
end
|
#execute(file_path) ⇒ Object
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/test_bench/parallel/session.rb', line 45
def execute(file_path)
start if not started?
file_path_queue.push(file_path)
self.pending_file_count += 1
until pending_file_count <= threads.count
update
end
end
|
#register_telemetry_sink ⇒ Object
37
38
39
|
# File 'lib/test_bench/parallel/session.rb', line 37
def register_telemetry_sink(...)
session.register_telemetry_sink(...)
end
|
#result ⇒ Object
41
42
43
|
# File 'lib/test_bench/parallel/session.rb', line 41
def result(...)
session.result(...)
end
|
#start ⇒ Object
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/test_bench/parallel/session.rb', line 93
def start
self.file_path_queue = Queue.new
self.telemetry_queue = Queue.new
self.threads = processes.times.map do |process|
Thread.new do
session = TestBench::Session.build
telemetry_sink = TelemetrySink.new(telemetry_queue)
session.register_telemetry_sink(telemetry_sink)
while file_path = file_path_queue.pop
session.execute(file_path)
telemetry_sink.flush
end
end
end
end
|
#started? ⇒ Boolean
89
90
91
|
# File 'lib/test_bench/parallel/session.rb', line 89
def started?
!threads.nil?
end
|
#update ⇒ Object
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
# File 'lib/test_bench/parallel/session.rb', line 73
def update
timeout_milliseconds = 100
timeout_seconds = Rational(timeout_milliseconds, 1_000)
while event_data_batch = telemetry_queue.pop(timeout: timeout_seconds)
event_data_batch.each do |event_data|
session.record_event(event_data)
case event_data
when FileExecuted, FileNotFound
self.pending_file_count -= 1
end
end
end
end
|