Class: Mercury
- Inherits:
-
Object
- Object
- Mercury
- Defined in:
- lib/mercury/cps.rb,
lib/mercury/fake.rb,
lib/mercury/sync.rb,
lib/mercury/utils.rb,
lib/mercury/cps/seq.rb,
lib/mercury/mercury.rb,
lib/mercury/monadic.rb,
lib/mercury/fake/queue.rb,
lib/mercury/test_utils.rb,
lib/mercury/cps/methods.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/wire_serializer.rb,
lib/mercury/cps/seq_with_let.rb,
lib/mercury/received_message.rb,
lib/mercury/fake/queued_message.rb
Overview
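Note: this overview text appears to describe the nested Mercury::Fake class (lib/mercury/fake.rb) rather than the AMQP-backed Mercury class documented on this page, whose constructor connects through AMQP.connect. Mercury::Fake simulates Mercury without using the AMQP gem. It can be useful for unit testing code that uses Mercury. The domain concept allows different Mercury instances to hit different virtual servers; this should rarely be needed. Mercury::Fake cannot simulate the behavior of server disconnections, broken sockets, etc.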
Defined Under Namespace
Modules: TestUtils Classes: Cps, Fake, Monadic, ReceivedMessage, Sync, Utils, WireSerializer
Constant Summary collapse
- ORIGINAL_TAG_HEADER =
'Original-Tag'.freeze
- REPUBLISH_COUNT_HEADER =
'Republish-Count'.freeze
Instance Attribute Summary collapse
-
#amqp ⇒ Object
readonly
Returns the value of attribute amqp.
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Class Method Summary collapse
Instance Method Summary collapse
- #close(&k) ⇒ Object
- #delete_source(source_name, &k) ⇒ Object
- #delete_work_queue(worker_group, &k) ⇒ Object
-
#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) ⇒ Mercury
constructor
A new instance of Mercury.
- #publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
- #queue_exists?(queue_name, &k) ⇒ Boolean
-
#republish(msg, &k) ⇒ Object
Places a copy of the message at the back of the queue, then acks the original message.
- #source_exists?(source_name, &k) ⇒ Boolean
- #start_listener(source_name, handler, tag_filter: nil, &k) ⇒ Object
- #start_worker(worker_group, source_name, handler, tag_filter: nil, &k) ⇒ Object
Constructor Details
#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) ⇒ Mercury
Returns a new instance of Mercury.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/mercury/mercury.rb', line 29 def initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) guard_public(k, initializing: true) @logger = logger @on_error = on_error AMQP.connect(host: host, port: port, vhost: vhost, username: username, password: password, on_tcp_connection_failure: server_down_error_handler) do |amqp| @amqp = amqp install_lost_connection_error_handler AMQP::Channel.new(amqp, prefetch: parallelism) do |channel| @channel = channel install_channel_error_handler if wait_for_publisher_confirms enable_publisher_confirms do k.call(self) end else k.call(self) end end end end |
Instance Attribute Details
#amqp ⇒ Object (readonly)
Returns the value of attribute amqp.
11 12 13 |
# File 'lib/mercury/mercury.rb', line 11 def amqp @amqp end |
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
11 12 13 |
# File 'lib/mercury/mercury.rb', line 11 def channel @channel end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
11 12 13 |
# File 'lib/mercury/mercury.rb', line 11 def logger @logger end |
Class Method Details
.open(logger: Logatron, **kws, &k) ⇒ Object
13 14 15 16 |
# File 'lib/mercury/mercury.rb', line 13 def self.open(logger: Logatron, **kws, &k) new(logger: logger, **kws, &k) nil end |
.publish_opts(tag, headers) ⇒ Object
96 97 98 |
# File 'lib/mercury/mercury.rb', line 96 def self.publish_opts(tag, headers) { routing_key: tag, persistent: true, headers: Logatron.http_headers.merge(headers) } end |
Instance Method Details
#close(&k) ⇒ Object
18 19 20 21 22 23 24 25 26 27 |
# File 'lib/mercury/mercury.rb', line 18 def close(&k) if @amqp @amqp.close do @amqp = nil k.call end else EM.next_tick(&k) end end |
#delete_source(source_name, &k) ⇒ Object
124 125 126 127 128 129 130 131 |
# File 'lib/mercury/mercury.rb', line 124 def delete_source(source_name, &k) guard_public(k) with_source(source_name) do |exchange| exchange.delete do k.call end end end |
#delete_work_queue(worker_group, &k) ⇒ Object
133 134 135 136 137 138 139 140 |
# File 'lib/mercury/mercury.rb', line 133 def delete_work_queue(worker_group, &k) guard_public(k) @channel.queue(worker_group, work_queue_opts) do |queue| queue.delete do k.call end end end |
#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
61 62 63 64 65 66 67 68 |
# File 'lib/mercury/mercury.rb', line 61 def publish(source_name, msg, tag: '', headers: {}, &k) guard_public(k) # The amqp gem caches exchange objects, so it's fine to # redeclare the exchange every time we publish. with_source(source_name) do |exchange| publish_internal(exchange, msg, tag, headers, &k) end end |
#queue_exists?(queue_name, &k) ⇒ Boolean
151 152 153 154 155 156 157 158 |
# File 'lib/mercury/mercury.rb', line 151 def queue_exists?(queue_name, &k) guard_public(k) existence_check(k) do |ch, &ret| ch.queue(queue_name, passive: true) do ret.call(true) end end end |
#republish(msg, &k) ⇒ Object
Places a copy of the message at the back of the queue, then acks the original message.
72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/mercury/mercury.rb', line 72 def republish(msg, &k) guard_public(k) raise 'Only messages from a work queue can be republished' unless msg.work_queue_name raise 'A message can only be republished by the mercury instance that received it' unless msg.mercury_instance == self raise "This message was already #{msg.action_taken}ed" if msg.action_taken headers = Mercury.increment_republish_count(msg).merge(ORIGINAL_TAG_HEADER => msg.tag) publish_internal(@channel.default_exchange, msg.content, msg.work_queue_name, headers) do msg.ack k.call end end |
#source_exists?(source_name, &k) ⇒ Boolean
142 143 144 145 146 147 148 149 |
# File 'lib/mercury/mercury.rb', line 142 def source_exists?(source_name, &k) guard_public(k) existence_check(k) do |ch, &ret| with_source_no_cache(ch, source_name, passive: true) do ret.call(true) end end end |
#start_listener(source_name, handler, tag_filter: nil, &k) ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/mercury/mercury.rb', line 100 def start_listener(source_name, handler, tag_filter: nil, &k) guard_public(k) with_source(source_name) do |exchange| with_listener_queue(exchange, tag_filter) do |queue| queue.subscribe(ack: false) do |metadata, payload| handler.call(make_received_message(payload, metadata)) end k.call end end end |
#start_worker(worker_group, source_name, handler, tag_filter: nil, &k) ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/mercury/mercury.rb', line 112 def start_worker(worker_group, source_name, handler, tag_filter: nil, &k) guard_public(k) with_source(source_name) do |exchange| with_work_queue(worker_group, exchange, tag_filter) do |queue| queue.subscribe(ack: true) do |metadata, payload| handler.call(make_received_message(payload, metadata, work_queue_name: worker_group)) end k.call end end end |