Class: Mercury::Fake
- Inherits:
-
Object
show all
- Defined in:
- lib/mercury/fake.rb,
lib/mercury/fake/queue.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/fake/queued_message.rb
Defined Under Namespace
Classes: Domain, Metadata, Queue, QueuedMessage, Subscriber
Class Method Summary
collapse
Instance Method Summary
collapse
-
#close(&k) ⇒ Object
-
#delete_source(source_name, &k) ⇒ Object
-
#delete_work_queue(worker_group, &k) ⇒ Object
-
#initialize(domain = :default, parallelism: 1) ⇒ Fake
constructor
-
#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
-
#queue_exists?(worker, &k) ⇒ Boolean
-
#source_exists?(source, &k) ⇒ Boolean
-
#start_listener(source_name, handler, tag_filter: '#', &k) ⇒ Object
-
#start_worker(worker_group, source_name, handler, tag_filter: '#', &k) ⇒ Object
Constructor Details
#initialize(domain = :default, parallelism: 1) ⇒ Fake
Returns a new instance of Fake.
18
19
20
21
|
# File 'lib/mercury/fake.rb', line 18
# Builds a Fake bound to one entry of the class-level domain registry.
#
# @param domain [Object] key looked up in Fake.domains (defaults to :default)
# @param parallelism [Integer] stored in @parallelism; it is not read anywhere
#   in this method
def initialize(domain=:default, parallelism: 1)
# Instances constructed with the same domain key read the same registry entry.
@domain = Fake.domains[domain]
@parallelism = parallelism
end
|
Class Method Details
.domains ⇒ Object
23
24
25
|
# File 'lib/mercury/fake.rb', line 23
# Returns the class-level registry of domains.
#
# The registry is built on first access and memoized in @domains. Reading a
# key that is not present yet stores a fresh Domain under that key and
# returns it, so repeated lookups of the same key yield the same Domain.
#
# @return [Hash] registry mapping a domain key to its Domain
def self.domains
  return @domains if @domains

  @domains = Hash.new do |registry, name|
    registry[name] = Domain.new
  end
end
|
Instance Method Details
#close(&k) ⇒ Object
27
28
29
30
|
# File 'lib/mercury/fake.rb', line 27
# Marks this instance as closed and then hands the block to ret.
#
# @param k [Proc, nil] optional block, passed to ret as its only argument
# @return [Object] whatever ret returns
def close(&k)
# NOTE(review): presumably assert_not_closed inspects this flag — confirm.
@closed = true
ret(k)
end
|
#delete_source(source_name, &k) ⇒ Object
54
55
56
57
58
|
# File 'lib/mercury/fake.rb', line 54
# Removes every queue whose source equals +source_name+.
#
# Calls assert_not_closed first, then prunes the matching entries from
# +queues+ in place, and finally hands the block to ret.
#
# @param source_name [Object] source to compare against each queue's #source
# @param k [Proc, nil] optional block, passed to ret
# @return [Object] whatever ret returns
def delete_source(source_name, &k)
  assert_not_closed
  queues.delete_if do |_queue_name, queue|
    queue.source == source_name
  end
  ret(k)
end
|
#delete_work_queue(worker_group, &k) ⇒ Object
60
61
62
63
64
|
# File 'lib/mercury/fake.rb', line 60
# Removes every queue whose worker equals +worker_group+.
#
# Calls assert_not_closed first, then prunes the matching entries from
# +queues+ in place, and finally hands the block to ret.
#
# @param worker_group [Object] value to compare against each queue's #worker
# @param k [Proc, nil] optional block, passed to ret
# @return [Object] whatever ret returns
def delete_work_queue(worker_group, &k)
  assert_not_closed
  queues.delete_if do |_queue_name, queue|
    queue.worker == worker_group
  end
  ret(k)
end
|
#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
32
33
34
35
36
|
# File 'lib/mercury/fake.rb', line 32
# Publishes +msg+ to every queue bound to +source_name+ for the given +tag+.
#
# Calls assert_not_closed first. The set of bound queues is determined up
# front (via each queue's #binds?), then each one receives its own
# roundtrip(msg) result together with the tag and the headers.
#
# @param source_name [Object] source the message is published to
# @param msg [Object] payload; passed through roundtrip once per bound queue
# @param tag [String] tag handed to #binds? and forwarded to #enqueue
# @param headers [Hash] message headers, forwarded unchanged to #enqueue
# @param k [Proc, nil] optional block, passed to ret
# @return [Object] whatever ret returns
def publish(source_name, msg, tag: '', headers: {}, &k)
  assert_not_closed
  bound_queues = queues.values.select { |queue| queue.binds?(source_name, tag) }
  # Forward the headers: previously the third argument was missing, so any
  # headers given to publish were silently discarded.
  bound_queues.each { |queue| queue.enqueue(roundtrip(msg), tag, headers) }
  ret(k)
end
|
#queue_exists?(worker, &k) ⇒ Boolean
71
72
73
|
# File 'lib/mercury/fake.rb', line 71
# Reports whether any queue's #worker equals +worker+.
#
# The boolean result is passed to ret together with the block.
#
# @param worker [Object] value compared against each queue's #worker
# @param k [Proc, nil] optional block, passed to ret
# @return [Object] whatever ret returns
def queue_exists?(worker, &k)
  found = queues.values.any? { |queue| queue.worker == worker }
  ret(k, found)
end
|
#source_exists?(source, &k) ⇒ Boolean
66
67
68
69
|
# File 'lib/mercury/fake.rb', line 66
# Reports whether +source+ is known: either some queue's #source equals it,
# or it is one of the built-in "amq."-prefixed names listed below.
#
# The boolean result is passed to ret together with the block.
#
# @param source [Object] source name to look for
# @param k [Proc, nil] optional block, passed to ret
# @return [Object] whatever ret returns
def source_exists?(source, &k)
  built_in_sources = %w(direct topic fanout headers match rabbitmq.log rabbitmq.trace).map { |suffix| "amq.#{suffix}" }
  # Queue sources are checked before the built-in names.
  known = queues.values.any? { |queue| queue.source == source } || built_in_sources.include?(source)
  ret(k, known)
end
|
#start_listener(source_name, handler, tag_filter: '#', &k) ⇒ Object
38
39
40
|
# File 'lib/mercury/fake.rb', line 38
# Starts a listener on +source_name+ by delegating to
# start_worker_or_listener without a worker group argument.
#
# @param source_name [Object] source to listen on
# @param handler [Object] forwarded unchanged to start_worker_or_listener
# @param tag_filter [String] forwarded unchanged; defaults to '#'
#   (NOTE(review): presumably a match-everything topic pattern — confirm)
# @param k [Proc, nil] optional block, forwarded as the block argument
# @return [Object] whatever start_worker_or_listener returns
def start_listener(source_name, handler, tag_filter: '#', &k)
start_worker_or_listener(source_name, handler, tag_filter, &k)
end
|
#start_worker(worker_group, source_name, handler, tag_filter: '#', &k) ⇒ Object
42
43
44
|
# File 'lib/mercury/fake.rb', line 42
# Starts a worker for +worker_group+ on +source_name+ by delegating to
# start_worker_or_listener, passing the worker group as the fourth argument.
#
# @param worker_group [Object] worker group, forwarded unchanged
# @param source_name [Object] source to consume from
# @param handler [Object] forwarded unchanged to start_worker_or_listener
# @param tag_filter [String] forwarded unchanged; defaults to '#'
#   (NOTE(review): presumably a match-everything topic pattern — confirm)
# @param k [Proc, nil] optional block, forwarded as the block argument
# @return [Object] whatever start_worker_or_listener returns
def start_worker(worker_group, source_name, handler, tag_filter: '#', &k)
start_worker_or_listener(source_name, handler, tag_filter, worker_group, &k)
end
|