Class: Mercury::Fake
- Inherits:
-
Object
show all
- Defined in:
- lib/mercury/fake.rb,
lib/mercury/fake/queue.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/fake/queued_message.rb
Defined Under Namespace
Classes: Domain, Metadata, Queue, QueuedMessage, Subscriber
Class Method Summary
collapse
Instance Method Summary
collapse
-
#close(&k) ⇒ Object
-
#delete_source(source_name, &k) ⇒ Object
-
#delete_work_queue(worker_group, &k) ⇒ Object
-
#initialize(domain = :default, **kws) ⇒ Fake
constructor
-
#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
-
#queue_exists?(worker, &k) ⇒ Boolean
-
#republish(msg, &k) ⇒ Object
-
#source_exists?(source, &k) ⇒ Boolean
-
#start_listener(source_name, handler, tag_filter: nil, &k) ⇒ Object
-
#start_worker(worker_group, source_name, handler, tag_filter: nil, &k) ⇒ Object
Constructor Details
#initialize(domain = :default, **kws) ⇒ Fake
Returns a new instance of Fake.
28
29
30
31
32
33
34
35
|
# File 'lib/mercury/fake.rb', line 28
# Builds a fake that operates on the Domain registered under `domain`.
#
# @param domain [Object] key looked up in Fake.domains (defaults to :default)
# @param kws [Hash] keyword options. Only :parallelism is read (default 1);
#   every other key is reported on $stderr and otherwise ignored.
def initialize(domain=:default, **kws)
  @domain = Fake.domains[domain]
  @parallelism = kws.fetch(:parallelism, 1)

  # Surface options this fake does not honour instead of dropping them silently.
  unrecognized = kws.keys.reject { |key| key == :parallelism }
  return unless unrecognized.any?

  $stderr.puts "Warning: Mercury::Fake::new is ignoring keyword arguments: #{unrecognized.join(', ')}"
end
|
Class Method Details
.domains ⇒ Object
37
38
39
|
# File 'lib/mercury/fake.rb', line 37
# Registry of Domain instances, memoised on the receiver.
#
# Looking up a key that is not present yet creates a fresh Domain, stores it
# under that key and returns it, so repeated lookups yield the same Domain.
#
# @return [Hash] domain key => Domain
def self.domains
  @domains ||= Hash.new do |registry, name|
    registry[name] = Domain.new
  end
end
|
.install(rspec_context, domain = :default) ⇒ Object
20
21
22
23
24
25
26
|
# File 'lib/mercury/fake.rb', line 20
# Stubs Mercury.open inside the given RSpec context so that it yields a
# Mercury::Fake instead of whatever the real implementation would provide.
#
# The stub is installed with `allow(...).to receive(:open)`; when invoked it
# schedules the caller's block on EM.next_tick and passes it a new
# Mercury::Fake built from `domain` and the keyword arguments given to open.
#
# @param rspec_context [Object] object on which the stubbing DSL is evaluated
#   (via instance_exec)
# @param domain [Object] domain key forwarded to Mercury::Fake.new
def self.install(rspec_context, domain=:default)
  rspec_context.instance_exec do
    allow(Mercury).to receive(:open) do |**open_options, &on_open|
      EM.next_tick do
        on_open.call(Mercury::Fake.new(domain, **open_options))
      end
    end
  end
end
|
Instance Method Details
#close(&k) ⇒ Object
41
42
43
44
|
# File 'lib/mercury/fake.rb', line 41
# Flags this fake as closed and hands the continuation to `ret`.
#
# @param k [Proc, nil] optional continuation block, passed to `ret`
# @return [Object] whatever `ret` returns (helper defined elsewhere in the class)
def close(&k)
# Record the closed state; presumably consulted by guard_public -- TODO confirm.
@closed = true
ret(k)
end
|
#delete_source(source_name, &k) ⇒ Object
77
78
79
80
81
|
# File 'lib/mercury/fake.rb', line 77
# Removes every queue whose `source` equals `source_name`.
#
# @param source_name [Object] source whose queues are dropped
# @param k [Proc, nil] optional continuation block, passed to `ret`
# @return [Object] whatever `ret` returns (helper defined elsewhere)
def delete_source(source_name, &k)
  guard_public(k)
  # In-place removal; the return value of reject! is intentionally unused.
  queues.reject! { |_name, queue| queue.source == source_name }
  ret(k)
end
|
#delete_work_queue(worker_group, &k) ⇒ Object
83
84
85
86
87
|
# File 'lib/mercury/fake.rb', line 83
# Removes every queue whose `worker` equals `worker_group`.
#
# @param worker_group [Object] worker group whose queues are dropped
# @param k [Proc, nil] optional continuation block, passed to `ret`
# @return [Object] whatever `ret` returns (helper defined elsewhere)
def delete_work_queue(worker_group, &k)
  guard_public(k)
  # In-place removal; the return value of reject! is intentionally unused.
  queues.reject! { |_name, queue| queue.worker == worker_group }
  ret(k)
end
|
#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
46
47
48
49
50
|
# File 'lib/mercury/fake.rb', line 46
# Delivers `msg` to every queue that binds the given source and tag.
#
# For each queue for which `binds?(source_name, tag)` is true, the message is
# passed through `roundtrip` (helper defined elsewhere; presumably a
# serialise/deserialise copy -- TODO confirm) and enqueued together with the
# tag and the headers. Header keys are converted to strings before enqueueing.
#
# @param source_name [Object] source the message is published to
# @param msg [Object] message payload
# @param tag [String] routing tag, defaults to ''
# @param headers [Hash] message headers, defaults to {}
# @param k [Proc, nil] optional continuation block, passed to `ret`
# @return [Object] whatever `ret` returns (helper defined elsewhere)
def publish(source_name, msg, tag: '', headers: {}, &k)
  guard_public(k)
  bound_queues = queues.values.select { |q| q.binds?(source_name, tag) }
  bound_queues.each do |q|
    # The receiver of stringify_keys must be `headers`; without it the call
    # does not parse and the headers argument is never forwarded.
    q.enqueue(roundtrip(msg), tag, headers.stringify_keys)
  end
  ret(k)
end
|
#queue_exists?(worker, &k) ⇒ Boolean
95
96
97
98
|
# File 'lib/mercury/fake.rb', line 95
# Reports whether any known queue has `worker` as its worker name.
#
# The boolean is not returned directly; it is handed to `ret` together with
# the continuation.
#
# @param worker [Object] worker name to look for
# @param k [Proc, nil] optional continuation block, passed to `ret`
# @return [Object] whatever `ret` returns (helper defined elsewhere)
def queue_exists?(worker, &k)
  guard_public(k)
  worker_names = queues.each_value.map(&:worker)
  exists = worker_names.include?(worker)
  ret(k, exists)
end
|
#republish(msg, &k) ⇒ Object
52
53
54
55
56
57
58
|
# File 'lib/mercury/fake.rb', line 52
# Acknowledges `msg` and puts a copy of it back on its work queue.
#
# The target queue is the first one whose `worker` equals
# `msg.work_queue_name`. The content is passed through `roundtrip` (helper
# defined elsewhere), the tag is kept, and the third enqueue argument is the
# result of Mercury.increment_republish_count(msg).
#
# @param msg [Object] received message; must respond to ack, content, tag
#   and work_queue_name
# @param k [Proc, nil] optional continuation block, passed to `ret`
# @return [Object] whatever `ret` returns (helper defined elsewhere)
def republish(msg, &k)
  guard_public(k)
  msg.ack
  target = queues.each_value.find { |candidate| candidate.worker == msg.work_queue_name }
  content = roundtrip(msg.content)
  tag = msg.tag
  headers = Mercury.increment_republish_count(msg)
  target.enqueue(content, tag, headers)
  ret(k)
end
|
#source_exists?(source, &k) ⇒ Boolean
89
90
91
92
93
|
# File 'lib/mercury/fake.rb', line 89
# Reports whether `source` is known: either some queue has it as its source,
# or it is one of the built-in "amq."-prefixed names listed below.
#
# The boolean is not returned directly; it is handed to `ret` together with
# the continuation.
#
# @param source [Object] source name to look for
# @param k [Proc, nil] optional continuation block, passed to `ret`
# @return [Object] whatever `ret` returns (helper defined elsewhere)
def source_exists?(source, &k)
  guard_public(k)
  reserved = %w(direct topic fanout headers match rabbitmq.log rabbitmq.trace).map { |suffix| "amq.#{suffix}" }
  declared = queues.values.map(&:source)
  known = declared.include?(source) || reserved.include?(source)
  ret(k, known)
end
|
#start_listener(source_name, handler, tag_filter: nil, &k) ⇒ Object
60
61
62
|
# File 'lib/mercury/fake.rb', line 60
# Starts a listener on `source_name` by delegating to
# start_worker_or_listener without a worker group argument.
#
# @param source_name [Object] source to listen on
# @param handler [Object] forwarded unchanged; presumably invoked per
#   received message -- confirm against start_worker_or_listener
# @param tag_filter [Object, nil] forwarded unchanged as the third positional
#   argument
# @param k [Proc, nil] optional continuation block, forwarded as the block
# @return [Object] whatever start_worker_or_listener returns
def start_listener(source_name, handler, tag_filter: nil, &k)
start_worker_or_listener(source_name, handler, tag_filter, &k)
end
|
#start_worker(worker_group, source_name, handler, tag_filter: nil, &k) ⇒ Object
64
65
66
|
# File 'lib/mercury/fake.rb', line 64
# Starts a worker for `worker_group` on `source_name` by delegating to
# start_worker_or_listener, passing the worker group as the fourth argument.
#
# @param worker_group [Object] worker group name, forwarded unchanged
# @param source_name [Object] source to consume from
# @param handler [Object] forwarded unchanged; presumably invoked per
#   received message -- confirm against start_worker_or_listener
# @param tag_filter [Object, nil] forwarded unchanged as the third positional
#   argument
# @param k [Proc, nil] optional continuation block, forwarded as the block
# @return [Object] whatever start_worker_or_listener returns
def start_worker(worker_group, source_name, handler, tag_filter: nil, &k)
start_worker_or_listener(source_name, handler, tag_filter, worker_group, &k)
end
|