Class: Mercury

Inherits:
Object
  • Object
show all
Defined in:
lib/mercury/cps.rb,
lib/mercury/fake.rb,
lib/mercury/sync.rb,
lib/mercury/utils.rb,
lib/mercury/cps/seq.rb,
lib/mercury/mercury.rb,
lib/mercury/monadic.rb,
lib/mercury/fake/queue.rb,
lib/mercury/test_utils.rb,
lib/mercury/cps/methods.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/wire_serializer.rb,
lib/mercury/cps/seq_with_let.rb,
lib/mercury/received_message.rb,
lib/mercury/fake/queued_message.rb

Overview

This class simulates Mercury without using the AMQP gem. It can be useful for unit testing code that uses Mercury. The domain concept allows different mercury instances to hit different virtual servers; this should rarely be needed. This class cannot simulate behavior of server disconnections, broken sockets, etc.

Defined Under Namespace

Modules: TestUtils Classes: Cps, Fake, Monadic, ReceivedMessage, Sync, Utils, WireSerializer

Constant Summary collapse

ORIGINAL_TAG_HEADER =
'Original-Tag'.freeze
REPUBLISH_COUNT_HEADER =
'Republish-Count'.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) ⇒ Mercury

Returns a new instance of Mercury.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/mercury/mercury.rb', line 30

def initialize(host: 'localhost',
               port: 5672,
               vhost: '/',
               username: 'guest',
               password: 'guest',
               parallelism: 1,
               on_error: nil,
               wait_for_publisher_confirms: true,
               logger:,
               &k)
  guard_public(k, initializing: true)
  @logger = logger
  @on_error = on_error
  AMQP.connect(host: host, port: port, vhost: vhost, username: username, password: password,
               on_tcp_connection_failure: server_down_error_handler) do |amqp|
    @amqp = amqp
    install_lost_connection_error_handler
    AMQP::Channel.new(amqp, prefetch: parallelism) do |channel|
      @channel = channel
      install_channel_error_handler
      if wait_for_publisher_confirms
        enable_publisher_confirms do
          k.call(self)
        end
      else
        k.call(self)
      end
    end
  end
end

Instance Attribute Details

#amqpObject (readonly)

Returns the value of attribute amqp.



12
13
14
# File 'lib/mercury/mercury.rb', line 12

def amqp
  @amqp
end

#channelObject (readonly)

Returns the value of attribute channel.



12
13
14
# File 'lib/mercury/mercury.rb', line 12

def channel
  @channel
end

#loggerObject (readonly)

Returns the value of attribute logger.



12
13
14
# File 'lib/mercury/mercury.rb', line 12

def logger
  @logger
end

Class Method Details

.open(logger: Logger.new(STDOUT), **kws, &k) ⇒ Object



14
15
16
17
# File 'lib/mercury/mercury.rb', line 14

def self.open(logger: Logger.new(STDOUT), **kws, &k)
  new(logger: logger, **kws, &k)
  nil
end

.publish_opts(tag, headers) ⇒ Object



97
98
99
# File 'lib/mercury/mercury.rb', line 97

def self.publish_opts(tag, headers)
  { routing_key: tag, persistent: true, headers: headers }
end

Instance Method Details

#close(&k) ⇒ Object



19
20
21
22
23
24
25
26
27
28
# File 'lib/mercury/mercury.rb', line 19

def close(&k)
  if @amqp
    @amqp.close do
      @amqp = nil
      k.call
    end
  else
    EM.next_tick(&k)
  end
end

#delete_source(source_name, &k) ⇒ Object



140
141
142
143
144
145
146
147
# File 'lib/mercury/mercury.rb', line 140

def delete_source(source_name, &k)
  guard_public(k)
  with_source(source_name) do |exchange|
    exchange.delete do
      k.call
    end
  end
end

#delete_work_queue(worker_group, &k) ⇒ Object



149
150
151
152
153
154
155
156
# File 'lib/mercury/mercury.rb', line 149

def delete_work_queue(worker_group, &k)
  guard_public(k)
  @channel.queue(worker_group, work_queue_opts) do |queue|
    queue.delete do
      k.call
    end
  end
end

#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object



62
63
64
65
66
67
68
69
# File 'lib/mercury/mercury.rb', line 62

def publish(source_name, msg, tag: '', headers: {}, &k)
  guard_public(k)
  # The amqp gem caches exchange objects, so it's fine to
  # redeclare the exchange every time we publish.
  with_source(source_name) do |exchange|
    publish_internal(exchange, msg, tag, headers, &k)
  end
end

#queue_exists?(queue_name, &k) ⇒ Boolean

Returns:

  • (Boolean)


167
168
169
170
171
172
173
174
# File 'lib/mercury/mercury.rb', line 167

def queue_exists?(queue_name, &k)
  guard_public(k)
  existence_check(k) do |ch, &ret|
    ch.queue(queue_name, passive: true) do
      ret.call(true)
    end
  end
end

#republish(msg, &k) ⇒ Object

Places a copy of the message at the back of the queue, then acks the original message.



73
74
75
76
77
78
79
80
81
82
83
# File 'lib/mercury/mercury.rb', line 73

def republish(msg, &k)
  guard_public(k)
  raise 'Only messages from a work queue can be republished' unless msg.work_queue_name
  raise 'A message can only be republished by the mercury instance that received it' unless msg.mercury_instance == self
  raise "This message was already #{msg.action_taken}ed" if msg.action_taken
  headers = Mercury.increment_republish_count(msg).merge(ORIGINAL_TAG_HEADER => msg.tag)
  publish_internal(@channel.default_exchange, msg.content, msg.work_queue_name, headers) do
    msg.ack
    k.call
  end
end

#source_exists?(source_name, &k) ⇒ Boolean

Returns:

  • (Boolean)


158
159
160
161
162
163
164
165
# File 'lib/mercury/mercury.rb', line 158

def source_exists?(source_name, &k)
  guard_public(k)
  existence_check(k) do |ch, &ret|
    with_source_no_cache(ch, source_name, passive: true) do
      ret.call(true)
    end
  end
end

#start_listener(source_name, handler, tag_filter: nil, &k) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
# File 'lib/mercury/mercury.rb', line 101

def start_listener(source_name, handler, tag_filter: nil, &k)
  guard_public(k)
  with_source(source_name) do |exchange|
    with_listener_queue(exchange, tag_filter) do |queue|
      queue.subscribe(ack: false) do |, payload|
        handler.call(make_received_message(payload, ))
      end
      k.call
    end
  end
end

#start_queue_worker(worker_group, handler, &k) ⇒ Object

Production code should not call this. Only test/tool code should call this, and only if you’re sure the worker queue already exists and is bound to the source



126
127
128
129
130
131
132
# File 'lib/mercury/mercury.rb', line 126

def start_queue_worker(worker_group, handler, &k)
  guard_public(k)
  @channel.queue(worker_group, work_queue_opts) do |queue|
    subscribe_worker(queue, handler)
    k.call
  end
end

#start_worker(worker_group, source_name, handler, tag_filter: nil, &k) ⇒ Object



113
114
115
116
117
118
119
120
121
# File 'lib/mercury/mercury.rb', line 113

def start_worker(worker_group, source_name, handler, tag_filter: nil, &k)
  guard_public(k)
  with_source(source_name) do |exchange|
    with_work_queue(worker_group, exchange, tag_filter) do |queue|
      subscribe_worker(queue, handler)
      k.call
    end
  end
end