Class: Sensu::RabbitMQ

Inherits:
Object
  • Object
show all
Defined in:
lib/sensu/rabbitmq.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ RabbitMQ

Returns a new instance of RabbitMQ.



31
32
33
34
35
# File 'lib/sensu/rabbitmq.rb', line 31

# Builds an unconnected client. Each of the three callbacks (@on_error,
# @before_reconnect, @after_reconnect) starts out as its own do-nothing
# Proc, so they can be invoked before any handler has been registered.
def initialize
  @on_error, @before_reconnect, @after_reconnect = Array.new(3) { Proc.new {} }
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



29
30
31
# File 'lib/sensu/rabbitmq.rb', line 29

# Read-only accessor for @channel, which #connect assigns.
# Returns nil if the instance variable has not been set yet.
def channel
  instance_variable_get(:@channel)
end

Class Method Details

.connect(options = {}) ⇒ Object



99
100
101
102
103
104
# File 'lib/sensu/rabbitmq.rb', line 99

# Convenience constructor: instantiates the client, calls #connect on it
# with the given options (nil is treated as an empty Hash) and returns
# the instance itself rather than the result of #connect.
def self.connect(options={})
  new.tap { |client| client.connect(options || {}) }
end

Instance Method Details

#after_reconnect(&block) ⇒ Object



45
46
47
# File 'lib/sensu/rabbitmq.rb', line 45

# Registers the callback that the channel recovery handler set up in
# #connect invokes.
#
# Calling this without a block resets the callback to a do-nothing Proc
# (the same default the constructor installs) instead of storing nil,
# which would raise NoMethodError when the callback is next invoked.
#
# @yield the code to run after the channel has recovered
# @return [Proc] the callback now in effect
def after_reconnect(&block)
  @after_reconnect = block || Proc.new {}
end

#before_reconnect(&block) ⇒ Object



41
42
43
# File 'lib/sensu/rabbitmq.rb', line 41

# Registers the callback that the reconnect handler set up in #connect
# invokes just before it starts periodic reconnection attempts.
#
# Calling this without a block resets the callback to a do-nothing Proc
# (the same default the constructor installs) instead of storing nil,
# which would raise NoMethodError when the callback is next invoked.
#
# @yield the code to run before reconnection attempts begin
# @return [Proc] the callback now in effect
def before_reconnect(&block)
  @before_reconnect = block || Proc.new {}
end

#close ⇒ Object



95
96
97
# File 'lib/sensu/rabbitmq.rb', line 95

# Closes the underlying connection.
#
# @connection is only assigned by #connect, so on an instance that has
# never connected this is a no-op rather than a NoMethodError on nil.
#
# @return [Object, nil] whatever the connection's #close returns, or nil
#   when there is no connection to close
def close
  @connection.close if @connection
end

#connect(options = {}) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/sensu/rabbitmq.rb', line 49

# Opens the AMQP connection and channel, and wires up error, reconnect
# and recovery handling.
#
# A 20 second timer reports 'timed out while attempting to connect'
# through the @on_error callback. It is cancelled when the connection
# opens, and also when a connection failure has already been reported,
# so one failed attempt produces one error rather than two.
#
# @param options [Hash, Object] passed through to AMQP.connect. When it
#   is a Hash, its :prefetch entry (default 1) sets the channel prefetch.
# @return [Object] the result of the final @channel.prefetch call
def connect(options={})
  timeout = EM::Timer.new(20) do
    error = RabbitMQError.new('timed out while attempting to connect')
    @on_error.call(error)
  end
  on_failure = Proc.new do
    # The failure is reported right here; stop the timer so the same
    # failed attempt is not reported again later as a timeout.
    timeout.cancel
    error = RabbitMQError.new('failed to connect')
    @on_error.call(error)
  end
  @connection = AMQP.connect(options, {
    :on_tcp_connection_failure => on_failure,
    :on_possible_authentication_failure => on_failure
  })
  @connection.logger = Logger.get
  @connection.on_open do
    timeout.cancel
  end
  # Shared by both loss-detection hooks below; the reconnecting? guard
  # keeps the callback and the reconnect loop from being started twice.
  reconnect = Proc.new do
    unless @connection.reconnecting?
      @before_reconnect.call
      @connection.periodically_reconnect(5)
    end
  end
  @connection.on_tcp_connection_loss(&reconnect)
  @connection.on_skipped_heartbeats(&reconnect)
  @channel = AMQP::Channel.new(@connection)
  @channel.auto_recovery = true
  @channel.on_error do |_channel, _channel_close|
    error = RabbitMQError.new('rabbitmq channel closed')
    @on_error.call(error)
  end
  # options is not necessarily a Hash, so only look up :prefetch when it is.
  prefetch = 1
  if options.is_a?(Hash)
    prefetch = options[:prefetch] || 1
  end
  @channel.on_recovery do
    @after_reconnect.call
    # Re-apply the prefetch setting on the recovered channel.
    @channel.prefetch(prefetch)
  end
  @channel.prefetch(prefetch)
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


91
92
93
# File 'lib/sensu/rabbitmq.rb', line 91

# Reports whether the underlying connection is currently connected.
#
# @connection is only assigned by #connect, so an instance that has
# never connected answers false instead of raising NoMethodError on nil.
#
# @return [Boolean] the connection's own connected? answer, or false
#   when there is no connection yet
def connected?
  @connection ? @connection.connected? : false
end

#on_error(&block) ⇒ Object



37
38
39
# File 'lib/sensu/rabbitmq.rb', line 37

# Registers the callback that receives a RabbitMQError when #connect
# times out, fails to connect, or the channel reports an error.
#
# Calling this without a block resets the callback to a do-nothing Proc
# (the same default the constructor installs) instead of storing nil,
# which would raise NoMethodError when an error is next reported.
#
# @yield [error] the error being reported
# @return [Proc] the callback now in effect
def on_error(&block)
  @on_error = block || Proc.new {}
end