Class: Mercury
- Inherits:
-
Object
- Object
- Mercury
- Defined in:
- lib/mercury/cps.rb,
lib/mercury/fake.rb,
lib/mercury/sync.rb,
lib/mercury/utils.rb,
lib/mercury/cps/seq.rb,
lib/mercury/mercury.rb,
lib/mercury/monadic.rb,
lib/mercury/version.rb,
lib/mercury/fake/queue.rb,
lib/mercury/test_utils.rb,
lib/mercury/cps/methods.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/wire_serializer.rb,
lib/mercury/cps/seq_with_let.rb,
lib/mercury/received_message.rb,
lib/mercury/fake/queued_message.rb
Overview
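Note: this overview text appears to describe Mercury::Fake rather than the AMQP-backed Mercury class documented on this page (the constructor shown below connects through the AMQP gem). Mercury::Fake simulates Mercury without using the AMQP gem. It can be useful for unit testing code that uses Mercury. The domain concept allows different Mercury instances to hit different virtual servers; this should rarely be needed. Mercury::Fake cannot simulate the behavior of server disconnections, broken sockets, etc.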
Defined Under Namespace
Modules: TestUtils Classes: Cps, Fake, Monadic, ReceivedMessage, Sync, Utils, WireSerializer
Constant Summary collapse
- VERSION =
'0.2.0'
Instance Attribute Summary collapse
-
#amqp ⇒ Object
readonly
Returns the value of attribute amqp.
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Class Method Summary collapse
- .open(logger: Logatron, **kws, &k) ⇒ Object
- .publish_opts(tag, headers) ⇒ Object
- .source_opts ⇒ Object
Instance Method Summary collapse
- #close(&k) ⇒ Object
- #delete_source(source_name, &k) ⇒ Object
- #delete_work_queue(worker_group, &k) ⇒ Object
-
#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, &k) ⇒ Mercury
constructor
A new instance of Mercury.
- #publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
- #queue_exists?(queue_name, &k) ⇒ Boolean
- #source_exists?(source_name, &k) ⇒ Boolean
- #start_listener(source_name, handler, tag_filter: '#', &k) ⇒ Object
- #start_worker(worker_group, source_name, handler, tag_filter: '#', &k) ⇒ Object
Constructor Details
#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, &k) ⇒ Mercury
Returns a new instance of Mercury.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/mercury/mercury.rb', line 22 def initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, &k) @on_error = on_error AMQP.connect(host: host, port: port, vhost: vhost, username: username, password: password, on_tcp_connection_failure: server_down_error_handler) do |amqp| @amqp = amqp @channel = AMQP::Channel.new(amqp, prefetch: parallelism) do install_channel_error_handler install_lost_connection_error_handler if wait_for_publisher_confirms enable_publisher_confirms do k.call(self) end else k.call(self) end end end end |
Instance Attribute Details
#amqp ⇒ Object (readonly)
Returns the value of attribute amqp.
8 9 10 |
# File 'lib/mercury/mercury.rb', line 8 def amqp @amqp end |
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
8 9 10 |
# File 'lib/mercury/mercury.rb', line 8 def channel @channel end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
8 9 10 |
# File 'lib/mercury/mercury.rb', line 8 def logger @logger end |
Class Method Details
.open(logger: Logatron, **kws, &k) ⇒ Object
10 11 12 13 14 |
# File 'lib/mercury/mercury.rb', line 10 def self.open(logger: Logatron, **kws, &k) @logger = logger new(**kws, &k) nil end |
.publish_opts(tag, headers) ⇒ Object
65 66 67 |
# File 'lib/mercury/mercury.rb', line 65 def self.publish_opts(tag, headers) { routing_key: tag, persistent: true, headers: Logatron.http_headers.merge(headers) } end |
.source_opts ⇒ Object
263 264 265 |
# File 'lib/mercury/mercury.rb', line 263 def self.source_opts { durable: true, auto_delete: false } end |
Instance Method Details
#close(&k) ⇒ Object
16 17 18 19 20 |
# File 'lib/mercury/mercury.rb', line 16 def close(&k) @amqp.close do k.call end end |
#delete_source(source_name, &k) ⇒ Object
91 92 93 94 95 96 97 |
# File 'lib/mercury/mercury.rb', line 91 def delete_source(source_name, &k) with_source(source_name) do |exchange| exchange.delete do k.call end end end |
#delete_work_queue(worker_group, &k) ⇒ Object
99 100 101 102 103 104 105 |
# File 'lib/mercury/mercury.rb', line 99 def delete_work_queue(worker_group, &k) @channel.queue(worker_group, work_queue_opts) do |queue| queue.delete do k.call end end end |
#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/mercury/mercury.rb', line 50 def publish(source_name, msg, tag: '', headers: {}, &k) # The amqp gem caches exchange objects, so it's fine to # redeclare the exchange every time we publish. with_source(source_name) do |exchange| payload = write(msg) pub_opts = Mercury.publish_opts(tag, headers) if publisher_confirms_enabled expect_publisher_confirm(k) exchange.publish(payload, **pub_opts) else exchange.publish(payload, **pub_opts, &k) end end end |
#queue_exists?(queue_name, &k) ⇒ Boolean
115 116 117 118 119 120 121 |
# File 'lib/mercury/mercury.rb', line 115 def queue_exists?(queue_name, &k) existence_check(k) do |ch, &ret| ch.queue(queue_name, passive: true) do ret.call(true) end end end |
#source_exists?(source_name, &k) ⇒ Boolean
107 108 109 110 111 112 113 |
# File 'lib/mercury/mercury.rb', line 107 def source_exists?(source_name, &k) existence_check(k) do |ch, &ret| with_source_no_cache(ch, source_name, passive: true) do ret.call(true) end end end |
#start_listener(source_name, handler, tag_filter: '#', &k) ⇒ Object
69 70 71 72 73 74 75 76 77 78 |
# File 'lib/mercury/mercury.rb', line 69 def start_listener(source_name, handler, tag_filter: '#', &k) with_source(source_name) do |exchange| with_listener_queue(exchange, tag_filter) do |queue| queue.subscribe(ack: false) do |metadata, payload| handler.call(make_received_message(payload, metadata, false)) end k.call end end end |
#start_worker(worker_group, source_name, handler, tag_filter: '#', &k) ⇒ Object
80 81 82 83 84 85 86 87 88 89 |
# File 'lib/mercury/mercury.rb', line 80 def start_worker(worker_group, source_name, handler, tag_filter: '#', &k) with_source(source_name) do |exchange| with_work_queue(worker_group, exchange, tag_filter) do |queue| queue.subscribe(ack: true) do |metadata, payload| handler.call(make_received_message(payload, metadata, true)) end k.call end end end |