Class: Mercury::Fake

Inherits:
Object
  • Object
show all
Defined in:
lib/mercury/fake.rb,
lib/mercury/fake/queue.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/fake/queued_message.rb

Defined Under Namespace

Classes: Domain, Metadata, Queue, QueuedMessage, Subscriber

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(domain = :default, **kws) ⇒ Fake

Returns a new instance of Fake.



28
29
30
31
32
33
34
35
# File 'lib/mercury/fake.rb', line 28

# Builds a fake whose state lives in the Domain registered under +domain+
# in Fake.domains.
#
# @param domain [Object] key used to look up the Domain (defaults to :default)
# @param kws [Hash] only :parallelism is read (default 1); every other
#   keyword is reported on $stderr and has no further effect here
def initialize(domain=:default, **kws)
  @domain = Fake.domains[domain]
  @parallelism = kws.fetch(:parallelism, 1)

  # Anything besides :parallelism is unsupported by the fake; tell the user.
  unsupported = kws.keys.reject { |key| key == :parallelism }
  return if unsupported.empty?

  $stderr.puts "Warning: Mercury::Fake::new is ignoring keyword arguments: #{unsupported.join(', ')}"
end

Class Method Details

.domains ⇒ Object



37
38
39
# File 'lib/mercury/fake.rb', line 37

# Registry of domains, memoized on first use.
#
# Looking up a key that has not been seen yet creates a new Domain, stores
# it under that key, and returns it, so repeated lookups of the same key
# yield the same Domain.
#
# @return [Hash] key => Domain
def self.domains
  @domains ||= Hash.new do |registry, name|
    registry[name] = Domain.new
  end
end

.install(rspec_context, domain = :default) ⇒ Object



20
21
22
23
24
25
26
# File 'lib/mercury/fake.rb', line 20

# Stubs Mercury.open inside the given RSpec context so that the block passed
# to Mercury.open is called with a Mercury::Fake.
#
# @param rspec_context [Object] object providing `allow`/`receive`; the stub
#   is set up via instance_exec on it
# @param domain [Object] forwarded as the first argument to Mercury::Fake.new
def self.install(rspec_context, domain=:default)
  rspec_context.instance_exec do
    allow(Mercury).to receive(:open) do |**open_opts, &on_open|
      # EM.next_tick is required to emulate the real Mercury.open
      EM.next_tick do
        fake = Mercury::Fake.new(domain, **open_opts)
        on_open.call(fake)
      end
    end
  end
end

Instance Method Details

#close(&k) ⇒ Object



41
42
43
44
# File 'lib/mercury/fake.rb', line 41

# Marks this fake as closed, then passes the continuation to +ret+.
#
# @param k [Proc, nil] optional continuation block, forwarded unchanged to ret
#
# NOTE(review): @closed is not read anywhere in this excerpt; presumably
# guard_public consults it to reject calls made after close — confirm.
def close(&k)
  @closed = true
  ret(k)
end

#delete_source(source_name, &k) ⇒ Object



77
78
79
80
81
# File 'lib/mercury/fake.rb', line 77

# Removes every queue whose +source+ equals +source_name+.
#
# @param source_name [Object] compared with each queue's #source using ==
# @param k [Proc, nil] optional continuation, handed to guard_public and ret
#
# NOTE(review): guard_public and ret are defined outside this excerpt;
# presumably ret delivers the (empty) result to k — confirm.
def delete_source(source_name, &k)
  guard_public(k)
  queues.delete_if do |_name, queue|
    queue.source == source_name
  end
  ret(k)
end

#delete_work_queue(worker_group, &k) ⇒ Object



83
84
85
86
87
# File 'lib/mercury/fake.rb', line 83

# Removes every queue whose +worker+ equals +worker_group+.
#
# @param worker_group [Object] compared with each queue's #worker using ==
# @param k [Proc, nil] optional continuation, handed to guard_public and ret
#
# NOTE(review): guard_public and ret are defined outside this excerpt;
# presumably ret delivers the (empty) result to k — confirm.
def delete_work_queue(worker_group, &k)
  guard_public(k)
  queues.delete_if do |_name, queue|
    queue.worker == worker_group
  end
  ret(k)
end

#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object



46
47
48
49
50
# File 'lib/mercury/fake.rb', line 46

# Enqueues +msg+ on every queue that reports binds?(source_name, tag).
#
# The set of matching queues is determined first; each match then receives
# its own roundtrip(msg) result, the tag, and headers.stringify_keys.
#
# @param source_name [Object] passed to each queue's #binds?
# @param msg [Object] message content, passed through roundtrip per queue
# @param tag [String] passed to #binds? and #enqueue (default '')
# @param headers [Hash] must respond to #stringify_keys (default {})
# @param k [Proc, nil] optional continuation, handed to guard_public and ret
#
# NOTE(review): roundtrip is defined elsewhere; presumably it serializes and
# deserializes the message to mimic transport — confirm.
def publish(source_name, msg, tag: '', headers: {}, &k)
  guard_public(k)
  recipients = queues.values.select { |queue| queue.binds?(source_name, tag) }
  recipients.each do |queue|
    queue.enqueue(roundtrip(msg), tag, headers.stringify_keys)
  end
  ret(k)
end

#queue_exists?(worker, &k) ⇒ Boolean

Returns:

  • (Boolean)


95
96
97
98
# File 'lib/mercury/fake.rb', line 95

# Reports, via +ret+, whether any queue's #worker equals +worker+.
#
# @param worker [Object] compared with each queue's #worker using ==
# @param k [Proc, nil] optional continuation, handed to guard_public and ret
#   together with the boolean result
def queue_exists?(worker, &k)
  guard_public(k)
  found = queues.values.any? { |queue| queue.worker == worker }
  ret(k, found)
end

#republish(msg, &k) ⇒ Object



52
53
54
55
56
57
58
# File 'lib/mercury/fake.rb', line 52

# Acks +msg+ and enqueues its content again on the queue whose #worker
# equals msg.work_queue_name.
#
# The new entry carries roundtrip(msg.content), the original tag, and the
# value returned by Mercury.increment_republish_count(msg).
#
# @param msg [Object] must respond to #ack, #work_queue_name, #content, #tag
# @param k [Proc, nil] optional continuation, handed to guard_public and ret
#
# NOTE(review): if no queue matches msg.work_queue_name, the lookup yields
# nil and the enqueue call raises NoMethodError after the message has
# already been acked — confirm whether that is intended.
def republish(msg, &k)
  guard_public(k)
  msg.ack
  target = queues.values.find { |queue| queue.worker == msg.work_queue_name }
  content = roundtrip(msg.content)
  tag = msg.tag
  headers = Mercury.increment_republish_count(msg)
  target.enqueue(content, tag, headers)
  ret(k)
end

#source_exists?(source, &k) ⇒ Boolean

Returns:

  • (Boolean)


89
90
91
92
93
# File 'lib/mercury/fake.rb', line 89

# Reports, via +ret+, whether +source+ is known: either some queue's #source
# equals it, or it is one of the fixed "amq."-prefixed names listed below.
#
# @param source [Object] name to look for
# @param k [Proc, nil] optional continuation, handed to guard_public and ret
#   together with the boolean result
def source_exists?(source, &k)
  guard_public(k)
  declared = queues.values.map(&:source)
  # Names treated as always present, e.g. "amq.direct", "amq.topic".
  built_in = %w(direct topic fanout headers match rabbitmq.log rabbitmq.trace).map { |suffix| "amq.#{suffix}" }
  ret(k, declared.include?(source) || built_in.include?(source))
end

#start_listener(source_name, handler, tag_filter: nil, &k) ⇒ Object



60
61
62
# File 'lib/mercury/fake.rb', line 60

# Starts a listener on +source_name+ by delegating to
# start_worker_or_listener without a worker group argument.
#
# @param source_name [Object] forwarded as the first argument
# @param handler [Object] forwarded as the second argument
#   (presumably a callable invoked per message — confirm against
#   start_worker_or_listener, which is not visible in this excerpt)
# @param tag_filter [Object, nil] forwarded positionally (default nil)
# @param k [Proc, nil] optional continuation block, forwarded as the block
def start_listener(source_name, handler, tag_filter: nil, &k)
  start_worker_or_listener(source_name, handler, tag_filter, &k)
end

#start_worker(worker_group, source_name, handler, tag_filter: nil, &k) ⇒ Object



64
65
66
# File 'lib/mercury/fake.rb', line 64

# Starts a worker for +worker_group+ on +source_name+ by delegating to
# start_worker_or_listener, passing the worker group as the fourth argument.
#
# @param worker_group [Object] forwarded as the fourth positional argument
# @param source_name [Object] forwarded as the first argument
# @param handler [Object] forwarded as the second argument
#   (presumably a callable invoked per message — confirm against
#   start_worker_or_listener, which is not visible in this excerpt)
# @param tag_filter [Object, nil] forwarded positionally (default nil)
# @param k [Proc, nil] optional continuation block, forwarded as the block
def start_worker(worker_group, source_name, handler, tag_filter: nil, &k)
  start_worker_or_listener(source_name, handler, tag_filter, worker_group, &k)
end