Class: LogStash::Inputs::Azuretopicthreadable

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/azuretopicthreadable.rb

Overview

Reads events from Azure topics

Defined Under Namespace

Classes: Interrupted

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Azuretopicthreadable

Returns a new instance of Azuretopicthreadable.



25
26
27
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 25

# Builds the input by handing every constructor argument to the parent
# class unchanged; this class adds no construction logic of its own.
#
# @param args [Array] arguments forwarded as-is to the superclass constructor
def initialize(*args)
  # Bare `super` re-sends the same argument list (and block) that we received.
  super
end

Instance Method Details

#process(output_queue, pid) ⇒ Object

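Worker loop run by each thread: polls the subscription for a message, decodes its body into events, decorates each event and pushes it onto the output queue, then deletes the message. Errors are logged with the worker id.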



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 38

# Worker loop: repeatedly polls the configured topic subscription until
# `stop?` becomes true, turning every received message into events.
#
# For each message received, the body is run through `codec.decode`; every
# yielded event is decorated and appended to +output_queue+, after which the
# message is deleted from the subscription. Any error other than
# LogStash::ShutdownSignal is logged and the loop carries on; a message that
# failed during processing is deleted only when its delivery_count exceeds
# @deliverycount. The loop sleeps @thread_sleep_time between passes.
#
# @param output_queue [#<<] receives each decoded and decorated event
# @param pid [#to_s] worker identifier, used only in the error log message
# @return [nil]
# @raise [LogStash::ShutdownSignal] re-raised untouched so shutdown propagates
def process(output_queue, pid)
  # Get a new instance of a service
  azure_service_bus = Azure::ServiceBus::ServiceBusService.new
  until stop?
    # Locals survive across loop iterations in Ruby, so clear the previous
    # pass's message first. Otherwise a failure inside the receive call would
    # make the rescue branch act on a stale, already-deleted message.
    message = nil
    begin
      # check if we have a message in the subscription
      message = azure_service_bus.receive_subscription_message(@topic, @subscription, { :peek_lock => true, :timeout => 1 })
      if message
        # decoding yields each event to the block
        codec.decode(message.body) do |event|
          decorate(event)
          output_queue << event
        end
        # delete the message after reading it
        azure_service_bus.delete_subscription_message(message)
      end
    rescue LogStash::ShutdownSignal
      # Shutdown must not be swallowed by the generic handler below.
      raise
    rescue => e
      @logger.error("Oh My, An error occurred. Thread id:#{pid}", :exception => e)
      # Drop a message that keeps failing once it has been delivered more
      # times than the configured threshold.
      if message && message.delivery_count > @deliverycount
        azure_service_bus.delete_subscription_message(message)
      end
    end
    sleep(@thread_sleep_time)
  end
end

#register ⇒ Object



30
31
32
33
34
35
36
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 30

# Applies this input's Service Bus credentials to the global Azure
# configuration: @namespace becomes sb_namespace and @access_key becomes
# sb_access_key.
#
# @return [Object] whatever Azure.configure returns
def register
  service_bus_namespace = @namespace
  service_bus_key = @access_key

  # Configure credentials
  Azure.configure do |settings|
    settings.sb_namespace = service_bus_namespace
    settings.sb_access_key = service_bus_key
  end
end

#run(output_queue) ⇒ Object



67
68
69
70
71
72
73
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 67

# Starts @threads worker threads, each executing #process with its own
# zero-based index, and then blocks until every one of them has finished.
#
# @param output_queue [Object] handed unchanged to each #process call
# @return [Array<Thread>] the worker threads that were started and joined
def run(output_queue)
  last_worker = @threads - 1
  workers = (0..last_worker).map do |worker_id|
    Thread.new { process(output_queue, worker_id) }
  end
  # Wait for all workers; returns the array of threads.
  workers.each(&:join)
end

#teardown ⇒ Object



76
77
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 76

# Shutdown hook. This input performs no cleanup here, so the method is a
# deliberate no-op.
#
# @return [nil]
def teardown
  nil
end