Class: Mercury

Inherits:
Object
  • Object
show all
Defined in:
lib/mercury/cps.rb,
lib/mercury/fake.rb,
lib/mercury/sync.rb,
lib/mercury/utils.rb,
lib/mercury/cps/seq.rb,
lib/mercury/mercury.rb,
lib/mercury/monadic.rb,
lib/mercury/version.rb,
lib/mercury/fake/queue.rb,
lib/mercury/test_utils.rb,
lib/mercury/cps/methods.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/wire_serializer.rb,
lib/mercury/cps/seq_with_let.rb,
lib/mercury/received_message.rb,
lib/mercury/fake/queued_message.rb

Overview

This class simulates Mercury without using the AMQP gem. It can be useful for unit testing code that uses Mercury. The domain concept allows different mercury instances to hit different virtual servers; this should rarely be needed. This class cannot simulate behavior of server disconnections, broken sockets, etc.

Defined Under Namespace

Modules: TestUtils Classes: Cps, Fake, Monadic, ReceivedMessage, Sync, Utils, WireSerializer

Constant Summary collapse

VERSION =
'0.1.9'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, &k) ⇒ Mercury

Returns a new instance of Mercury.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/mercury/mercury.rb', line 22

def initialize(host: 'localhost',
               port: 5672,
               vhost: '/',
               username: 'guest',
               password: 'guest',
               parallelism: 1,
               on_error: nil,
               &k)
  @on_error = on_error
  AMQP.connect(host: host, port: port, vhost: vhost, username: username, password: password,
               on_tcp_connection_failure: server_down_error_handler) do |amqp|
    @amqp = amqp
    @channel = AMQP::Channel.new(amqp, prefetch: parallelism) do
      @channel.confirm_select
      install_channel_error_handler
      install_lost_connection_error_handler
      k.call(self)
    end
  end
end

Instance Attribute Details

#amqpObject (readonly)

Returns the value of attribute amqp.



8
9
10
# File 'lib/mercury/mercury.rb', line 8

def amqp
  @amqp
end

#channelObject (readonly)

Returns the value of attribute channel.



8
9
10
# File 'lib/mercury/mercury.rb', line 8

def channel
  @channel
end

#loggerObject (readonly)

Returns the value of attribute logger.



8
9
10
# File 'lib/mercury/mercury.rb', line 8

def logger
  @logger
end

Class Method Details

.open(logger: Logatron, **kws, &k) ⇒ Object



10
11
12
13
14
# File 'lib/mercury/mercury.rb', line 10

def self.open(logger: Logatron, **kws, &k)
  @logger = logger
  new(**kws, &k)
  nil
end

.publish_opts(tag, headers) ⇒ Object



55
56
57
# File 'lib/mercury/mercury.rb', line 55

def self.publish_opts(tag, headers)
  { routing_key: tag, persistent: true, headers: Logatron.http_headers.merge(headers) }
end

Instance Method Details

#close(&k) ⇒ Object



16
17
18
19
20
# File 'lib/mercury/mercury.rb', line 16

def close(&k)
  @amqp.close do
    k.call
  end
end

#delete_source(source_name, &k) ⇒ Object



81
82
83
84
85
86
87
# File 'lib/mercury/mercury.rb', line 81

def delete_source(source_name, &k)
  with_source(source_name) do |exchange|
    exchange.delete do
      k.call
    end
  end
end

#delete_work_queue(worker_group, &k) ⇒ Object



89
90
91
92
93
94
95
# File 'lib/mercury/mercury.rb', line 89

def delete_work_queue(worker_group, &k)
  @channel.queue(worker_group, work_queue_opts) do |queue|
    queue.delete do
      k.call
    end
  end
end

#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object



44
45
46
47
48
49
50
51
52
53
# File 'lib/mercury/mercury.rb', line 44

def publish(source_name, msg, tag: '', headers: {}, &k)
  # The amqp gem caches exchange objects, so it's fine to
  # redeclare the exchange every time we publish.
  # TODO: wait for publish confirmations (@channel.on_ack)
  with_source(source_name) do |exchange|
    exchange.publish(write(msg), **Mercury.publish_opts(tag, headers)) do
      k.call
    end
  end
end

#queue_exists?(queue_name, &k) ⇒ Boolean

Returns:

  • (Boolean)


105
106
107
108
109
110
111
# File 'lib/mercury/mercury.rb', line 105

def queue_exists?(queue_name, &k)
  existence_check(k) do |ch, &ret|
    ch.queue(queue_name, passive: true) do
      ret.call(true)
    end
  end
end

#source_exists?(source_name, &k) ⇒ Boolean

Returns:

  • (Boolean)


97
98
99
100
101
102
103
# File 'lib/mercury/mercury.rb', line 97

def source_exists?(source_name, &k)
  existence_check(k) do |ch, &ret|
    with_source_no_cache(ch, source_name, passive: true) do
      ret.call(true)
    end
  end
end

#start_listener(source_name, handler, tag_filter: '#', &k) ⇒ Object



59
60
61
62
63
64
65
66
67
68
# File 'lib/mercury/mercury.rb', line 59

def start_listener(source_name, handler, tag_filter: '#', &k)
  with_source(source_name) do |exchange|
    with_listener_queue(exchange, tag_filter) do |queue|
      queue.subscribe(ack: false) do |, payload|
        handler.call(make_received_message(payload, , false))
      end
      k.call
    end
  end
end

#start_worker(worker_group, source_name, handler, tag_filter: '#', &k) ⇒ Object



70
71
72
73
74
75
76
77
78
79
# File 'lib/mercury/mercury.rb', line 70

def start_worker(worker_group, source_name, handler, tag_filter: '#', &k)
  with_source(source_name) do |exchange|
    with_work_queue(worker_group, exchange, tag_filter) do |queue|
      queue.subscribe(ack: true) do |, payload|
        handler.call(make_received_message(payload, , true))
      end
      k.call
    end
  end
end