Class: Anschel::Input::RabbitMQ

Inherits:
Base
  • Object
show all
Defined in:
lib/anschel/input/rabbitmq.rb

Instance Method Summary collapse

Constructor Details

#initialize(output, config, stats, log) ⇒ RabbitMQ

Returns a new instance of RabbitMQ.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/anschel/input/rabbitmq.rb', line 9

def initialize output, config, stats, log
  connection_defaults = {
    heartbeat_interval: 30,
    connection_timeout: 10,
    automatically_recover: false
  }

  exchange_defaults = {
    type: 'x-consistent-hash',
    durable: true
  }

  queue_defaults = {
    exclusive: false,
    auto_delete: false,
    durable: true
  }

  binding_defaults = {
    routing_key: '100'
  }

  subscription_defaults = {
    block: true,
    ack: true,
    manual_ack: true
  }

  connection = ::MarchHare.connect \
    connection_defaults.merge(config[:connection] || {})

  handle_errors connection, log

  exchange_name = config[:exchange].delete(:name)

  @threads = config[:queues].map do |queue_name, queue_config|
    Thread.new do
      channel = connection.create_channel
      channel.prefetch = config[:prefetch] || 256

      exchange = channel.exchange exchange_name, \
        exchange_defaults.merge(config[:exchange])

      subscription = subscription_defaults.merge \
        (config[:subscription] || {})

      queue = channel.queue queue_name.to_s, \
        queue_defaults.merge(queue_config)

      queue.bind exchange, \
        binding_defaults.merge(config[:binding] || {})

      log.debug \
        event: 'input-rabbitmq-connecting-queue',
        queue: queue_name

      queue.subscribe(subscription) do |meta, message|
        output << message
        stats.inc 'input'
        channel.ack meta.delivery_tag if subscription[:manual_ack]
      end
    end
  end
end

Instance Method Details

#stopObject



74
75
76
77
78
# File 'lib/anschel/input/rabbitmq.rb', line 74

def stop
  return if @stopped
  @threads.map(&:kill)
  @stopped = true
end