Module: Mercury::TestUtils

Includes:
Cps::Methods
Defined in:
lib/mercury/test_utils.rb

Instance Method Summary collapse

Methods included from Cps::Methods

#cps, #lift, #seq, #seql, #seqp

Instance Method Details

#amq_filter(xs) ⇒ Object



85
86
87
# File 'lib/mercury/test_utils.rb', line 85

def amq_filter(xs)
  xs.reject{|x| x.start_with?('amq.')}
end

#cps_benchmark(label, &block) ⇒ Object



77
78
79
80
81
82
83
# File 'lib/mercury/test_utils.rb', line 77

def cps_benchmark(label, &block)
  seql do
    let(:time) { lift { Time.now } }
    and_then { block.call }
    and_lift { puts "#{label} : #{(Time.now - time) * 1000} ms" }
  end
end

#delete_sources_and_queues_cps(source_names, queue_names) ⇒ Object



47
48
49
50
51
52
53
54
55
# File 'lib/mercury/test_utils.rb', line 47

def delete_sources_and_queues_cps(source_names, queue_names)
  # We must create a new mercury. The AMQP gem doesn't let you redeclare
  # a construct with the same instance you deleted it with.
  Mercury::Monadic.open.and_then do |m|
    Cps.inject(amq_filter(source_names)) { |s| m.delete_source(s) }.
      inject(amq_filter(queue_names)) { |q| m.delete_work_queue(q) }.
      and_then { m.close }
  end
end

#doneObject



20
21
22
# File 'lib/mercury/test_utils.rb', line 20

def done
  EM.stop
end

#em(timeout_seconds: 3) ⇒ Object



9
10
11
12
13
14
# File 'lib/mercury/test_utils.rb', line 9

def em(timeout_seconds: 3)
  EM.run do
    EM.add_timer(in_debug_mode? ? 999999 : timeout_seconds) { raise 'EM spec timed out' }
    yield
  end
end

#em_wait_until(pred, &k) ⇒ Object



24
25
26
27
28
29
30
31
32
33
# File 'lib/mercury/test_utils.rb', line 24

def em_wait_until(pred, &k)
  try_again = proc do
    if pred.call
      k.call
    else
      EM.add_timer(1.0 / 50, try_again)
    end
  end
  try_again.call
end

#in_debug_mode?Boolean

Returns:

  • (Boolean)


16
17
18
# File 'lib/mercury/test_utils.rb', line 16

def in_debug_mode?
  ENV['RUBYLIB'] =~ /ruby-debug-ide/ # http://stackoverflow.com/questions/22039807/determine-if-a-program-is-running-in-debug-mode
end

#read_all_messages(worker:, source:, tag:, seconds_to_wait: 0.1) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/mercury/test_utils.rb', line 57

def read_all_messages(worker: , source:, tag:, seconds_to_wait: 0.1)
  msgs = []
  last_received_time = Time.now
  msg_handler = ->(msg) do
    msgs << msg
    msg.ack
    last_received_time = Time.now
  end
  EM.run do
    Cps.seql do
      let(:m) { Mercury::Monadic.open }
      and_then { m.start_worker(worker, source, msg_handler, tag_filter: tag) }
      and_then { wait_until { (Time.now - last_received_time).to_f > seconds_to_wait } }
      and_then { m.close }
      and_lift { EM.stop }
    end.run
  end
  msgs
end

#wait_for(seconds) ⇒ Object



41
42
43
44
45
# File 'lib/mercury/test_utils.rb', line 41

def wait_for(seconds)
  cps do |&k|
    EM.add_timer(seconds, &k)
  end
end

#wait_until(&pred) ⇒ Object



35
36
37
38
39
# File 'lib/mercury/test_utils.rb', line 35

def wait_until(&pred)
  cps do |&k|
    em_wait_until(pred, &k)
  end
end