Class: LogStash::Inputs::Azurewadeventhub

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/azurewadeventhub.rb

Overview

Reads events from an Azure Event Hub for Windows Azure Diagnostics (WAD).

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Azurewadeventhub

Returns a new instance of Azurewadeventhub.



35
36
37
# File 'lib/logstash/inputs/azurewadeventhub.rb', line 35

# Builds the input by handing every constructor argument, unchanged, to the
# parent class' initializer. No additional state is set up here.
#
# @param args [Array] arguments forwarded verbatim to the superclass
def initialize(*args)
  # Bare `super` re-sends exactly the arguments this method received.
  super
end

Instance Method Details

#get_pay_load(message, partition) ⇒ Object

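Extracts the JSON body of a received message and builds a LogStash::Event from the entries under its "records" element. Returns nil when no event could be built.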



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/logstash/inputs/azurewadeventhub.rb', line 43

# Decodes a received message into a single LogStash::Event.
#
# Walks the payload sections of the message. For the first AMQP Data section
# that decodes cleanly, its bytes are parsed as JSON and every name/value pair
# of every entry under the root "records" element is copied onto one event.
# A section that fails to decode is logged and skipped.
#
# NOTE(review): all records of one message are merged into the same event, so
# a later record overwrites same-named fields of an earlier one - confirm that
# this is intended.
#
# @param message [Object, nil] received message; must respond to getPayload
# @param partition [Object] partition id, only used to prefix log lines
# @return [LogStash::Event, nil] the populated event (left empty when the JSON
#   has no "records" element), or nil when message is nil/false or when no
#   Data section could be decoded
def get_pay_load(message, partition)
  return nil unless message

  message.getPayload().each do |section|
    next unless section.java_kind_of? org::apache::qpid::amqp_1_0::type::messaging::Data

    data = ""
    begin
      event = LogStash::Event.new()
      # Java bytes are signed (-128..127), so Integer#chr raises RangeError on
      # every byte of a multi-byte UTF-8 character. Packing them as signed
      # 8-bit values handles the full range and builds the string in one pass
      # instead of one allocation per byte.
      data = section.getValue().getArray().to_a.pack("c*").force_encoding("UTF-8")
      json = JSON.parse(data)
      # All messages written by WAD should have "records" as the root element.
      records = json["records"]
      unless records.nil?
        records.each do |record|
          record.each do |name, value|
            event[name] = value
          end
        end
      end
      return event
    rescue => e
      # data is still empty when the failure happened before the body was read.
      if data != ""
        @logger.error("  " + partition.to_s.rjust(2, "0") + " --- " + "Error: Unable to JSON parse '" + data + "'.", :exception => e)
      else
        @logger.error("  " + partition.to_s.rjust(2, "0") + " --- " + "Error: Unable to get the message body for message", :exception => e)
      end
    end
  end
  return nil
end

#process(output_queue, receiver, partition) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/logstash/inputs/azurewadeventhub.rb', line 77

# Pumps messages from +receiver+ into +output_queue+ until stop? is true.
#
# Each received message is decoded with get_pay_load; a decoded event is
# pushed onto the queue, and the message is acknowledged whether or not an
# event came out of it. When receive yields nothing, a debug line is logged
# and the loop sleeps for @thread_wait_sec before polling again.
#
# @param output_queue [#<<] destination for decoded events
# @param receiver [Object] must respond to receive(timeout) and acknowledge(msg)
# @param partition [Object] partition id, used for decoding and log prefixes
# @return [nil]
# @raise [LogStash::ShutdownSignal] re-raised untouched
# @raise [org.apache.qpid.amqp_1_0.client.ConnectionErrorException] re-raised
#   untouched; any other StandardError is logged and the loop continues
def process(output_queue, receiver, partition)
  prefix = "  #{partition.to_s.rjust(2, "0")} --- "

  until stop?
    begin
      message = receiver.receive(10)

      unless message
        @logger.debug(prefix + "No message")
        sleep(@thread_wait_sec)
        next
      end

      decoded = get_pay_load(message, partition)
      output_queue << decoded if decoded
      receiver.acknowledge(message)
    rescue LogStash::ShutdownSignal, org::apache::qpid::amqp_1_0::client::ConnectionErrorException
      # Both are handled by the caller, so pass them up unchanged.
      raise
    rescue => error
      @logger.error(prefix + "Oh My, An error occurred.", :exception => error)
    end
  end
end

#process_partition(output_queue, partition) ⇒ Object

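Opens a receiver on a single Event Hub partition and processes its messages, reconnecting whenever the connection reports an error.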



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/logstash/inputs/azurewadeventhub.rb', line 101

# Receives messages from one partition until stop? is true.
#
# Each pass of the loop builds a selector filter that only accepts messages
# enqueued after @time_since_epoch_millis, opens a connection to
# "<@namespace>.<@domain>" on @port with @username / @key, creates a receiver
# on "<@eventhub>/ConsumerGroups/<@consumer_group>/Partitions/<partition>",
# grants it @receive_credits credits and hands it to process. When the
# connection reports an error, @time_since_epoch_millis is reset to the current
# time (in milliseconds) and the loop connects again.
#
# NOTE(review): the connection and receiver of a failed pass are not closed
# before reconnecting - confirm whether the client library releases them.
#
# @param output_queue [#<<] destination for decoded events
# @param partition [Object] id of the partition to read
# @return [Object, nil]
# @raise [LogStash::ShutdownSignal] re-raised after closing the receiver, if
#   one was created; any other StandardError is logged and ends the method
def process_partition(output_queue, partition)
  while !stop?
    begin
      filter = SelectorFilter.new "amqp.annotation.x-opt-enqueuedtimeutc > '" + @time_since_epoch_millis.to_s + "'"
      filters = { org::apache::qpid::amqp_1_0::type::Symbol.valueOf("apache.org:selector-filter:string") => filter }
      host = @namespace + "." + @domain
      connection = org::apache::qpid::amqp_1_0::client::Connection.new(host, @port, @username, @key, host, true)
      connection.getEndpoint().getDescribedTypeRegistry().register(filter.java_class, WriterFactory.new)
      receiveSession = connection.createSession()
      receiver = receiveSession.createReceiver(@eventhub + "/ConsumerGroups/" + @consumer_group + "/Partitions/" + partition.to_s, org::apache::qpid::amqp_1_0::client::AcknowledgeMode::ALO, "eventhubs-receiver-link-" + partition.to_s, false, filters, nil)
      receiver.setCredit(org::apache::qpid::amqp_1_0::type::UnsignedInteger.valueOf(@receive_credits), true)
      process(output_queue,receiver,partition)
    rescue org::apache::qpid::amqp_1_0::client::ConnectionErrorException => e
      @logger.debug("  " + partition.to_s.rjust(2,"0") + " --- " + "resetting connection")
      @time_since_epoch_millis = Time.now.utc.to_i * 1000
    end
  end
rescue LogStash::ShutdownSignal => e
  # receiver is still nil when the signal arrives before the link was created;
  # calling close on nil would raise NoMethodError and hide the shutdown signal.
  receiver.close() if receiver
  raise e
rescue => e
  @logger.error("  " + partition.to_s.rjust(2,"0") + " --- Oh My, An error occurred.", :exception => e)
end

#register ⇒ Object



40
41
# File 'lib/logstash/inputs/azurewadeventhub.rb', line 40

# No-op: the body is empty, so calling it has no effect and returns nil.
# NOTE(review): presumably a Logstash plugin lifecycle hook - confirm against
# the base input class.
def register; end

#run(output_queue) ⇒ Object



126
127
128
129
130
131
132
# File 'lib/logstash/inputs/azurewadeventhub.rb', line 126

# Starts one thread per partition id (0 up to @partitions - 1), each running
# process_partition against the shared queue, then blocks until every thread
# has finished.
#
# @param output_queue [#<<] queue handed to every partition worker
# @return [Array<Thread>] the worker threads, all already joined
def run(output_queue)
  workers = @partitions.times.map do |partition_id|
    Thread.new { process_partition(output_queue, partition_id) }
  end
  workers.each(&:join)
end

#teardown ⇒ Object



135
136
# File 'lib/logstash/inputs/azurewadeventhub.rb', line 135

# No-op: the body is empty, so calling it has no effect and returns nil.
# NOTE(review): presumably a Logstash plugin shutdown hook - confirm against
# the base input class.
def teardown; end