Class: Mercury

Inherits:
Object
  • Object
show all
Defined in:
lib/mercury/cps.rb,
lib/mercury/fake.rb,
lib/mercury/sync.rb,
lib/mercury/utils.rb,
lib/mercury/cps/seq.rb,
lib/mercury/mercury.rb,
lib/mercury/monadic.rb,
lib/mercury/fake/queue.rb,
lib/mercury/test_utils.rb,
lib/mercury/cps/methods.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/wire_serializer.rb,
lib/mercury/cps/seq_with_let.rb,
lib/mercury/received_message.rb,
lib/mercury/fake/queued_message.rb

Overview

This class simulates Mercury without using the AMQP gem. It can be useful for unit testing code that uses Mercury. The domain concept allows different Mercury instances to hit different virtual servers; this should rarely be needed. This class cannot simulate server disconnections, broken sockets, etc.

Defined Under Namespace

Modules: TestUtils Classes: Cps, Fake, Monadic, ReceivedMessage, Sync, Utils, WireSerializer

Constant Summary collapse

ORIGINAL_TAG_HEADER =
'Original-Tag'.freeze
REPUBLISH_COUNT_HEADER =
'Republish-Count'.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) ⇒ Mercury

Returns a new instance of Mercury.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/mercury/mercury.rb', line 30

# Opens an AMQP connection and a channel on it, then hands this instance to
# the continuation once the channel is ready.
#
# @param host [String] forwarded to AMQP.connect
# @param port [Integer] forwarded to AMQP.connect
# @param vhost [String] forwarded to AMQP.connect
# @param username [String] forwarded to AMQP.connect
# @param password [String] forwarded to AMQP.connect
# @param parallelism [Integer] passed to the channel as its +prefetch+ option
# @param on_error [Object, nil] stored in @on_error (its use is not visible here)
# @param wait_for_publisher_confirms [Boolean] when truthy, the continuation
#   is run from inside enable_publisher_confirms instead of immediately
# @param logger [Object] stored in @logger (required keyword)
# @yieldparam mercury [Mercury] this instance
def initialize(host: 'localhost',
               port: 5672,
               vhost: '/',
               username: 'guest',
               password: 'guest',
               parallelism: 1,
               on_error: nil,
               wait_for_publisher_confirms: true,
               logger:,
               &k)
  guard_public(k, initializing: true)
  @logger = logger
  @on_error = on_error
  connect_opts = {
    host: host, port: port, vhost: vhost, username: username, password: password,
    on_tcp_connection_failure: server_down_error_handler
  }
  AMQP.connect(**connect_opts) do |amqp|
    @amqp = amqp
    install_lost_connection_error_handler
    AMQP::Channel.new(amqp, prefetch: parallelism) do |channel|
      @channel = channel
      install_channel_error_handler
      # Without publisher confirms there is nothing left to set up.
      next k.call(self) unless wait_for_publisher_confirms

      enable_publisher_confirms { k.call(self) }
    end
  end
end

Instance Attribute Details

#amqp ⇒ Object (readonly)

Returns the value of attribute amqp.



12
13
14
# File 'lib/mercury/mercury.rb', line 12

# Reader for @amqp. In the code shown on this page, @amqp is set to the
# connection yielded by AMQP.connect in #initialize and reset to nil once
# #close has finished closing it.
def amqp
  @amqp
end

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



12
13
14
# File 'lib/mercury/mercury.rb', line 12

# Reader for @channel, which #initialize sets to the channel yielded by
# AMQP::Channel.new.
def channel
  @channel
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



12
13
14
# File 'lib/mercury/mercury.rb', line 12

# Reader for @logger, which #initialize sets from its required +logger:+
# keyword argument.
def logger
  @logger
end

Class Method Details

.open(logger: Logger.new(STDOUT), **kws, &k) ⇒ Object



14
15
16
17
# File 'lib/mercury/mercury.rb', line 14

# Builds a new instance, forwarding every keyword and the continuation block
# to the constructor, and deliberately returns nil rather than the instance
# (the instance is handed to the block by the constructor instead).
#
# @param logger [Object] defaults to a Logger writing to STDOUT
# @param kws [Hash] remaining keyword arguments, passed through unchanged
# @return [nil]
def self.open(logger: Logger.new(STDOUT), **kws, &k)
  options = kws.merge(logger: logger)
  new(**options, &k)
  nil
end

.publish_opts(tag, headers) ⇒ Object



97
98
99
# File 'lib/mercury/mercury.rb', line 97

# Builds the options hash used when publishing a message.
#
# @param tag [String] used as the routing key
# @param headers [Hash] attached as the message headers
# @return [Hash] with keys :routing_key, :persistent (always true) and :headers
def self.publish_opts(tag, headers)
  {
    routing_key: tag,
    persistent: true,
    headers: headers
  }
end

Instance Method Details

#close(&k) ⇒ Object



19
20
21
22
23
24
25
26
27
28
# File 'lib/mercury/mercury.rb', line 19

# Closes the AMQP connection, if one is held, and then runs the continuation.
#
# When @amqp is already nil the continuation is simply scheduled with
# EM.next_tick. Otherwise the connection is closed and, from its close
# callback, @amqp is cleared before the continuation is invoked.
def close(&k)
  return EM.next_tick(&k) unless @amqp

  @amqp.close do
    @amqp = nil
    k.call
  end
end

#delete_source(source_name, &k) ⇒ Object



125
126
127
128
129
130
131
132
# File 'lib/mercury/mercury.rb', line 125

# Deletes the exchange obtained from with_source for +source_name+, then
# invokes the continuation from the exchange's delete callback.
#
# @param source_name [String] name handed to with_source
def delete_source(source_name, &k)
  guard_public(k)
  with_source(source_name) { |exchange| exchange.delete { k.call } }
end

#delete_work_queue(worker_group, &k) ⇒ Object



134
135
136
137
138
139
140
141
# File 'lib/mercury/mercury.rb', line 134

# Declares the queue named +worker_group+ on the channel (using
# work_queue_opts), deletes it, and invokes the continuation from the
# queue's delete callback.
#
# @param worker_group [String] the queue name
def delete_work_queue(worker_group, &k)
  guard_public(k)
  @channel.queue(worker_group, work_queue_opts) { |queue| queue.delete { k.call } }
end

#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object



62
63
64
65
66
67
68
69
# File 'lib/mercury/mercury.rb', line 62

# Publishes +msg+ to the exchange obtained from with_source for
# +source_name+, delegating the actual send to publish_internal.
#
# @param source_name [String] name handed to with_source
# @param msg [Object] message content
# @param tag [String] forwarded to publish_internal
# @param headers [Hash] forwarded to publish_internal
def publish(source_name, msg, tag: '', headers: {}, &k)
  guard_public(k)
  # Redeclaring the exchange on every publish is fine: the amqp gem
  # caches exchange objects.
  with_source(source_name) { |exchange| publish_internal(exchange, msg, tag, headers, &k) }
end

#queue_exists?(queue_name, &k) ⇒ Boolean

Returns:

  • (Boolean)


152
153
154
155
156
157
158
159
# File 'lib/mercury/mercury.rb', line 152

# Checks for a queue by declaring it passively inside existence_check and
# reporting +true+ through the supplied return block when that succeeds.
# How a missing queue is reported is up to existence_check (not shown here).
#
# @param queue_name [String] the queue to look for
def queue_exists?(queue_name, &k)
  guard_public(k)
  existence_check(k) do |ch, &ret|
    ch.queue(queue_name, passive: true) { ret.call(true) }
  end
end

#republish(msg, &k) ⇒ Object

Places a copy of the message at the back of the queue, then acks the original message.



73
74
75
76
77
78
79
80
81
82
83
# File 'lib/mercury/mercury.rb', line 73

# Places a copy of +msg+ at the back of its work queue, then acks the
# original and invokes the continuation.
#
# The copy is published through the channel's default exchange, routed by
# the work queue name, with the republish-count headers from
# Mercury.increment_republish_count plus the original tag.
#
# @param msg [Object] a received message; must expose work_queue_name,
#   mercury_instance, action_taken, content, tag and ack
# @raise [RuntimeError] if the message has no work queue name, was received
#   by another instance, or already had an action taken on it
def republish(msg, &k)
  guard_public(k)
  raise 'Only messages from a work queue can be republished' unless msg.work_queue_name
  raise 'A message can only be republished by the mercury instance that received it' unless msg.mercury_instance == self
  raise "This message was already #{msg.action_taken}ed" if msg.action_taken

  count_headers = Mercury.increment_republish_count(msg)
  republish_headers = count_headers.merge(ORIGINAL_TAG_HEADER => msg.tag)
  target_exchange = @channel.default_exchange
  publish_internal(target_exchange, msg.content, msg.work_queue_name, republish_headers) do
    msg.ack
    k.call
  end
end

#source_exists?(source_name, &k) ⇒ Boolean

Returns:

  • (Boolean)


143
144
145
146
147
148
149
150
# File 'lib/mercury/mercury.rb', line 143

# Checks for a source by declaring it passively (via with_source_no_cache)
# inside existence_check and reporting +true+ through the supplied return
# block when that succeeds. How a missing source is reported is up to
# existence_check (not shown here).
#
# @param source_name [String] the source to look for
def source_exists?(source_name, &k)
  guard_public(k)
  existence_check(k) do |ch, &ret|
    with_source_no_cache(ch, source_name, passive: true) { ret.call(true) }
  end
end

#start_listener(source_name, handler, tag_filter: nil, &k) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
# File 'lib/mercury/mercury.rb', line 101

# Subscribes +handler+ to messages from +source_name+ through a listener
# queue, then invokes the continuation once the subscription is set up.
#
# Each delivery is wrapped with make_received_message before being passed
# to the handler. The subscription is made with +ack: false+.
#
# @param source_name [String] name handed to with_source
# @param handler [#call] receives one wrapped message per delivery
# @param tag_filter [String, nil] forwarded to with_listener_queue
def start_listener(source_name, handler, tag_filter: nil, &k)
  guard_public(k)
  with_source(source_name) do |exchange|
    with_listener_queue(exchange, tag_filter) do |queue|
      # The subscribe block receives the delivery metadata first, then the payload.
      queue.subscribe(ack: false) do |metadata, payload|
        handler.call(make_received_message(payload, metadata))
      end
      k.call
    end
  end
end

#start_worker(worker_group, source_name, handler, tag_filter: nil, &k) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
# File 'lib/mercury/mercury.rb', line 113

# Subscribes +handler+ to messages from +source_name+ through the work
# queue named +worker_group+, then invokes the continuation once the
# subscription is set up.
#
# Each delivery is wrapped with make_received_message (tagged with the work
# queue name) before being passed to the handler. The subscription is made
# with +ack: true+.
#
# @param worker_group [String] the work queue name
# @param source_name [String] name handed to with_source
# @param handler [#call] receives one wrapped message per delivery
# @param tag_filter [String, nil] forwarded to with_work_queue
def start_worker(worker_group, source_name, handler, tag_filter: nil, &k)
  guard_public(k)
  with_source(source_name) do |exchange|
    with_work_queue(worker_group, exchange, tag_filter) do |queue|
      # The subscribe block receives the delivery metadata first, then the payload.
      queue.subscribe(ack: true) do |metadata, payload|
        handler.call(make_received_message(payload, metadata, work_queue_name: worker_group))
      end
      k.call
    end
  end
end