Module: Mercury::TestUtils
- Includes:
- Cps::Methods
- Defined in:
- lib/mercury/test_utils.rb
Instance Method Summary
collapse
Methods included from Cps::Methods: #cps, #lift, #seq, #seql, #seqp
Instance Method Details
#amq_filter(xs) ⇒ Object
85
86
87
|
# File 'lib/mercury/test_utils.rb', line 85
# Removes every entry whose name begins with the 'amq.' prefix.
# NOTE(review): presumably these are broker built-in names that must not be
# deleted — confirm against the callers.
#
# @param xs [Enumerable] names; each element must respond to +start_with?+
# @return the result of +xs.reject+, i.e. the entries not starting with 'amq.'
def amq_filter(xs)
  xs.reject do |name|
    name.start_with?('amq.')
  end
end
|
#cps_benchmark(label, &block) ⇒ Object
77
78
79
80
81
82
83
|
# File 'lib/mercury/test_utils.rb', line 77
# Builds a +seql+ sequence that runs +block+ and then prints how long it took,
# in milliseconds, prefixed with +label+.
#
# The elapsed time is taken from the monotonic clock rather than +Time.now+,
# so system clock adjustments during the run cannot distort the measurement.
#
# @param label [#to_s] text printed in front of the measurement
# @param block [Proc] invoked from the +and_then+ step of the sequence
# @return whatever +seql+ returns for the sequence built here
def cps_benchmark(label, &block)
  seql do
    # Capture the start instant as a lifted value bound to +time+.
    let(:time) { lift { Process.clock_gettime(Process::CLOCK_MONOTONIC) } }
    and_then { block.call }
    and_lift do
      elapsed_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - time) * 1000
      puts "#{label} : #{elapsed_ms} ms"
    end
  end
end
|
#delete_sources_and_queues_cps(source_names, queue_names) ⇒ Object
47
48
49
50
51
52
53
54
55
|
# File 'lib/mercury/test_utils.rb', line 47
# Builds a computation that opens a Mercury::Monadic instance, deletes the
# given sources and work queues through it, and finally closes it.
# Names starting with 'amq.' are skipped (see +amq_filter+).
#
# @param source_names [Enumerable] names passed to +delete_source+
# @param queue_names [Enumerable] names passed to +delete_work_queue+
# @return the value of +Mercury::Monadic.open.and_then+ for the chain built here
def delete_sources_and_queues_cps(source_names, queue_names)
  Mercury::Monadic.open.and_then do |mercury|
    # Sources are chained first, then queues, then the close step.
    delete_sources = Cps.inject(amq_filter(source_names)) { |name| mercury.delete_source(name) }
    delete_queues = delete_sources.inject(amq_filter(queue_names)) { |name| mercury.delete_work_queue(name) }
    delete_queues.and_then { mercury.close }
  end
end
|
#done ⇒ Object
20
21
22
|
# File 'lib/mercury/test_utils.rb', line 20
# Stops the EventMachine reactor.
#
# @return the value returned by +EventMachine.stop+
def done
  EventMachine.stop
end
|
#em(timeout_seconds: 3) ⇒ Object
9
10
11
12
13
14
|
# File 'lib/mercury/test_utils.rb', line 9
# Runs the given block inside an EventMachine reactor, with a watchdog timer
# that raises 'EM spec timed out' when it fires.
#
# @param timeout_seconds [Numeric] watchdog delay; replaced by 999999 when
#   +in_debug_mode?+ is truthy
# @yield the body to execute once the reactor is running
# @return the value returned by +EM.run+
def em(timeout_seconds: 3)
  EM.run do
    # Effectively disable the watchdog while debugging.
    watchdog_delay = in_debug_mode? ? 999_999 : timeout_seconds
    EM.add_timer(watchdog_delay) { raise 'EM spec timed out' }
    yield
  end
end
|
#em_wait_until(pred, &k) ⇒ Object
24
25
26
27
28
29
30
31
32
33
|
# File 'lib/mercury/test_utils.rb', line 24
# Polls +pred+ until it returns a truthy value, then invokes the continuation.
# While +pred+ is falsy, another check is scheduled with +EM.add_timer+
# 1/50 of a second later.
#
# @param pred [#call] condition to poll
# @param k [Proc] continuation called (with no arguments) once +pred+ holds
# @return the continuation's result if +pred+ already holds, otherwise the
#   value returned by +EM.add_timer+
def em_wait_until(pred, &k)
  poll_interval = 1.0 / 50
  poll = proc do
    pred.call ? k.call : EM.add_timer(poll_interval, poll)
  end
  poll.call
end
|
#in_debug_mode? ⇒ Boolean
16
17
18
|
# File 'lib/mercury/test_utils.rb', line 16
# Reports whether the RUBYLIB environment variable mentions 'ruby-debug-ide'.
# NOTE(review): presumably this detects an attached IDE debugger — confirm.
#
# Returns a strict boolean (the previous +=~+ form returned a match index or
# nil and set the global match variables), and treats an unset RUBYLIB as
# "not in debug mode".
#
# @return [Boolean] true when RUBYLIB contains 'ruby-debug-ide'
def in_debug_mode?
  ENV['RUBYLIB'].to_s.include?('ruby-debug-ide')
end
|
#read_all_messages(worker:, source:, tag:, seconds_to_wait: 0.1) ⇒ Object
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
# File 'lib/mercury/test_utils.rb', line 57
# Runs its own EventMachine reactor, starts a worker on +source+, acks and
# collects every message handed to the handler, and stops once no message has
# arrived for more than +seconds_to_wait+ seconds.
#
# The idle period is measured with the monotonic clock rather than +Time.now+,
# so a wall-clock adjustment cannot end the wait early or stretch it.
#
# @param worker the worker name passed to +start_worker+
# @param source the source name passed to +start_worker+
# @param tag passed to +start_worker+ as +tag_filter+
# @param seconds_to_wait [Numeric] idle time after which reading stops
# @return [Array] the messages received, in arrival order
def read_all_messages(worker:, source:, tag:, seconds_to_wait: 0.1)
  msgs = []
  monotonic_now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  # Starts counting before the connection is opened, matching the original timing.
  last_received_time = monotonic_now.call
  msg_handler = ->(msg) do
    msgs << msg
    msg.ack
    # Every delivery restarts the idle period.
    last_received_time = monotonic_now.call
  end
  EM.run do
    Cps.seql do
      let(:m) { Mercury::Monadic.open }
      and_then { m.start_worker(worker, source, msg_handler, tag_filter: tag) }
      and_then { wait_until { monotonic_now.call - last_received_time > seconds_to_wait } }
      and_then { m.close }
      and_lift { EM.stop }
    end.run
  end
  msgs
end
|
#wait_for(seconds) ⇒ Object
41
42
43
44
45
|
# File 'lib/mercury/test_utils.rb', line 41
# Wraps a timer in +cps+: when the wrapped block receives its continuation,
# the continuation is scheduled with +EM.add_timer+ to run after +seconds+.
#
# @param seconds [Numeric] delay handed to +EM.add_timer+
# @return whatever +cps+ returns for the wrapped block
def wait_for(seconds)
  cps { |&continuation| EM.add_timer(seconds, &continuation) }
end
|
#wait_until(&pred) ⇒ Object
35
36
37
38
39
|
# File 'lib/mercury/test_utils.rb', line 35
# Wraps +em_wait_until+ in +cps+: when the wrapped block receives its
# continuation, polling of +pred+ starts and the continuation is passed on
# to +em_wait_until+.
#
# @param pred [Proc] condition block forwarded to +em_wait_until+
# @return whatever +cps+ returns for the wrapped block
def wait_until(&pred)
  cps { |&continuation| em_wait_until(pred, &continuation) }
end
|