Class: Mercury
- Inherits: Object
- Inheritance chain: Object → Mercury
- Defined in:
- lib/mercury/cps.rb,
lib/mercury/fake.rb,
lib/mercury/sync.rb,
lib/mercury/utils.rb,
lib/mercury/cps/seq.rb,
lib/mercury/mercury.rb,
lib/mercury/monadic.rb,
lib/mercury/fake/queue.rb,
lib/mercury/test_utils.rb,
lib/mercury/cps/methods.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/wire_serializer.rb,
lib/mercury/cps/seq_with_let.rb,
lib/mercury/received_message.rb,
lib/mercury/fake/queued_message.rb
Overview
This class simulates Mercury without using the AMQP gem. It can be useful for unit testing code that uses Mercury. The domain concept allows different mercury instances to hit different virtual servers; this should rarely be needed. This class cannot simulate behavior of server disconnections, broken sockets, etc.
Defined Under Namespace
Modules: TestUtils
Classes: Cps, Fake, Monadic, ReceivedMessage, Sync, Utils, WireSerializer
Constant Summary collapse
- ORIGINAL_TAG_HEADER =
'Original-Tag'.freeze
- REPUBLISH_COUNT_HEADER =
'Republish-Count'.freeze
Instance Attribute Summary collapse
-
#amqp ⇒ Object
readonly
Returns the value of attribute amqp.
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Class Method Summary collapse
- .open(logger: Logger.new(STDOUT), **kws, &k) ⇒ Object
- .publish_opts(tag, headers) ⇒ Object
Instance Method Summary collapse
- #close(&k) ⇒ Object
- #delete_source(source_name, &k) ⇒ Object
- #delete_work_queue(worker_group, &k) ⇒ Object
-
#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) ⇒ Mercury
constructor
A new instance of Mercury.
- #publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
- #queue_exists?(queue_name, &k) ⇒ Boolean
-
#republish(msg, &k) ⇒ Object
Places a copy of the message at the back of the queue, then acks the original message.
- #source_exists?(source_name, &k) ⇒ Boolean
- #start_listener(source_name, handler, tag_filter: nil, &k) ⇒ Object
-
#start_queue_worker(worker_group, handler, &k) ⇒ Object
Production code should not call this.
- #start_worker(worker_group, source_name, handler, tag_filter: nil, &k) ⇒ Object
Constructor Details
#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) ⇒ Mercury
Returns a new instance of Mercury.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/mercury/mercury.rb', line 30 def initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) guard_public(k, initializing: true) @logger = logger @on_error = on_error AMQP.connect(host: host, port: port, vhost: vhost, username: username, password: password, on_tcp_connection_failure: server_down_error_handler) do |amqp| @amqp = amqp install_lost_connection_error_handler AMQP::Channel.new(amqp, prefetch: parallelism) do |channel| @channel = channel install_channel_error_handler if wait_for_publisher_confirms enable_publisher_confirms do k.call(self) end else k.call(self) end end end end |
Instance Attribute Details
#amqp ⇒ Object (readonly)
Returns the value of attribute amqp.
12 13 14 |
# File 'lib/mercury/mercury.rb', line 12 def amqp @amqp end |
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
12 13 14 |
# File 'lib/mercury/mercury.rb', line 12 def channel @channel end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
12 13 14 |
# File 'lib/mercury/mercury.rb', line 12 def logger @logger end |
Class Method Details
.open(logger: Logger.new(STDOUT), **kws, &k) ⇒ Object
14 15 16 17 |
# File 'lib/mercury/mercury.rb', line 14 def self.open(logger: Logger.new(STDOUT), **kws, &k) new(logger: logger, **kws, &k) nil end |
.publish_opts(tag, headers) ⇒ Object
97 98 99 |
# File 'lib/mercury/mercury.rb', line 97 def self.publish_opts(tag, headers) { routing_key: tag, persistent: true, headers: headers } end |
Instance Method Details
#close(&k) ⇒ Object
19 20 21 22 23 24 25 26 27 28 |
# File 'lib/mercury/mercury.rb', line 19 def close(&k) if @amqp @amqp.close do @amqp = nil k.call end else EM.next_tick(&k) end end |
#delete_source(source_name, &k) ⇒ Object
140 141 142 143 144 145 146 147 |
# File 'lib/mercury/mercury.rb', line 140 def delete_source(source_name, &k) guard_public(k) with_source(source_name) do |exchange| exchange.delete do k.call end end end |
#delete_work_queue(worker_group, &k) ⇒ Object
149 150 151 152 153 154 155 156 |
# File 'lib/mercury/mercury.rb', line 149 def delete_work_queue(worker_group, &k) guard_public(k) @channel.queue(worker_group, work_queue_opts) do |queue| queue.delete do k.call end end end |
#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
62 63 64 65 66 67 68 69 |
# File 'lib/mercury/mercury.rb', line 62 def publish(source_name, msg, tag: '', headers: {}, &k) guard_public(k) # The amqp gem caches exchange objects, so it's fine to # redeclare the exchange every time we publish. with_source(source_name) do |exchange| publish_internal(exchange, msg, tag, headers, &k) end end |
#queue_exists?(queue_name, &k) ⇒ Boolean
167 168 169 170 171 172 173 174 |
# File 'lib/mercury/mercury.rb', line 167 def queue_exists?(queue_name, &k) guard_public(k) existence_check(k) do |ch, &ret| ch.queue(queue_name, passive: true) do ret.call(true) end end end |
#republish(msg, &k) ⇒ Object
Places a copy of the message at the back of the queue, then acks the original message.
73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/mercury/mercury.rb', line 73 def republish(msg, &k) guard_public(k) raise 'Only messages from a work queue can be republished' unless msg.work_queue_name raise 'A message can only be republished by the mercury instance that received it' unless msg.mercury_instance == self raise "This message was already #{msg.action_taken}ed" if msg.action_taken headers = Mercury.increment_republish_count(msg).merge(ORIGINAL_TAG_HEADER => msg.tag) publish_internal(@channel.default_exchange, msg.content, msg.work_queue_name, headers) do msg.ack k.call end end |
#source_exists?(source_name, &k) ⇒ Boolean
158 159 160 161 162 163 164 165 |
# File 'lib/mercury/mercury.rb', line 158 def source_exists?(source_name, &k) guard_public(k) existence_check(k) do |ch, &ret| with_source_no_cache(ch, source_name, passive: true) do ret.call(true) end end end |
#start_listener(source_name, handler, tag_filter: nil, &k) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/mercury/mercury.rb', line 101 def start_listener(source_name, handler, tag_filter: nil, &k) guard_public(k) with_source(source_name) do |exchange| with_listener_queue(exchange, tag_filter) do |queue| queue.subscribe(ack: false) do |metadata, payload| handler.call(make_received_message(payload, metadata)) end k.call end end end |
#start_queue_worker(worker_group, handler, &k) ⇒ Object
Production code should not call this. Only test/tool code should call this, and only if you're sure the worker queue already exists and is bound to the source.
126 127 128 129 130 131 132 |
# File 'lib/mercury/mercury.rb', line 126 def start_queue_worker(worker_group, handler, &k) guard_public(k) @channel.queue(worker_group, work_queue_opts) do |queue| subscribe_worker(queue, handler) k.call end end |
#start_worker(worker_group, source_name, handler, tag_filter: nil, &k) ⇒ Object
113 114 115 116 117 118 119 120 121 |
# File 'lib/mercury/mercury.rb', line 113 def start_worker(worker_group, source_name, handler, tag_filter: nil, &k) guard_public(k) with_source(source_name) do |exchange| with_work_queue(worker_group, exchange, tag_filter) do |queue| subscribe_worker(queue, handler) k.call end end end |