Class: Mercury

Inherits:
Object
  • Object
show all
Defined in:
lib/mercury/cps.rb,
lib/mercury/fake.rb,
lib/mercury/sync.rb,
lib/mercury/utils.rb,
lib/mercury/cps/seq.rb,
lib/mercury/mercury.rb,
lib/mercury/monadic.rb,
lib/mercury/fake/queue.rb,
lib/mercury/test_utils.rb,
lib/mercury/cps/methods.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/wire_serializer.rb,
lib/mercury/cps/seq_with_let.rb,
lib/mercury/received_message.rb,
lib/mercury/fake/queued_message.rb

Overview

This class simulates Mercury without using the AMQP gem. It can be useful for unit testing code that uses Mercury. The domain concept allows different mercury instances to hit different virtual servers; this should rarely be needed. This class cannot simulate behavior of server disconnections, broken sockets, etc.

Defined Under Namespace

Modules: TestUtils
Classes: Cps, Fake, Monadic, ReceivedMessage, Sync, Utils, WireSerializer

Constant Summary collapse

ORIGINAL_TAG_HEADER =
'Original-Tag'.freeze
REPUBLISH_COUNT_HEADER =
'Republish-Count'.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) ⇒ Mercury

Returns a new instance of Mercury.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/mercury/mercury.rb', line 29

# Connects to the AMQP server, opens a channel on that connection, and then
# hands this instance to the continuation +k+.
#
# host, port, vhost, username, password:: forwarded to AMQP.connect.
# parallelism:: passed as the channel's +prefetch+ setting.
# on_error:: stored in @on_error.
# wait_for_publisher_confirms:: when truthy, +k+ is invoked from the block
#   given to enable_publisher_confirms; otherwise it is invoked directly.
# logger:: required; stored in @logger.
# k:: continuation called with +self+ once the channel has been set up.
def initialize(host: 'localhost',
               port: 5672,
               vhost: '/',
               username: 'guest',
               password: 'guest',
               parallelism: 1,
               on_error: nil,
               wait_for_publisher_confirms: true,
               logger:,
               &k)
  guard_public(k, initializing: true)
  @logger = logger
  @on_error = on_error

  connection_settings = {
    host: host,
    port: port,
    vhost: vhost,
    username: username,
    password: password,
    on_tcp_connection_failure: server_down_error_handler
  }

  AMQP.connect(**connection_settings) do |connection|
    @amqp = connection
    install_lost_connection_error_handler

    AMQP::Channel.new(connection, prefetch: parallelism) do |opened_channel|
      @channel = opened_channel
      install_channel_error_handler

      # Single hand-off point; only the moment at which it fires differs.
      ready = proc { k.call(self) }
      if wait_for_publisher_confirms
        enable_publisher_confirms(&ready)
      else
        ready.call
      end
    end
  end
end

Instance Attribute Details

#amqp ⇒ Object (readonly)

Returns the value of attribute amqp.



11
12
13
# File 'lib/mercury/mercury.rb', line 11

# Reader for @amqp. That variable is assigned the connection yielded by
# AMQP.connect in #initialize and is reset to nil by #close, so this may
# return nil.
def amqp
  @amqp
end

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



11
12
13
# File 'lib/mercury/mercury.rb', line 11

# Reader for @channel, which #initialize assigns from the block of
# AMQP::Channel.new. It is nil until that block has run.
def channel
  @channel
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



11
12
13
# File 'lib/mercury/mercury.rb', line 11

# Reader for @logger, the object passed to #initialize as +logger:+.
def logger
  @logger
end

Class Method Details

.open(logger: Logatron, **kws, &k) ⇒ Object



13
14
15
16
# File 'lib/mercury/mercury.rb', line 13

# Builds a new instance, defaulting +logger+ to Logatron, and forwards the
# remaining keyword arguments and the block to +new+.
#
# Always returns nil; the instance is not returned to the caller here.
def self.open(logger: Logatron, **kws, &k)
  options = kws.merge(logger: logger)
  new(**options, &k)
  nil
end

.publish_opts(tag, headers) ⇒ Object



96
97
98
# File 'lib/mercury/mercury.rb', line 96

# Builds the options hash used when publishing a message.
#
# tag:: used as the routing key.
# headers:: merged over Logatron.http_headers, so entries given here win
#   on key collisions.
#
# Returns a hash with :routing_key, :persistent (always true) and :headers.
def self.publish_opts(tag, headers)
  merged_headers = Logatron.http_headers.merge(headers)
  {
    routing_key: tag,
    persistent: true,
    headers: merged_headers
  }
end

Instance Method Details

#close(&k) ⇒ Object



18
19
20
21
22
23
24
25
26
27
# File 'lib/mercury/mercury.rb', line 18

# Closes the connection held in @amqp, if any, then runs the continuation.
#
# With no connection, +k+ is simply scheduled via EM.next_tick. Otherwise
# @amqp is cleared and +k+ is called from the connection's close callback.
def close(&k)
  return EM.next_tick(&k) unless @amqp

  @amqp.close do
    @amqp = nil
    k.call
  end
end

#delete_source(source_name, &k) ⇒ Object



124
125
126
127
128
129
130
131
# File 'lib/mercury/mercury.rb', line 124

# Deletes the source named +source_name+.
#
# The exchange is obtained through with_source; +k+ is called with no
# arguments from the exchange's delete callback.
def delete_source(source_name, &k)
  guard_public(k)
  with_source(source_name) do |source|
    source.delete { k.call }
  end
end

#delete_work_queue(worker_group, &k) ⇒ Object



133
134
135
136
137
138
139
140
# File 'lib/mercury/mercury.rb', line 133

# Deletes the work queue belonging to +worker_group+.
#
# The queue is declared on @channel using work_queue_opts; +k+ is called
# with no arguments from the queue's delete callback.
def delete_work_queue(worker_group, &k)
  guard_public(k)
  @channel.queue(worker_group, work_queue_opts) do |work_queue|
    work_queue.delete { k.call }
  end
end

#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object



61
62
63
64
65
66
67
68
# File 'lib/mercury/mercury.rb', line 61

# Publishes +msg+ to the source named +source_name+.
#
# tag:: forwarded to publish_internal (defaults to '').
# headers:: forwarded to publish_internal (defaults to {}).
# k:: continuation, forwarded to publish_internal as its block.
def publish(source_name, msg, tag: '', headers: {}, &k)
  guard_public(k)
  # Redeclaring the exchange on every publish is fine because the amqp gem
  # caches exchange objects.
  with_source(source_name) { |source| publish_internal(source, msg, tag, headers, &k) }
end

#queue_exists?(queue_name, &k) ⇒ Boolean

Returns:

  • (Boolean)


151
152
153
154
155
156
157
158
# File 'lib/mercury/mercury.rb', line 151

# Checks for the queue named +queue_name+ by declaring it passively.
#
# The check runs inside existence_check, which supplies the channel and a
# reporting block; +true+ is reported from the passive declaration's
# callback. Any other outcome is left to existence_check.
def queue_exists?(queue_name, &k)
  guard_public(k)
  existence_check(k) do |channel, &report|
    channel.queue(queue_name, passive: true) { report.call(true) }
  end
end

#republish(msg, &k) ⇒ Object

Places a copy of the message at the back of the queue, then acks the original message.



72
73
74
75
76
77
78
79
80
81
82
# File 'lib/mercury/mercury.rb', line 72

# Places a copy of +msg+ at the back of its work queue, then acks the
# original and calls +k+.
#
# The copy goes to the channel's default exchange, routed by the work
# queue name. Its headers come from Mercury.increment_republish_count plus
# the original tag under ORIGINAL_TAG_HEADER.
#
# Raises RuntimeError when the message did not come from a work queue, was
# received by a different mercury instance, or already had an action taken.
def republish(msg, &k)
  guard_public(k)

  unless msg.work_queue_name
    raise 'Only messages from a work queue can be republished'
  end
  unless msg.mercury_instance == self
    raise 'A message can only be republished by the mercury instance that received it'
  end
  if msg.action_taken
    raise "This message was already #{msg.action_taken}ed"
  end

  counted_headers = Mercury.increment_republish_count(msg)
  headers = counted_headers.merge(ORIGINAL_TAG_HEADER => msg.tag)
  target = @channel.default_exchange

  publish_internal(target, msg.content, msg.work_queue_name, headers) do
    msg.ack
    k.call
  end
end

#source_exists?(source_name, &k) ⇒ Boolean

Returns:

  • (Boolean)


142
143
144
145
146
147
148
149
# File 'lib/mercury/mercury.rb', line 142

# Checks for the source named +source_name+ by declaring it passively.
#
# The check runs inside existence_check, which supplies the channel and a
# reporting block; +true+ is reported from the block given to
# with_source_no_cache. Any other outcome is left to existence_check.
def source_exists?(source_name, &k)
  guard_public(k)
  existence_check(k) do |channel, &report|
    with_source_no_cache(channel, source_name, passive: true) { report.call(true) }
  end
end

#start_listener(source_name, handler, tag_filter: nil, &k) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
# File 'lib/mercury/mercury.rb', line 100

# Subscribes +handler+ to messages from the source named +source_name+.
#
# source_name:: source (exchange) to listen on, resolved via with_source.
# handler:: callable invoked with the object built by make_received_message
#   for every delivery.
# tag_filter:: forwarded to with_listener_queue (defaults to nil).
# k:: continuation called with no arguments once the subscription is set up.
def start_listener(source_name, handler, tag_filter: nil, &k)
  guard_public(k)
  with_source(source_name) do |exchange|
    with_listener_queue(exchange, tag_filter) do |queue|
      # The subscribe block receives the delivery metadata first and the
      # payload second; both are needed to build the received message.
      queue.subscribe(ack: false) do |metadata, payload|
        handler.call(make_received_message(payload, metadata))
      end
      k.call
    end
  end
end

#start_worker(worker_group, source_name, handler, tag_filter: nil, &k) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
# File 'lib/mercury/mercury.rb', line 112

# Subscribes +handler+ as a worker in +worker_group+ for messages from the
# source named +source_name+.
#
# worker_group:: name passed to with_work_queue and recorded on each
#   received message as its work_queue_name.
# source_name:: source (exchange) to consume from, resolved via with_source.
# handler:: callable invoked with the object built by make_received_message
#   for every delivery.
# tag_filter:: forwarded to with_work_queue (defaults to nil).
# k:: continuation called with no arguments once the subscription is set up.
def start_worker(worker_group, source_name, handler, tag_filter: nil, &k)
  guard_public(k)
  with_source(source_name) do |exchange|
    with_work_queue(worker_group, exchange, tag_filter) do |queue|
      # The subscribe block receives the delivery metadata first and the
      # payload second; both are needed to build the received message.
      queue.subscribe(ack: true) do |metadata, payload|
        handler.call(make_received_message(payload, metadata, work_queue_name: worker_group))
      end
      k.call
    end
  end
end