Module: Tail

Included in:
LogstashCli::Command
Defined in:
lib/logstash-cli/command/tail.rb

Instance Method Summary collapse

Instance Method Details

#_tail(options) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/logstash-cli/command/tail.rb', line 5

def _tail(options)
  amqp_url = options[:url]

  amqp_user = options[:user]
  amqp_password = options[:password]
  amqp_vhost = options[:vhost]
  amqp_port = options[:port]
  amqp_host = options[:host]
  amqp_ssl = options[:ssl]

  exchange_name = options[:exchange]
  exchange_type = options[:exchange_type]
  persistent = options[:persistent]
  durable = options[:durable]
  auto_delete = options[:autodelete]
  routing_key = options[:key]
  metafields = options[:meta].split(',')

  begin
    #connection = AMQP.connect(AMQP_OPTS.merge(:username => "amqp_gem", :password => "amqp_gem_password", :vhost => "amqp_gem_testbed"))
    settings= { :host =>  amqp_host, :vhost => amqp_vhost, :port => amqp_port,
                :user => amqp_user, :password => amqp_password ,
                :ssl => amqp_ssl }

    # Amqp url can override settings
    unless amqp_url.nil?
      settings = amqp_url
    end

    AMQP.start(settings) do |connection, open_ok|
      trap("INT") { puts "Shutting down..."; connection.close { EM.stop }; exit }

      channel = AMQP::Channel.new(connection, :auto_recovery => true)

      channel.queue("", :auto_delete => auto_delete, :persistent => persistent , :durable => durable)   do |queue, declare_ok|
        queue.bind(exchange_name, :routing_key => routing_key)
        queue.subscribe do |payload|
          parsed_message = JSON.parse(payload)
          result = Array.new

          metafields.each do |metafield|
            result << parsed_message["@#{metafield}"]
          end

          puts _format(result, options)
          result = []
        end
      end
    end
  rescue AMQP::PossibleAuthenticationFailureError => ex
    puts "Possible Authentication error:\nthe AMQP connection URL used is #{amqp_url}\n\nDetail Info:\n#{ex}"
    exit -1
  rescue StandardError => ex
    puts "Error occurred: #{ex}"
    exit -1
  end
end