Class: Mercury
- Inherits:
-
Object
- Object
- Mercury
- Defined in:
- lib/mercury/cps.rb,
lib/mercury/fake.rb,
lib/mercury/sync.rb,
lib/mercury/utils.rb,
lib/mercury/cps/seq.rb,
lib/mercury/mercury.rb,
lib/mercury/monadic.rb,
lib/mercury/fake/queue.rb,
lib/mercury/test_utils.rb,
lib/mercury/cps/methods.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/wire_serializer.rb,
lib/mercury/cps/seq_with_let.rb,
lib/mercury/received_message.rb,
lib/mercury/fake/queued_message.rb
Overview
This class simulates Mercury without using the AMQP gem. It can be useful for unit testing code that uses Mercury. The domain concept allows different mercury instances to hit different virtual servers; this should rarely be needed. This class cannot simulate behavior of server disconnections, broken sockets, etc.
Defined Under Namespace
Modules: TestUtils Classes: Cps, Fake, Monadic, ReceivedMessage, Sync, Utils, WireSerializer
Constant Summary collapse
- ORIGINAL_TAG_HEADER =
'Original-Tag'.freeze
- REPUBLISH_COUNT_HEADER =
'Republish-Count'.freeze
Instance Attribute Summary collapse
-
#amqp ⇒ Object
readonly
Returns the value of attribute amqp.
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Class Method Summary collapse
Instance Method Summary collapse
- #close(&k) ⇒ Object
- #delete_source(source_name, &k) ⇒ Object
- #delete_work_queue(worker_group, &k) ⇒ Object
-
#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) ⇒ Mercury
constructor
A new instance of Mercury.
- #publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
- #queue_exists?(queue_name, &k) ⇒ Boolean
-
#republish(msg, &k) ⇒ Object
Places a copy of the message at the back of the queue, then acks the original message.
- #source_exists?(source_name, &k) ⇒ Boolean
- #start_listener(source_name, handler, tag_filter: nil, &k) ⇒ Object
- #start_worker(worker_group, source_name, handler, tag_filter: nil, &k) ⇒ Object
Constructor Details
#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) ⇒ Mercury
Returns a new instance of Mercury.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/mercury/mercury.rb', line 30 def initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) guard_public(k, initializing: true) @logger = logger @on_error = on_error AMQP.connect(host: host, port: port, vhost: vhost, username: username, password: password, on_tcp_connection_failure: server_down_error_handler) do |amqp| @amqp = amqp install_lost_connection_error_handler AMQP::Channel.new(amqp, prefetch: parallelism) do |channel| @channel = channel install_channel_error_handler if wait_for_publisher_confirms enable_publisher_confirms do k.call(self) end else k.call(self) end end end end |
Instance Attribute Details
#amqp ⇒ Object (readonly)
Returns the value of attribute amqp.
12 13 14 |
# File 'lib/mercury/mercury.rb', line 12 def amqp @amqp end |
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
12 13 14 |
# File 'lib/mercury/mercury.rb', line 12 def channel @channel end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
12 13 14 |
# File 'lib/mercury/mercury.rb', line 12 def logger @logger end |
Class Method Details
.open(logger: Logger.new(STDOUT), **kws, &k) ⇒ Object
14 15 16 17 |
# File 'lib/mercury/mercury.rb', line 14 def self.open(logger: Logger.new(STDOUT), **kws, &k) new(logger: logger, **kws, &k) nil end |
.publish_opts(tag, headers) ⇒ Object
97 98 99 |
# File 'lib/mercury/mercury.rb', line 97 def self.publish_opts(tag, headers) { routing_key: tag, persistent: true, headers: headers } end |
Instance Method Details
#close(&k) ⇒ Object
19 20 21 22 23 24 25 26 27 28 |
# File 'lib/mercury/mercury.rb', line 19 def close(&k) if @amqp @amqp.close do @amqp = nil k.call end else EM.next_tick(&k) end end |
#delete_source(source_name, &k) ⇒ Object
125 126 127 128 129 130 131 132 |
# File 'lib/mercury/mercury.rb', line 125 def delete_source(source_name, &k) guard_public(k) with_source(source_name) do |exchange| exchange.delete do k.call end end end |
#delete_work_queue(worker_group, &k) ⇒ Object
134 135 136 137 138 139 140 141 |
# File 'lib/mercury/mercury.rb', line 134 def delete_work_queue(worker_group, &k) guard_public(k) @channel.queue(worker_group, work_queue_opts) do |queue| queue.delete do k.call end end end |
#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object
62 63 64 65 66 67 68 69 |
# File 'lib/mercury/mercury.rb', line 62 def publish(source_name, msg, tag: '', headers: {}, &k) guard_public(k) # The amqp gem caches exchange objects, so it's fine to # redeclare the exchange every time we publish. with_source(source_name) do |exchange| publish_internal(exchange, msg, tag, headers, &k) end end |
#queue_exists?(queue_name, &k) ⇒ Boolean
152 153 154 155 156 157 158 159 |
# File 'lib/mercury/mercury.rb', line 152 def queue_exists?(queue_name, &k) guard_public(k) existence_check(k) do |ch, &ret| ch.queue(queue_name, passive: true) do ret.call(true) end end end |
#republish(msg, &k) ⇒ Object
Places a copy of the message at the back of the queue, then acks the original message.
73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/mercury/mercury.rb', line 73 def republish(msg, &k) guard_public(k) raise 'Only messages from a work queue can be republished' unless msg.work_queue_name raise 'A message can only be republished by the mercury instance that received it' unless msg.mercury_instance == self raise "This message was already #{msg.action_taken}ed" if msg.action_taken headers = Mercury.increment_republish_count(msg).merge(ORIGINAL_TAG_HEADER => msg.tag) publish_internal(@channel.default_exchange, msg.content, msg.work_queue_name, headers) do msg.ack k.call end end |
#source_exists?(source_name, &k) ⇒ Boolean
143 144 145 146 147 148 149 150 |
# File 'lib/mercury/mercury.rb', line 143 def source_exists?(source_name, &k) guard_public(k) existence_check(k) do |ch, &ret| with_source_no_cache(ch, source_name, passive: true) do ret.call(true) end end end |
#start_listener(source_name, handler, tag_filter: nil, &k) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/mercury/mercury.rb', line 101 def start_listener(source_name, handler, tag_filter: nil, &k) guard_public(k) with_source(source_name) do |exchange| with_listener_queue(exchange, tag_filter) do |queue| queue.subscribe(ack: false) do |, payload| handler.call((payload, )) end k.call end end end |
#start_worker(worker_group, source_name, handler, tag_filter: nil, &k) ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/mercury/mercury.rb', line 113 def start_worker(worker_group, source_name, handler, tag_filter: nil, &k) guard_public(k) with_source(source_name) do |exchange| with_work_queue(worker_group, exchange, tag_filter) do |queue| queue.subscribe(ack: true) do |, payload| handler.call((payload, , work_queue_name: worker_group)) end k.call end end end |