Class: Sensu::Transport::RabbitMQ

Inherits:
Base
  • Object
show all
Defined in:
lib/sensu/transport/rabbitmq.rb

Instance Attribute Summary

Attributes inherited from Base

#logger

Instance Method Summary collapse

Methods inherited from Base

#after_reconnect, #before_reconnect, descendants, #initialize, #on_error

Constructor Details

This class inherits a constructor from Sensu::Transport::Base

Instance Method Details

#acknowledge(info) {|info| ... } ⇒ Object Also known as: ack

Acknowledge the delivery of a message from RabbitMQ.

Yields:

  • (info)

    passes acknowledgment info to an optional callback/block.



117
118
119
120
121
122
# File 'lib/sensu/transport/rabbitmq.rb', line 117

# Acknowledge the delivery of a message from RabbitMQ.
#
# The ack is sent inside `catch_errors`; afterwards the inherited
# implementation is invoked through a bare `super`, which forwards
# `info` and any block given to this method.
#
# @param info [Object] delivery info; must respond to `ack`.
# @yield [info] optional callback/block, handled by the inherited
#   implementation.
def acknowledge(info)
  catch_errors { info.ack }
  super
end

#close ⇒ Object

Close the RabbitMQ connection.



43
44
45
46
# File 'lib/sensu/transport/rabbitmq.rb', line 43

# Close the RabbitMQ connection.
#
# When `connected?` is true the connection is closed right away;
# otherwise the close is handed to `EM.next_tick` to run later.
def close
  closer = proc { @connection.close }
  if connected?
    closer.call
  else
    EM.next_tick(closer)
  end
end

#connect(options = {}) ⇒ Object

RabbitMQ connection setup. The deferred status is set to `:succeeded` (via `succeed()`) once the connection has been established.



16
17
18
19
20
21
# File 'lib/sensu/transport/rabbitmq.rb', line 16

# RabbitMQ connection setup.
#
# Runs the setup steps in a fixed order: clear prior state, record the
# supplied connection options, arm the connection timeout, then start
# connecting with the eligible options.
#
# @param options [Hash] connection options, handed to
#   `set_connection_options` unchanged.
def connect(options = {})
  reset
  set_connection_options(options)
  create_connection_timeout
  connect_with_eligible_options
end

#connected? ⇒ TrueClass, FalseClass

Indicates if connected to RabbitMQ.



38
39
40
# File 'lib/sensu/transport/rabbitmq.rb', line 38

# Indicates if connected to RabbitMQ.
#
# Reports `false` when no connection object has been created yet
# (`@connection` is nil) instead of raising NoMethodError, so the
# predicate is safe to call at any point in the transport's lifecycle.
# When a connection object exists, its own `connected?` answer is
# returned as-is.
#
# @return [TrueClass, FalseClass]
def connected?
  @connection ? @connection.connected? : false
end

#publish(type, pipe, message, options = {}) {|info| ... } ⇒ Object

Publish a message to RabbitMQ.

Yields:

  • (info)

    passes publish info to an optional callback/block.

Yield Parameters:

  • info (Hash)

    contains publish information.



59
60
61
62
63
64
65
66
# File 'lib/sensu/transport/rabbitmq.rb', line 59

# Publish a message to RabbitMQ.
#
# @param type [#to_sym] name of the channel method that is called with
#   `pipe` and `options` to obtain the object the message is published
#   on (presumably an exchange type such as `:direct` or `:fanout` --
#   confirm against callers).
# @param pipe [Object] first argument to that channel method.
# @param message [Object] payload handed to `publish`.
# @param options [Hash] second argument to that channel method.
# @yield [info] passes publish info to an optional callback/block.
# @yieldparam info [Hash] publish information; always an empty hash here.
def publish(type, pipe, message, options = {})
  catch_errors do
    exchange = @channel.method(type.to_sym).call(pipe, options)
    exchange.publish(message) do
      yield({}) if block_given?
    end
  end
end

#reconnect(force = false) ⇒ Object

Reconnect to RabbitMQ.



26
27
28
29
30
31
32
33
# File 'lib/sensu/transport/rabbitmq.rb', line 26

# Reconnect to RabbitMQ.
#
# Does nothing when a reconnect is already in progress. Otherwise the
# in-progress flag is raised, the `@before_reconnect` callback is run,
# state is reset and periodic reconnect attempts are started.
#
# @param force [Boolean] accepted for interface compatibility; this
#   method does not read it.
def reconnect(force = false)
  return if @reconnecting

  @reconnecting = true
  @before_reconnect.call
  reset
  periodically_reconnect
end

#stats(funnel, options = {}) {|info| ... } ⇒ Object

RabbitMQ queue stats, including message and consumer counts.

Yields:

  • (info)

    passes queue stats to the callback/block.

Yield Parameters:

  • info (Hash)

    contains queue stats.



133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/sensu/transport/rabbitmq.rb', line 133

# RabbitMQ queue stats, including message and consumer counts.
#
# The queue is looked up with `:auto_delete => true` merged over the
# supplied options (the caller's hash is not modified), and its status
# is requested. A block is required: it is yielded to unconditionally
# once the status arrives.
#
# @param funnel [Object] queue name handed to `@channel.queue`.
# @param options [Hash] extra queue options.
# @yield [info] passes queue stats to the callback/block.
# @yieldparam info [Hash] contains `:messages` and `:consumers`.
def stats(funnel, options = {})
  catch_errors do
    queue_options = options.merge(:auto_delete => true)
    @channel.queue(funnel, queue_options).status do |messages, consumers|
      yield({ :messages => messages, :consumers => consumers })
    end
  end
end

#subscribe(type, pipe, funnel = "", options = {}) {|info, message| ... } ⇒ Object

Subscribe to a RabbitMQ queue.

Yields:

  • (info, message)

    passes message info and content to the consumer callback/block.

Yield Parameters:

  • info (Hash)

    contains message information.

  • message (String)

    message.



79
80
81
82
83
84
85
86
87
88
89
# File 'lib/sensu/transport/rabbitmq.rb', line 79

# Subscribe to a RabbitMQ queue.
#
# The queue for `funnel` is declared once (with `:auto_delete => true`)
# and cached in `@queues`. Every call binds the queue to the object
# returned by calling the channel method named by `type` with `pipe`;
# the consumer callback is only attached the first time a given funnel
# is seen.
#
# @param type [#to_sym] name of the channel method called with `pipe`.
# @param pipe [Object] argument to that channel method.
# @param funnel [String] queue name; defaults to an empty string.
# @param options [Hash] handed to the queue's `subscribe`.
# @yield [info, message] passes message info and content to the
#   consumer callback/block.
def subscribe(type, pipe, funnel = "", options = {}, &callback)
  catch_errors do
    already_subscribed = @queues.key?(funnel)
    queue = (@queues[funnel] ||= @channel.queue!(funnel, :auto_delete => true))
    binding_target = @channel.method(type.to_sym).call(pipe)
    queue.bind(binding_target)
    queue.subscribe(options, &callback) unless already_subscribed
  end
end

#unsubscribe {|info| ... } ⇒ Object

Unsubscribe from all RabbitMQ queues.

Yields:

  • (info)

    passes info to an optional callback/block.

Yield Parameters:

  • info (Hash)

    contains unsubscribe information.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/sensu/transport/rabbitmq.rb', line 95

# Unsubscribe from all RabbitMQ queues.
#
# For each cached queue the unsubscribe is issued immediately when
# connected, or registered as a `before_recovery` hook on the queue
# otherwise. The queue cache is then emptied and, if connected, the
# channel is asked to recover. Finally the inherited implementation is
# invoked through a bare `super`, which forwards any block given.
#
# @yield [info] optional callback/block, handled by the inherited
#   implementation.
def unsubscribe
  catch_errors do
    @queues.values.each do |queue|
      cancel = proc { queue.unsubscribe }
      connected? ? cancel.call : queue.before_recovery(&cancel)
    end
    @queues = {}
    @channel.recover if connected?
  end
  super
end