Class: Mercury

Inherits:
Object
  • Object
show all
Defined in:
lib/mercury/cps.rb,
lib/mercury/fake.rb,
lib/mercury/sync.rb,
lib/mercury/utils.rb,
lib/mercury/cps/seq.rb,
lib/mercury/mercury.rb,
lib/mercury/monadic.rb,
lib/mercury/version.rb,
lib/mercury/fake/queue.rb,
lib/mercury/test_utils.rb,
lib/mercury/cps/methods.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/wire_serializer.rb,
lib/mercury/cps/seq_with_let.rb,
lib/mercury/received_message.rb,
lib/mercury/fake/queued_message.rb

Overview

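This class simulates Mercury without using the AMQP gem. It can be useful for unit testing code that uses Mercury. The domain concept allows different Mercury instances to hit different virtual servers; this should rarely be needed. This class cannot simulate the behavior of server disconnections, broken sockets, etc. (Note: this overview appears to describe the nested Fake class rather than Mercury itself, whose constructor, shown below, connects through the AMQP gem.)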

Defined Under Namespace

Modules: TestUtils
Classes: Cps, Fake, Monadic, ReceivedMessage, Sync, Utils, WireSerializer

Constant Summary collapse

ORIGINAL_TAG_HEADER =
'Original-Tag'.freeze
REPUBLISH_COUNT_HEADER =
'Republish-Count'.freeze
VERSION =
'0.5.0'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) ⇒ Mercury

Returns a new instance of Mercury.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/mercury/mercury.rb', line 29

# Opens an AMQP connection and a channel, then yields this instance to the
# continuation once the channel is usable.
#
# @param host [String] forwarded to AMQP.connect
# @param port [Integer] forwarded to AMQP.connect
# @param vhost [String] forwarded to AMQP.connect
# @param username [String] forwarded to AMQP.connect
# @param password [String] forwarded to AMQP.connect
# @param parallelism [Integer] passed as the channel's +prefetch+ option
# @param on_error [Object, nil] stored in @on_error
# @param wait_for_publisher_confirms [Boolean] when true, the continuation is
#   only invoked from inside the enable_publisher_confirms block
# @param logger [Object] stored in @logger (required)
# @yield [mercury] invoked with +self+ after the channel has been set up
def initialize(host: 'localhost',
               port: 5672,
               vhost: '/',
               username: 'guest',
               password: 'guest',
               parallelism: 1,
               on_error: nil,
               wait_for_publisher_confirms: true,
               logger:,
               &k)
  guard_public(k, initializing: true)
  @logger = logger
  @on_error = on_error

  connection_opts = {
    host: host,
    port: port,
    vhost: vhost,
    username: username,
    password: password,
    on_tcp_connection_failure: server_down_error_handler
  }

  AMQP.connect(**connection_opts) do |amqp|
    @amqp = amqp
    install_lost_connection_error_handler

    AMQP::Channel.new(amqp, prefetch: parallelism) do |channel|
      @channel = channel
      install_channel_error_handler

      if wait_for_publisher_confirms
        enable_publisher_confirms { k.call(self) }
      else
        k.call(self)
      end
    end
  end
end

Instance Attribute Details

#amqp ⇒ Object (readonly)

Returns the value of attribute amqp.



11
12
13
# File 'lib/mercury/mercury.rb', line 11

# Reader for @amqp, which #initialize assigns inside the AMQP.connect
# callback and #close resets to nil.
#
# @return [Object, nil] the current value of @amqp
def amqp
  @amqp
end

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



11
12
13
# File 'lib/mercury/mercury.rb', line 11

# Reader for @channel, which #initialize assigns inside the
# AMQP::Channel.new callback.
#
# @return [Object, nil] the current value of @channel
def channel
  @channel
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



11
12
13
# File 'lib/mercury/mercury.rb', line 11

# Reader for @logger, the logger object passed to #initialize.
#
# @return [Object] the current value of @logger
def logger
  @logger
end

Class Method Details

.guard_public(is_closed, k, initializing: false) ⇒ Object



330
331
332
333
334
335
336
337
# File 'lib/mercury/mercury.rb', line 330

# Validates the preconditions shared by the public API methods.
#
# @param is_closed [Boolean] whether the instance is considered closed
# @param k [Proc, nil] the continuation supplied by the caller
# @param initializing [Boolean] when true, the closed check is skipped
# @raise [RuntimeError] if the instance is closed (and not initializing), or
#   if no continuation was given
# @return [nil]
def self.guard_public(is_closed, k, initializing: false)
  defunct = is_closed && !initializing
  raise 'This mercury instance is defunct. Either it was purposely closed or an error occurred.' if defunct
  raise 'A continuation block is required but none was provided.' unless k
end

.increment_republish_count(msg) ⇒ Hash

Returns the headers with republish count incremented.

Parameters:

Returns:

  • (Hash)

    the headers with republish count incremented



322
323
324
# File 'lib/mercury/mercury.rb', line 322

# Builds a copy of the message's headers in which the republish counter is
# one higher than the message's current republish count.
#
# @param msg [#headers, #republish_count] the message being republished
# @return [Hash] a new hash; the result of Hash#merge on +msg.headers+
def self.increment_republish_count(msg)
  current_headers = msg.headers
  next_count = msg.republish_count + 1
  current_headers.merge(REPUBLISH_COUNT_HEADER => next_count)
end

.open(logger: Logatron, **kws, &k) ⇒ Object



13
14
15
16
# File 'lib/mercury/mercury.rb', line 13

# Creates a new instance, forwarding every option and the continuation to
# +new+, and discards the instance so callers only receive it via the block.
#
# @param logger [Object] logger handed to +new+ (defaults to Logatron)
# @param kws [Hash] remaining keyword options, passed through unchanged
# @yield forwarded to +new+
# @return [nil]
def self.open(logger: Logatron, **kws, &k)
  options = { logger: logger }.merge(kws)
  new(**options, &k)
  nil
end

.publish_opts(tag, headers) ⇒ Object



94
95
96
# File 'lib/mercury/mercury.rb', line 94

# Builds the options hash used when publishing a message.
#
# The caller's headers are merged on top of Logatron.http_headers, so a
# caller-supplied key replaces a Logatron key of the same name.
#
# @param tag [String] used as the +routing_key+
# @param headers [Hash] caller-supplied message headers
# @return [Hash] hash with +:routing_key+, +:persistent+ (always true) and +:headers+
def self.publish_opts(tag, headers)
  merged_headers = Logatron.http_headers.merge(headers)
  {
    routing_key: tag,
    persistent: true,
    headers: merged_headers
  }
end

.source_opts ⇒ Object



300
301
302
# File 'lib/mercury/mercury.rb', line 300

# Options hash for declaring a source: durable and not auto-deleted.
#
# @return [Hash] a fresh hash on every call
def self.source_opts
  {
    durable: true,
    auto_delete: false
  }
end

Instance Method Details

#close(&k) ⇒ Object



18
19
20
21
22
23
24
25
26
27
# File 'lib/mercury/mercury.rb', line 18

# Closes the AMQP connection if there is one, then runs the continuation.
#
# With a connection present, the block given to @amqp.close clears @amqp and
# then calls +k+ with no arguments. With no connection, +k+ is handed to
# EM.next_tick rather than being called inline.
#
# @yield invoked with no arguments
def close(&k)
  return EM.next_tick(&k) unless @amqp

  @amqp.close do
    @amqp = nil
    k.call
  end
end

#delete_source(source_name, &k) ⇒ Object



122
123
124
125
126
127
128
129
# File 'lib/mercury/mercury.rb', line 122

# Deletes the named source and then runs the continuation.
#
# The exchange object is obtained through with_source; +k+ is called with no
# arguments from the block given to the exchange's +delete+.
#
# @param source_name [String] name of the source to delete
# @yield invoked with no arguments from the delete callback
def delete_source(source_name, &k)
  guard_public(k)
  with_source(source_name) do |exchange|
    exchange.delete { k.call }
  end
end

#delete_work_queue(worker_group, &k) ⇒ Object



131
132
133
134
135
136
137
138
# File 'lib/mercury/mercury.rb', line 131

# Deletes the work queue of a worker group and then runs the continuation.
#
# The queue is looked up on @channel under the worker group's name using
# work_queue_opts; +k+ is called with no arguments from the block given to
# the queue's +delete+.
#
# @param worker_group [String] name of the worker group / queue
# @yield invoked with no arguments from the delete callback
def delete_work_queue(worker_group, &k)
  guard_public(k)
  @channel.queue(worker_group, work_queue_opts) do |queue|
    queue.delete { k.call }
  end
end

#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object



61
62
63
64
65
66
67
68
# File 'lib/mercury/mercury.rb', line 61

# Publishes a message to the named source.
#
# @param source_name [String] name of the source to publish to
# @param msg [Object] message content, passed through to publish_internal
# @param tag [String] tag passed through to publish_internal
# @param headers [Hash] headers passed through to publish_internal
# @yield forwarded to publish_internal as its continuation
def publish(source_name, msg, tag: '', headers: {}, &k)
  guard_public(k)
  # Redeclaring the exchange on every publish is fine because the amqp gem
  # caches exchange objects.
  with_source(source_name) { |exchange| publish_internal(exchange, msg, tag, headers, &k) }
end

#queue_exists?(queue_name, &k) ⇒ Boolean

Returns:

  • (Boolean)


149
150
151
152
153
154
155
156
# File 'lib/mercury/mercury.rb', line 149

# Checks for a queue by declaring it passively inside existence_check.
#
# When the passive declaration's block runs, +true+ is reported through the
# +ret+ callback supplied by existence_check. How a missing queue is reported
# is decided by existence_check, which is not part of this method.
#
# @param queue_name [String] name of the queue to look for
# @yield continuation handed to existence_check
def queue_exists?(queue_name, &k)
  guard_public(k)
  existence_check(k) do |ch, &ret|
    ch.queue(queue_name, passive: true) { ret.call(true) }
  end
end

#republish(msg, &k) ⇒ Object

Places a copy of the message at the back of the queue, then acks the original message.



72
73
74
75
76
77
78
79
80
# File 'lib/mercury/mercury.rb', line 72

# Places a copy of the message at the back of its work queue, then acks the
# original message and runs the continuation.
#
# The copy is published to the channel's default exchange with the work queue
# name as the tag. Its headers carry an incremented republish count and the
# original message's tag.
#
# @param msg [Object] a received message; must have a work_queue_name
# @raise [RuntimeError] if the message did not come from a work queue
# @yield invoked with no arguments after the original message is acked
def republish(msg, &k)
  guard_public(k)
  unless msg.work_queue_name
    raise 'Only messages from a work queue can be republished'
  end

  bumped_headers = Mercury.increment_republish_count(msg)
  headers = bumped_headers.merge(ORIGINAL_TAG_HEADER => msg.tag)
  default_exchange = @channel.default_exchange
  publish_internal(default_exchange, msg.content, msg.work_queue_name, headers) do
    msg.ack
    k.call
  end
end

#source_exists?(source_name, &k) ⇒ Boolean

Returns:

  • (Boolean)


140
141
142
143
144
145
146
147
# File 'lib/mercury/mercury.rb', line 140

# Checks for a source by declaring it passively inside existence_check.
#
# The declaration goes through with_source_no_cache on the channel supplied
# by existence_check. When its block runs, +true+ is reported through the
# +ret+ callback. How a missing source is reported is decided by
# existence_check, which is not part of this method.
#
# @param source_name [String] name of the source to look for
# @yield continuation handed to existence_check
def source_exists?(source_name, &k)
  guard_public(k)
  existence_check(k) do |ch, &ret|
    with_source_no_cache(ch, source_name, passive: true) { ret.call(true) }
  end
end

#start_listener(source_name, handler, tag_filter: nil, &k) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
# File 'lib/mercury/mercury.rb', line 98

# Subscribes a handler to the named source without acknowledgements.
#
# A listener queue is obtained through with_listener_queue and subscribed to
# with +ack: false+. Each delivery is turned into a received-message object by
# make_received_message(payload, metadata) and handed to +handler+.
#
# @param source_name [String] name of the source to listen to
# @param handler [#call] called with one received message per delivery
# @param tag_filter [String, nil] passed through to with_listener_queue
# @yield invoked with no arguments right after the subscription is issued
def start_listener(source_name, handler, tag_filter: nil, &k)
  guard_public(k)
  with_source(source_name) do |exchange|
    with_listener_queue(exchange, tag_filter) do |queue|
      # The subscribe block receives the delivery metadata first, then the body.
      queue.subscribe(ack: false) do |metadata, payload|
        handler.call(make_received_message(payload, metadata))
      end
      k.call
    end
  end
end

#start_worker(worker_group, source_name, handler, tag_filter: nil, &k) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
# File 'lib/mercury/mercury.rb', line 110

# Subscribes a handler to a worker group's work queue with acknowledgements.
#
# A work queue is obtained through with_work_queue and subscribed to with
# +ack: true+. Each delivery is turned into a received-message object by
# make_received_message(payload, metadata, work_queue_name: worker_group)
# and handed to +handler+.
#
# @param worker_group [String] name of the worker group / work queue
# @param source_name [String] name of the source feeding the queue
# @param handler [#call] called with one received message per delivery
# @param tag_filter [String, nil] passed through to with_work_queue
# @yield invoked with no arguments right after the subscription is issued
def start_worker(worker_group, source_name, handler, tag_filter: nil, &k)
  guard_public(k)
  with_source(source_name) do |exchange|
    with_work_queue(worker_group, exchange, tag_filter) do |queue|
      # The subscribe block receives the delivery metadata first, then the body.
      queue.subscribe(ack: true) do |metadata, payload|
        handler.call(make_received_message(payload, metadata, work_queue_name: worker_group))
      end
      k.call
    end
  end
end