Class: Mercury
- Inherits: Object
- Inheritance chain: Object → Mercury
- Defined in:
- lib/mercury/cps.rb,
lib/mercury/fake.rb,
lib/mercury/sync.rb,
lib/mercury/utils.rb,
lib/mercury/cps/seq.rb,
lib/mercury/mercury.rb,
lib/mercury/monadic.rb,
lib/mercury/version.rb,
lib/mercury/fake/queue.rb,
lib/mercury/test_utils.rb,
lib/mercury/cps/methods.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/wire_serializer.rb,
lib/mercury/cps/seq_with_let.rb,
lib/mercury/received_message.rb,
lib/mercury/fake/queued_message.rb
Overview
Mercury is the client class documented on this page. The following description applies to the nested Mercury::Fake class (lib/mercury/fake.rb): Mercury::Fake simulates Mercury without using the AMQP gem. It can be useful for unit testing code that uses Mercury. The domain concept allows different Mercury instances to hit different virtual servers; this should rarely be needed. Mercury::Fake cannot simulate the behavior of server disconnections, broken sockets, etc.
Defined Under Namespace
Modules: TestUtils Classes: Cps, Fake, Monadic, ReceivedMessage, Sync, Utils, WireSerializer
Constant Summary collapse
- ORIGINAL_TAG_HEADER =
'Original-Tag'.freeze
- REPUBLISH_COUNT_HEADER =
'Republish-Count'.freeze
- VERSION =
'0.5.0'
Instance Attribute Summary collapse
- #amqp ⇒ Object (readonly)
  Returns the value of attribute amqp.
- #channel ⇒ Object (readonly)
  Returns the value of attribute channel.
- #logger ⇒ Object (readonly)
  Returns the value of attribute logger.
Class Method Summary collapse
- .guard_public(is_closed, k, initializing: false) ⇒ Object
- .increment_republish_count(msg) ⇒ Hash
  Returns the headers with republish count incremented.
- .open(logger: Logatron, **kws, &k) ⇒ Object
- .publish_opts(tag, headers) ⇒ Object
- .source_opts ⇒ Object
Instance Method Summary collapse
- #close(&k) ⇒ Object
- #delete_source(source_name, &k) ⇒ Object
- #delete_work_queue(worker_group, &k) ⇒ Object
- #initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) ⇒ Mercury (constructor)
  Returns a new instance of Mercury.
- #publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
- #queue_exists?(queue_name, &k) ⇒ Boolean
- #republish(msg, &k) ⇒ Object
  Places a copy of the message at the back of the queue, then acks the original message.
- #source_exists?(source_name, &k) ⇒ Boolean
- #start_listener(source_name, handler, tag_filter: nil, &k) ⇒ Object
- #start_worker(worker_group, source_name, handler, tag_filter: nil, &k) ⇒ Object
Constructor Details
#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) ⇒ Mercury
Returns a new instance of Mercury.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/mercury/mercury.rb', line 29 def initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) guard_public(k, initializing: true) @logger = logger @on_error = on_error AMQP.connect(host: host, port: port, vhost: vhost, username: username, password: password, on_tcp_connection_failure: server_down_error_handler) do |amqp| @amqp = amqp install_lost_connection_error_handler AMQP::Channel.new(amqp, prefetch: parallelism) do |channel| @channel = channel install_channel_error_handler if wait_for_publisher_confirms enable_publisher_confirms do k.call(self) end else k.call(self) end end end end |
Instance Attribute Details
#amqp ⇒ Object (readonly)
Returns the value of attribute amqp.
11 12 13 |
# File 'lib/mercury/mercury.rb', line 11 def amqp @amqp end |
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
11 12 13 |
# File 'lib/mercury/mercury.rb', line 11 def channel @channel end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
11 12 13 |
# File 'lib/mercury/mercury.rb', line 11 def logger @logger end |
Class Method Details
.guard_public(is_closed, k, initializing: false) ⇒ Object
330 331 332 333 334 335 336 337 |
# File 'lib/mercury/mercury.rb', line 330 def self.guard_public(is_closed, k, initializing: false) if is_closed && !initializing raise 'This mercury instance is defunct. Either it was purposely closed or an error occurred.' end unless k raise 'A continuation block is required but none was provided.' end end |
.increment_republish_count(msg) ⇒ Hash
Returns the headers with republish count incremented.
322 323 324 |
# File 'lib/mercury/mercury.rb', line 322 def self.increment_republish_count(msg) msg.headers.merge(REPUBLISH_COUNT_HEADER => msg.republish_count + 1) end |
.open(logger: Logatron, **kws, &k) ⇒ Object
13 14 15 16 |
# File 'lib/mercury/mercury.rb', line 13 def self.open(logger: Logatron, **kws, &k) new(logger: logger, **kws, &k) nil end |
.publish_opts(tag, headers) ⇒ Object
94 95 96 |
# File 'lib/mercury/mercury.rb', line 94 def self.publish_opts(tag, headers) { routing_key: tag, persistent: true, headers: Logatron.http_headers.merge(headers) } end |
.source_opts ⇒ Object
300 301 302 |
# File 'lib/mercury/mercury.rb', line 300 def self.source_opts { durable: true, auto_delete: false } end |
Instance Method Details
#close(&k) ⇒ Object
18 19 20 21 22 23 24 25 26 27 |
# File 'lib/mercury/mercury.rb', line 18 def close(&k) if @amqp @amqp.close do @amqp = nil k.call end else EM.next_tick(&k) end end |
#delete_source(source_name, &k) ⇒ Object
122 123 124 125 126 127 128 129 |
# File 'lib/mercury/mercury.rb', line 122 def delete_source(source_name, &k) guard_public(k) with_source(source_name) do |exchange| exchange.delete do k.call end end end |
#delete_work_queue(worker_group, &k) ⇒ Object
131 132 133 134 135 136 137 138 |
# File 'lib/mercury/mercury.rb', line 131 def delete_work_queue(worker_group, &k) guard_public(k) @channel.queue(worker_group, work_queue_opts) do |queue| queue.delete do k.call end end end |
#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
61 62 63 64 65 66 67 68 |
# File 'lib/mercury/mercury.rb', line 61 def publish(source_name, msg, tag: '', headers: {}, &k) guard_public(k) # The amqp gem caches exchange objects, so it's fine to # redeclare the exchange every time we publish. with_source(source_name) do |exchange| publish_internal(exchange, msg, tag, headers, &k) end end |
#queue_exists?(queue_name, &k) ⇒ Boolean
149 150 151 152 153 154 155 156 |
# File 'lib/mercury/mercury.rb', line 149 def queue_exists?(queue_name, &k) guard_public(k) existence_check(k) do |ch, &ret| ch.queue(queue_name, passive: true) do ret.call(true) end end end |
#republish(msg, &k) ⇒ Object
Places a copy of the message at the back of the queue, then acks the original message.
72 73 74 75 76 77 78 79 80 |
# File 'lib/mercury/mercury.rb', line 72 def republish(msg, &k) guard_public(k) raise 'Only messages from a work queue can be republished' unless msg.work_queue_name headers = Mercury.increment_republish_count(msg).merge(ORIGINAL_TAG_HEADER => msg.tag) publish_internal(@channel.default_exchange, msg.content, msg.work_queue_name, headers) do msg.ack k.call end end |
#source_exists?(source_name, &k) ⇒ Boolean
140 141 142 143 144 145 146 147 |
# File 'lib/mercury/mercury.rb', line 140 def source_exists?(source_name, &k) guard_public(k) existence_check(k) do |ch, &ret| with_source_no_cache(ch, source_name, passive: true) do ret.call(true) end end end |
#start_listener(source_name, handler, tag_filter: nil, &k) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/mercury/mercury.rb', line 98 def start_listener(source_name, handler, tag_filter: nil, &k) guard_public(k) with_source(source_name) do |exchange| with_listener_queue(exchange, tag_filter) do |queue| queue.subscribe(ack: false) do |metadata, payload| handler.call(make_received_message(payload, metadata)) end k.call end end end |
#start_worker(worker_group, source_name, handler, tag_filter: nil, &k) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/mercury/mercury.rb', line 110 def start_worker(worker_group, source_name, handler, tag_filter: nil, &k) guard_public(k) with_source(source_name) do |exchange| with_work_queue(worker_group, exchange, tag_filter) do |queue| queue.subscribe(ack: true) do |metadata, payload| handler.call(make_received_message(payload, metadata, work_queue_name: worker_group)) end k.call end end end |