Class: Mercury
- Inherits:
-
Object
show all
- Defined in:
- lib/mercury/cps.rb,
lib/mercury/fake.rb,
lib/mercury/sync.rb,
lib/mercury/utils.rb,
lib/mercury/cps/seq.rb,
lib/mercury/mercury.rb,
lib/mercury/monadic.rb,
lib/mercury/version.rb,
lib/mercury/fake/queue.rb,
lib/mercury/test_utils.rb,
lib/mercury/cps/methods.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/wire_serializer.rb,
lib/mercury/cps/seq_with_let.rb,
lib/mercury/received_message.rb,
lib/mercury/fake/queued_message.rb
Overview
This class simulates Mercury without using the AMQP gem. It can be useful for unit testing code that uses Mercury. The domain concept allows different mercury instances to hit different virtual servers; this should rarely be needed. This class cannot simulate behavior of server disconnections, broken sockets, etc.
Defined Under Namespace
Modules: TestUtils
Classes: Cps, Fake, Monadic, ReceivedMessage, Sync, Utils, WireSerializer
Constant Summary
collapse
- VERSION =
'0.1.9'
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
-
#close(&k) ⇒ Object
-
#delete_source(source_name, &k) ⇒ Object
-
#delete_work_queue(worker_group, &k) ⇒ Object
-
#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, &k) ⇒ Mercury
constructor
A new instance of Mercury.
-
#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
-
#queue_exists?(queue_name, &k) ⇒ Boolean
-
#source_exists?(source_name, &k) ⇒ Boolean
-
#start_listener(source_name, handler, tag_filter: '#', &k) ⇒ Object
-
#start_worker(worker_group, source_name, handler, tag_filter: '#', &k) ⇒ Object
Constructor Details
#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, &k) ⇒ Mercury
Returns a new instance of Mercury.
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# File 'lib/mercury/mercury.rb', line 22
# Connects to an AMQP server, opens a channel on that connection, and then
# passes this instance to the continuation +k+.
#
# @param host [String] forwarded to AMQP.connect
# @param port [Integer] forwarded to AMQP.connect
# @param vhost [String] forwarded to AMQP.connect
# @param username [String] forwarded to AMQP.connect
# @param password [String] forwarded to AMQP.connect
# @param parallelism [Integer] passed as the channel's +prefetch+ option
# @param on_error [Object, nil] stored in @on_error for the error handlers
# @yieldparam mercury [Mercury] this instance, from inside the channel block
def initialize(host: 'localhost',
               port: 5672,
               vhost: '/',
               username: 'guest',
               password: 'guest',
               parallelism: 1,
               on_error: nil,
               &k)
  # Stored before the failure handler is built, matching the original order.
  @on_error = on_error
  connection_settings = {
    host: host,
    port: port,
    vhost: vhost,
    username: username,
    password: password,
    on_tcp_connection_failure: server_down_error_handler
  }
  AMQP.connect(**connection_settings) do |connection|
    @amqp = connection
    @channel = AMQP::Channel.new(connection, prefetch: parallelism) do
      @channel.confirm_select
      install_channel_error_handler
      install_lost_connection_error_handler
      k.call(self)
    end
  end
end
|
Instance Attribute Details
#amqp ⇒ Object
Returns the value of attribute amqp.
8
9
10
|
# File 'lib/mercury/mercury.rb', line 8
# Reader for @amqp, which #initialize sets to the connection object yielded
# by AMQP.connect.
#
# @return [Object] the current value of @amqp (nil until it has been assigned)
def amqp
@amqp
end
|
#channel ⇒ Object
Returns the value of attribute channel.
8
9
10
|
# File 'lib/mercury/mercury.rb', line 8
# Reader for @channel, which #initialize sets to the AMQP::Channel it creates
# on the connection.
#
# @return [Object] the current value of @channel (nil until it has been assigned)
def channel
@channel
end
|
#logger ⇒ Object
Returns the value of attribute logger.
8
9
10
|
# File 'lib/mercury/mercury.rb', line 8
# Reader for the @logger instance variable.
#
# NOTE(review): the #initialize shown for this class never assigns @logger on
# the instance, so this may return nil unless it is set elsewhere -- confirm.
#
# @return [Object, nil] the current value of @logger
def logger
@logger
end
|
Class Method Details
.open(logger: Logatron, **kws, &k) ⇒ Object
10
11
12
13
14
|
# File 'lib/mercury/mercury.rb', line 10
# Builds a new instance from the remaining keyword arguments and the
# continuation +k+, discarding the instance and returning nil.
#
# NOTE(review): inside a class method, `@logger = logger` assigns an instance
# variable on the receiver of `open` (the class object), not on the instance
# created by `new` -- confirm that this is what the instance-level #logger
# reader is expected to see.
#
# @param logger [Object] logger to remember; defaults to Logatron
# @param kws [Hash] keyword arguments forwarded unchanged to .new
# @return [nil]
def self.open(logger: Logatron, **kws, &k)
@logger = logger
new(**kws, &k)
nil
end
|
.publish_opts(tag, headers) ⇒ Object
55
56
57
|
# File 'lib/mercury/mercury.rb', line 55
# Builds the option hash used when publishing a message to an exchange.
#
# @param tag [String] value used as the :routing_key
# @param headers [Hash] caller-supplied headers; on key collisions these win
#   over the headers obtained from Logatron
# @return [Hash] a hash with :routing_key, :persistent (always true) and
#   :headers
def self.publish_opts(tag, headers)
  # NOTE(review): the accessor between `Logatron.` and `.merge` was lost in the
  # extracted listing; it is restored here as `http_headers` -- confirm against
  # lib/mercury/mercury.rb line 56.
  { routing_key: tag, persistent: true, headers: Logatron.http_headers.merge(headers) }
end
|
Instance Method Details
#close(&k) ⇒ Object
16
17
18
19
20
|
# File 'lib/mercury/mercury.rb', line 16
# Closes the object held in @amqp and calls the continuation +k+ (with no
# arguments) from the block handed to its +close+ method.
#
# @yield invoked with no arguments from within the close block
def close(&k)
  @amqp.close { k.call }
end
|
#delete_source(source_name, &k) ⇒ Object
81
82
83
84
85
86
87
|
# File 'lib/mercury/mercury.rb', line 81
# Looks up the exchange for +source_name+ via with_source, calls +delete+ on
# it, and invokes the continuation +k+ from the block given to +delete+.
#
# @param source_name [String] name passed to with_source
# @yield invoked with no arguments from within the delete block
def delete_source(source_name, &k)
  with_source(source_name) do |source_exchange|
    source_exchange.delete { k.call }
  end
end
|
#delete_work_queue(worker_group, &k) ⇒ Object
89
90
91
92
93
94
95
|
# File 'lib/mercury/mercury.rb', line 89
# Obtains the queue named +worker_group+ from @channel (using
# work_queue_opts), calls +delete+ on it, and invokes the continuation +k+
# from the block given to +delete+.
#
# @param worker_group [String] queue name passed to @channel.queue
# @yield invoked with no arguments from within the delete block
def delete_work_queue(worker_group, &k)
  @channel.queue(worker_group, work_queue_opts) do |work_queue|
    work_queue.delete { k.call }
  end
end
|
#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
44
45
46
47
48
49
50
51
52
53
|
# File 'lib/mercury/mercury.rb', line 44
# Publishes +msg+ to the exchange that with_source yields for +source_name+.
# The payload is write(msg); the publish options come from
# Mercury.publish_opts(tag, headers).
#
# @param source_name [String] name passed to with_source
# @param msg [Object] message passed through write before publishing
# @param tag [String] forwarded to Mercury.publish_opts
# @param headers [Hash] forwarded to Mercury.publish_opts
# @yield invoked with no arguments from the block given to exchange.publish
def publish(source_name, msg, tag: '', headers: {}, &k)
  with_source(source_name) do |exchange|
    # Forward the caller's headers; without this argument the +headers:+
    # keyword is silently ignored and publish_opts is called with one argument.
    exchange.publish(write(msg), **Mercury.publish_opts(tag, headers)) do
      k.call
    end
  end
end
|
#queue_exists?(queue_name, &k) ⇒ Boolean
105
106
107
108
109
110
111
|
# File 'lib/mercury/mercury.rb', line 105
# Runs an existence_check with continuation +k+: the check passively declares
# the queue +queue_name+ on the channel it is given and reports +true+ from
# inside that declaration's block.
#
# @param queue_name [String] name of the queue to look up
# @yield passed to existence_check as the continuation
def queue_exists?(queue_name, &k)
  existence_check(k) do |probe_channel, &report|
    probe_channel.queue(queue_name, passive: true) { report.call(true) }
  end
end
|
#source_exists?(source_name, &k) ⇒ Boolean
97
98
99
100
101
102
103
|
# File 'lib/mercury/mercury.rb', line 97
# Runs an existence_check with continuation +k+: the check calls
# with_source_no_cache for +source_name+ with +passive: true+ on the channel
# it is given and reports +true+ from inside that call's block.
#
# @param source_name [String] name of the source to look up
# @yield passed to existence_check as the continuation
def source_exists?(source_name, &k)
  existence_check(k) do |probe_channel, &report|
    with_source_no_cache(probe_channel, source_name, passive: true) { report.call(true) }
  end
end
|
#start_listener(source_name, handler, tag_filter: '#', &k) ⇒ Object
59
60
61
62
63
64
65
66
67
68
|
# File 'lib/mercury/mercury.rb', line 59
# Subscribes +handler+ to a listener queue on the source's exchange.
# Subscription uses +ack: false+; each delivery is wrapped with
# make_received_message(payload, metadata, false) before being handed to
# +handler+. The continuation +k+ is called right after +subscribe+ returns.
#
# @param source_name [String] name passed to with_source
# @param handler [#call] receives the object built by make_received_message
# @param tag_filter [String] passed to with_listener_queue
# @yield invoked with no arguments once the subscription call has returned
def start_listener(source_name, handler, tag_filter: '#', &k)
  with_source(source_name) do |source_exchange|
    with_listener_queue(source_exchange, tag_filter) do |listener_queue|
      listener_queue.subscribe(ack: false) do |delivery_info, body|
        handler.call(make_received_message(body, delivery_info, false))
      end
      k.call
    end
  end
end
|
#start_worker(worker_group, source_name, handler, tag_filter: '#', &k) ⇒ Object
70
71
72
73
74
75
76
77
78
79
|
# File 'lib/mercury/mercury.rb', line 70
# Subscribes +handler+ to the work queue for +worker_group+ on the source's
# exchange. Subscription uses +ack: true+; each delivery is wrapped with
# make_received_message(payload, metadata, true) before being handed to
# +handler+. The continuation +k+ is called right after +subscribe+ returns.
#
# @param worker_group [String] passed to with_work_queue
# @param source_name [String] name passed to with_source
# @param handler [#call] receives the object built by make_received_message
# @param tag_filter [String] passed to with_work_queue
# @yield invoked with no arguments once the subscription call has returned
def start_worker(worker_group, source_name, handler, tag_filter: '#', &k)
  with_source(source_name) do |source_exchange|
    with_work_queue(worker_group, source_exchange, tag_filter) do |work_queue|
      work_queue.subscribe(ack: true) do |delivery_info, body|
        handler.call(make_received_message(body, delivery_info, true))
      end
      k.call
    end
  end
end
|