Module: LogStash::Outputs::RabbitMQ::MarchHareImpl

Included in:
LogStash::Outputs::RabbitMQ
Defined in:
lib/logstash/outputs/rabbitmq/march_hare.rb

Instance Method Summary collapse

Instance Method Details

#connect ⇒ Object

Implementation



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/logstash/outputs/rabbitmq/march_hare.rb', line 82

# Opens a connection and a channel to RabbitMQ.
#
# Reads @host, @port, @vhost, @user, @password, @ssl and @queue; writes
# @settings, @connection_url, @conn and @ch. Does nothing (returns nil) when
# the plugin is terminating.
#
# When MarchHare.connect or channel creation raises MarchHare::Exception the
# error is logged, @connected is set to false, and the attempt is repeated
# every 10 seconds until it succeeds or the plugin starts terminating.
def connect
  return if terminating?

  # "/" is RabbitMQ's default virtual host (the previous fallback was a host
  # address, which is not a meaningful vhost name).
  @vhost ||= "/"
  # 5672 is the plain AMQP port.
  # NOTE(review): the earlier comment claimed the client switches to 5671 when
  # TLS is enabled, but it referred to Bunny; confirm this for March Hare.
  @port  ||= 5672

  @settings = {
    :vhost => @vhost,
    :host  => @host,
    :port  => @port,
    :user  => @user,
    :automatic_recovery => false
  }
  @settings[:pass] = @password ? @password.value : "guest"
  @settings[:tls]  = @ssl if @ssl

  # amqps is the TLS scheme, amqp the plain one.
  proto = @ssl ? "amqps" : "amqp"
  @connection_url = "#{proto}://#{@user}@#{@host}:#{@port}#{@vhost}/#{@queue}"

  begin
    # Logged before the attempt so a hanging connect is visible in the log;
    # the password is masked so it never reaches the log output.
    loggable_settings = @settings.merge(:pass => "<redacted>")
    @logger.debug("Connecting to RabbitMQ. Settings: #{loggable_settings.inspect}, queue: #{@queue.inspect}")

    @conn = MarchHare.connect(@settings)
    @ch = @conn.create_channel
    @logger.info("Connected to RabbitMQ at #{@settings[:host]}")
  rescue MarchHare::Exception => e
    @connected.set(false)
    n = 10

    @logger.error("RabbitMQ connection error: #{e.message}. Will attempt to reconnect in #{n} seconds...",
                  :exception => e,
                  :backtrace => e.backtrace)
    # Stop retrying once shutdown has begun.
    return if terminating?

    sleep n
    retry
  end
end

#declare_exchange ⇒ Object



131
132
133
134
135
136
137
138
139
140
# File 'lib/logstash/outputs/rabbitmq/march_hare.rb', line 131

# Declares the configured exchange on the current channel (@ch), stores it in
# @x and returns it.
#
# Also marks the plugin as connected; publish_serialized relies on this when
# it re-declares the exchange after a reconnect.
def declare_exchange
  @logger.debug("Declaring an exchange",
                :name => @exchange, :type => @exchange_type, :durable => @durable)

  exchange_options = { :type => @exchange_type.to_sym, :durable => @durable }
  @x = @ch.exchange(@exchange, exchange_options)

  # Flip the flag back to true so publishing resumes after a recovery.
  @connected.set(true)

  @x
end

#publish_serialized(message) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/logstash/outputs/rabbitmq/march_hare.rb', line 38

# Publishes an already-encoded message to the exchange held in @x, using @key
# as routing key and @persistent as the persistence flag.
#
# While not connected the message is not sent and a warning is logged instead.
# On a connection-level failure the plugin marks itself disconnected, logs the
# error, waits 10 seconds, reconnects, re-declares the exchange and then tries
# the same message again; it gives up (returns nil) once terminating.
def publish_serialized(message)
  return @logger.warn("Tried to send a message, but not connected to RabbitMQ.") unless @connected.get

  @x.publish(message, :routing_key => @key, :properties => { :persistent => @persistent })
rescue MarchHare::Exception, IOError, com.rabbitmq.client.AlreadyClosedException => failure
  @connected.set(false)
  delay = 10

  @logger.error("RabbitMQ connection error: #{failure.message}. Will attempt to reconnect in #{delay} seconds...",
                :exception => failure,
                :backtrace => failure.backtrace)
  # No point reconnecting during shutdown.
  return if terminating?

  sleep delay

  connect
  declare_exchange
  retry
end

#receive(event) ⇒ Object



27
28
29
30
31
32
33
34
35
36
# File 'lib/logstash/outputs/rabbitmq/march_hare.rb', line 27

# Hands an event to the codec for encoding, provided output?(event) accepts it.
#
# A JSON::GeneratorError raised by the codec is logged as a warning together
# with the offending event instead of propagating.
def receive(event)
  if output?(event)
    begin
      @codec.encode(event)
    rescue JSON::GeneratorError => encoding_error
      @logger.warn("Trouble converting event to JSON",
                   :exception => encoding_error, :event => event)
    end
  end
end

#register ⇒ Object

API



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/logstash/outputs/rabbitmq/march_hare.rb', line 10

# Plugin start-up: loads the March Hare client, connects, declares the
# exchange and wires the codec so every encoded event is passed to
# publish_serialized.
def register
  # Loaded here rather than at file load time, so the libraries are only
  # pulled in when the plugin is actually registered.
  require "march_hare"
  require "java"

  @logger.info("Registering output", :plugin => self)

  # Connection-state flag read by publish_serialized.
  @connected = java.util.concurrent.atomic.AtomicBoolean.new

  connect
  declare_exchange
  @connected.set(true)

  on_encoded = method(:publish_serialized)
  @codec.on_event(&on_encoded)
end

#teardown ⇒ Object



68
69
70
71
72
73
74
# File 'lib/logstash/outputs/rabbitmq/march_hare.rb', line 68

# Shuts the output down: marks it disconnected, closes the connection if one
# exists and is still open, drops the reference to it, then calls finished.
def teardown
  @connected.set(false)

  connection_open = @conn && @conn.open?
  @conn.close if connection_open
  @conn = nil

  finished
end

#to_s ⇒ Object



64
65
66
# File 'lib/logstash/outputs/rabbitmq/march_hare.rb', line 64

# Describes this output as an AMQP-style URL made of user, host, port, vhost,
# exchange type and exchange name, with the routing key as the fragment.
def to_s
  authority   = "#{@user}@#{@host}:#{@port}"
  destination = "#{@vhost}/#{@exchange_type}/#{@exchange}"

  "amqp://#{authority}#{destination}\##{@key}"
end