Class: LogStash::Inputs::Azurewadeventhub

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/azurewadeventhub.rb

Overview

Reads events from Azure event-hub for Windows Azure Diagnostics

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Azurewadeventhub

Returns a new instance of Azurewadeventhub.



34
35
36
# File 'lib/logstash/inputs/azurewadeventhub.rb', line 34

def initialize(*args)
  super(*args)
end

Instance Method Details

#get_pay_load(message, partition) ⇒ Object

def register



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/logstash/inputs/azurewadeventhub.rb', line 42

def get_pay_load(message, partition)
  return nil if not message
  message.getPayload().each do |section|
    if section.java_kind_of? org::apache::qpid::amqp_1_0::type::messaging::Data
      data = ""
      begin
        event = LogStash::Event.new()
        section.getValue().getArray().each do |byte|
          data = data + byte.chr
        end
        json = JSON.parse(data)
        # Check if the records field is there. All messages written by WAD should have

        # "records" as the root element

        if !json["records"].nil?
          recordArray = json["records"]
          recordArray.each do |record|
            record.each do |name, value|
              event[name] = value 
            end
          end
        end
        return event
      rescue => e
        if data != ""
          @logger.error("  " + partition.to_s.rjust(2,"0") + " --- " + "Error: Unable to JSON parse '" + data + "'.", :exception => e)
        else
          @logger.error("  " + partition.to_s.rjust(2,"0") + " --- " + "Error: Unable to get the message body for message", :exception => e)
        end 
      end
    end
  end
  return nil
end

#process(output_queue, receiver, partition) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/logstash/inputs/azurewadeventhub.rb', line 76

def process(output_queue, receiver, partition)
  while true
    begin
      msg = receiver.receive(10)
      if msg
        event = get_pay_load(msg, partition)
        if event
          output_queue << event
        end
        receiver.acknowledge(msg)
      else
        @logger.debug("  " + partition.to_s.rjust(2,"0") + " --- " + "No message")
        sleep(@thread_wait_sec)
      end
    rescue LogStash::ShutdownSignal => e
      raise e
    rescue org::apache::qpid::amqp_1_0::client::ConnectionErrorException => e
      raise e
    rescue => e
      @logger.error("  " + partition.to_s.rjust(2,"0") + " --- " + "Oh My, An error occurred.", :exception => e)
    end
  end # process

end

#process_partition(output_queue, partition) ⇒ Object

process



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/logstash/inputs/azurewadeventhub.rb', line 100

def process_partition(output_queue, partition)
  while true
    begin
      filter = SelectorFilter.new "amqp.annotation.x-opt-enqueuedtimeutc > '" + @time_since_epoch_millis.to_s + "'"
      filters = { org::apache::qpid::amqp_1_0::type::Symbol.valueOf("apache.org:selector-filter:string") => filter }
      host = @namespace + "." + @domain
      connection = org::apache::qpid::amqp_1_0::client::Connection.new(host, @port, @username, @key, host, true)
      connection.getEndpoint().getDescribedTypeRegistry().register(filter.java_class, WriterFactory.new)
      receiveSession = connection.createSession()
      receiver = receiveSession.createReceiver(@eventhub + "/ConsumerGroups/" + @consumer_group + "/Partitions/" + partition.to_s, org::apache::qpid::amqp_1_0::client::AcknowledgeMode::ALO, "eventhubs-receiver-link-" + partition.to_s, false, filters, nil)
      receiver.setCredit(org::apache::qpid::amqp_1_0::type::UnsignedInteger.valueOf(@receive_credits), true)
      process(output_queue,receiver,partition)
    rescue org::apache::qpid::amqp_1_0::client::ConnectionErrorException => e
      @logger.debug("  " + partition.to_s.rjust(2,"0") + " --- " + "resetting connection")
      @time_since_epoch_millis = Time.now.to_i * 1000
    end
  end
rescue LogStash::ShutdownSignal => e
  receiver.close()
  raise e
rescue => e
  @logger.error("  " + partition.to_s.rjust(2,"0") + " --- Oh My, An error occurred.", :exception => e)
end

#registerObject



39
40
# File 'lib/logstash/inputs/azurewadeventhub.rb', line 39

def register
end

#run(output_queue) ⇒ Object



125
126
127
128
129
130
131
# File 'lib/logstash/inputs/azurewadeventhub.rb', line 125

def run(output_queue)
  threads = []
  (0..(@partitions-1)).each do |p_id|
    threads << Thread.new { process_partition(output_queue, p_id) }
  end
  threads.each { |thr| thr.join }
end

#teardownObject



134
135
# File 'lib/logstash/inputs/azurewadeventhub.rb', line 134

def teardown
end