Class: Mercury

Inherits:
Object
  • Object
show all
Defined in:
lib/mercury/cps.rb,
lib/mercury/fake.rb,
lib/mercury/sync.rb,
lib/mercury/utils.rb,
lib/mercury/cps/seq.rb,
lib/mercury/mercury.rb,
lib/mercury/monadic.rb,
lib/mercury/version.rb,
lib/mercury/fake/queue.rb,
lib/mercury/test_utils.rb,
lib/mercury/cps/methods.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/wire_serializer.rb,
lib/mercury/cps/seq_with_let.rb,
lib/mercury/received_message.rb,
lib/mercury/fake/queued_message.rb

Overview

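Note: this overview text appears to describe the nested Mercury::Fake class rather than Mercury itself (the Mercury constructor documented below connects through the AMQP gem). Mercury::Fake simulates Mercury without using the AMQP gem. It can be useful for unit testing code that uses Mercury. The domain concept allows different Mercury instances to hit different virtual servers; this should rarely be needed. The fake cannot simulate the behavior of server disconnections, broken sockets, etc.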

Defined Under Namespace

Modules: TestUtils

Classes: Cps, Fake, Monadic, ReceivedMessage, Sync, Utils, WireSerializer

Constant Summary collapse

VERSION =
'0.2.0'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, &k) ⇒ Mercury

Returns a new instance of Mercury.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/mercury/mercury.rb', line 22

# Connects to the broker via AMQP.connect, opens a channel on that connection,
# installs the error handlers and finally invokes +k+ with this instance.
#
# @param host [String] forwarded to AMQP.connect
# @param port [Integer] forwarded to AMQP.connect
# @param vhost [String] forwarded to AMQP.connect
# @param username [String] forwarded to AMQP.connect
# @param password [String] forwarded to AMQP.connect
# @param parallelism [Integer] used as the channel's +prefetch+ value
# @param on_error [Object, nil] stored in @on_error; not used directly here
# @param wait_for_publisher_confirms [Boolean] when truthy, +k+ is only
#   invoked from within the block given to +enable_publisher_confirms+
# @yieldparam mercury [Mercury] this instance, once the channel block has run
def initialize(host: 'localhost',
               port: 5672,
               vhost: '/',
               username: 'guest',
               password: 'guest',
               parallelism: 1,
               on_error: nil,
               wait_for_publisher_confirms: true,
               &k)
  @on_error = on_error

  # Single definition of "we are ready", shared by both branches below.
  announce_ready = proc { k.call(self) }

  connection_opts = {
    host: host, port: port, vhost: vhost, username: username, password: password,
    on_tcp_connection_failure: server_down_error_handler
  }

  AMQP.connect(**connection_opts) do |connection|
    @amqp = connection
    @channel = AMQP::Channel.new(connection, prefetch: parallelism) do
      install_channel_error_handler
      install_lost_connection_error_handler
      if wait_for_publisher_confirms
        enable_publisher_confirms(&announce_ready)
      else
        announce_ready.call
      end
    end
  end
end

Instance Attribute Details

#amqp ⇒ Object (readonly)

Returns the value of attribute amqp.



8
9
10
# File 'lib/mercury/mercury.rb', line 8

# Reader for @amqp, which the constructor assigns to the connection object
# yielded by AMQP.connect. It is nil until that connect block has run.
#
# @return [Object, nil] the value of @amqp
def amqp
  @amqp
end

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



8
9
10
# File 'lib/mercury/mercury.rb', line 8

# Reader for @channel, which the constructor assigns to the AMQP::Channel it
# creates inside the AMQP.connect block. It is nil until that block has run.
#
# @return [Object, nil] the value of @channel
def channel
  @channel
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



8
9
10
# File 'lib/mercury/mercury.rb', line 8

# Reader for the instance variable @logger.
#
# NOTE(review): the constructor shown for this class does not assign @logger,
# and Mercury.open assigns @logger inside a class method (i.e. on the class
# object, not on the instance). Confirm where the instance-level value is
# set; otherwise this reader returns nil.
#
# @return [Object, nil] the value of @logger
def logger
  @logger
end

Class Method Details

.open(logger: Logatron, **kws, &k) ⇒ Object



10
11
12
13
14
# File 'lib/mercury/mercury.rb', line 10

# Builds a Mercury instance, forwarding all remaining keyword arguments and
# the block to +new+, and makes +logger+ available through the instance.
#
# @param logger [Object] logger to expose via the instance's +logger+ reader
#   (defaults to Logatron)
# @param kws [Hash] keyword arguments forwarded unchanged to +new+
# @yield forwarded unchanged to +new+
# @return [nil] always nil; the instance is not returned to the caller here
def self.open(logger: Logatron, **kws, &k)
  # Inside a class method `@logger` belongs to the class object itself.
  # The assignment is retained in case other code reads that class-level state.
  @logger = logger

  instance = new(**kws, &k)

  # The instance-level `logger` reader returns the *instance's* @logger, which
  # the class-level assignment above never touches, so set it explicitly.
  instance.instance_variable_set(:@logger, logger)
  nil
end

.publish_opts(tag, headers) ⇒ Object



65
66
67
# File 'lib/mercury/mercury.rb', line 65

# Builds the options hash handed to an exchange's +publish+ call.
#
# @param tag [String] used as the +routing_key+
# @param headers [Hash] merged over Logatron.http_headers; on key clashes the
#   values given here win (Hash#merge semantics)
# @return [Hash] with keys :routing_key, :persistent (always true) and :headers
def self.publish_opts(tag, headers)
  combined_headers = Logatron.http_headers.merge(headers)
  {
    routing_key: tag,
    persistent: true,
    headers: combined_headers
  }
end

.source_opts ⇒ Object



263
264
265
# File 'lib/mercury/mercury.rb', line 263

# Options hash for sources: durable, and not auto-deleted.
# A new Hash is built on every call.
#
# @return [Hash] +{ durable: true, auto_delete: false }+
def self.source_opts
  {
    durable: true,
    auto_delete: false
  }
end

Instance Method Details

#close(&k) ⇒ Object



16
17
18
19
20
# File 'lib/mercury/mercury.rb', line 16

# Closes the AMQP connection held in @amqp and calls +k+ (with no arguments)
# from inside the block passed to the connection's +close+.
#
# @yield invoked with no arguments when the connection's close block runs
# @return [Object] whatever @amqp.close returns
def close(&k)
  @amqp.close { k.call }
end

#delete_source(source_name, &k) ⇒ Object



91
92
93
94
95
96
97
# File 'lib/mercury/mercury.rb', line 91

# Looks up the source (exchange) named +source_name+ via +with_source+,
# deletes it, and calls +k+ from inside the exchange's delete block.
#
# @param source_name [String] name handed to +with_source+
# @yield invoked with no arguments when the delete block runs
def delete_source(source_name, &k)
  with_source(source_name) do |source|
    source.delete { k.call }
  end
end

#delete_work_queue(worker_group, &k) ⇒ Object



99
100
101
102
103
104
105
# File 'lib/mercury/mercury.rb', line 99

# Declares the queue named +worker_group+ on @channel (using
# +work_queue_opts+), deletes it, and calls +k+ from inside the queue's
# delete block.
#
# @param worker_group [String] queue name passed to @channel.queue
# @yield invoked with no arguments when the delete block runs
def delete_work_queue(worker_group, &k)
  @channel.queue(worker_group, work_queue_opts) do |work_queue|
    work_queue.delete { k.call }
  end
end

#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/mercury/mercury.rb', line 50

# Serializes +msg+ with +write+ and publishes it to the source (exchange)
# named +source_name+.
#
# @param source_name [String] name handed to +with_source+
# @param msg [Object] message passed to +write+ to produce the payload
# @param tag [String] passed to Mercury.publish_opts (becomes the routing key)
# @param headers [Hash] passed to Mercury.publish_opts
# @yield when +publisher_confirms_enabled+ is truthy the block is handed to
#   +expect_publisher_confirm+; otherwise it is passed straight to the
#   exchange's +publish+
def publish(source_name, msg, tag: '', headers: {}, &k)
  # Redeclaring the exchange on every publish is fine: the amqp gem
  # caches exchange objects.
  with_source(source_name) do |exchange|
    body = write(msg)
    options = Mercury.publish_opts(tag, headers)

    # Without publisher confirms the callback goes directly to the exchange.
    next exchange.publish(body, **options, &k) unless publisher_confirms_enabled

    # With confirms, register the callback first, then publish without a block.
    expect_publisher_confirm(k)
    exchange.publish(body, **options)
  end
end

#queue_exists?(queue_name, &k) ⇒ Boolean

Returns:

  • (Boolean)


115
116
117
118
119
120
121
# File 'lib/mercury/mercury.rb', line 115

# Checks for a queue by passively declaring it through +existence_check+.
# If the passive declaration's block runs, +true+ is reported through the
# callback supplied by +existence_check+. How a missing queue is reported is
# up to +existence_check+, which is not shown here.
#
# @param queue_name [String] name of the queue to look for
# @yield handed to +existence_check+ as the result continuation
def queue_exists?(queue_name, &k)
  existence_check(k) do |channel, &report|
    channel.queue(queue_name, passive: true) { report.call(true) }
  end
end

#source_exists?(source_name, &k) ⇒ Boolean

Returns:

  • (Boolean)


107
108
109
110
111
112
113
# File 'lib/mercury/mercury.rb', line 107

# Checks for a source by passively declaring it (via +with_source_no_cache+)
# inside +existence_check+. If the declaration's block runs, +true+ is
# reported through the callback supplied by +existence_check+. How a missing
# source is reported is up to +existence_check+, which is not shown here.
#
# @param source_name [String] name of the source to look for
# @yield handed to +existence_check+ as the result continuation
def source_exists?(source_name, &k)
  existence_check(k) do |channel, &report|
    with_source_no_cache(channel, source_name, passive: true) { report.call(true) }
  end
end

#start_listener(source_name, handler, tag_filter: '#', &k) ⇒ Object



69
70
71
72
73
74
75
76
77
78
# File 'lib/mercury/mercury.rb', line 69

# Subscribes +handler+ to messages from the source named +source_name+ through
# a listener queue obtained from +with_listener_queue+, then calls +k+.
#
# The subscription is made with +ack: false+, and each delivery is wrapped by
# +make_received_message+ with +false+ as its third argument
# (presumably "not ackable" — confirm against make_received_message).
#
# @param source_name [String] name handed to +with_source+
# @param handler [#call] invoked with the object built by
#   +make_received_message+ for every delivery
# @param tag_filter [String] passed to +with_listener_queue+ (default '#')
# @yield invoked with no arguments right after +subscribe+ has been called
def start_listener(source_name, handler, tag_filter: '#', &k)
  with_source(source_name) do |exchange|
    with_listener_queue(exchange, tag_filter) do |queue|
      # The subscribe block receives (metadata, payload); both are needed to
      # build the received message.
      queue.subscribe(ack: false) do |metadata, payload|
        handler.call(make_received_message(payload, metadata, false))
      end
      k.call
    end
  end
end

#start_worker(worker_group, source_name, handler, tag_filter: '#', &k) ⇒ Object



80
81
82
83
84
85
86
87
88
89
# File 'lib/mercury/mercury.rb', line 80

# Subscribes +handler+ to messages from the source named +source_name+ through
# the work queue for +worker_group+ (obtained from +with_work_queue+), then
# calls +k+.
#
# The subscription is made with +ack: true+, and each delivery is wrapped by
# +make_received_message+ with +true+ as its third argument
# (presumably "ackable" — confirm against make_received_message).
#
# @param worker_group [String] passed to +with_work_queue+
# @param source_name [String] name handed to +with_source+
# @param handler [#call] invoked with the object built by
#   +make_received_message+ for every delivery
# @param tag_filter [String] passed to +with_work_queue+ (default '#')
# @yield invoked with no arguments right after +subscribe+ has been called
def start_worker(worker_group, source_name, handler, tag_filter: '#', &k)
  with_source(source_name) do |exchange|
    with_work_queue(worker_group, exchange, tag_filter) do |queue|
      # The subscribe block receives (metadata, payload); both are needed to
      # build the received message.
      queue.subscribe(ack: true) do |metadata, payload|
        handler.call(make_received_message(payload, metadata, true))
      end
      k.call
    end
  end
end