Class: LogStash::Inputs::Azuretopicthreadable

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/azuretopicthreadable.rb

Overview

Reads events from Azure topics

Defined Under Namespace

Classes: Interrupted

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Azuretopicthreadable

Returns a new instance of Azuretopicthreadable.



25
26
27
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 25

# Builds the input by handing every constructor argument, unchanged, to the
# superclass constructor. No additional state is set up here.
def initialize(*args)
  super
end

Instance Method Details

#process(output_queue, pid) ⇒ Object

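Worker loop: receives messages from the configured topic subscription, decodes each one into events, pushes the events onto the output queue, and deletes the message once it has been read.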



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 40

# Worker loop: polls the topic subscription until +stop?+ returns true.
#
# Each pass asks the service for one message (peek-lock, 1 second timeout).
# A received message is decoded with the configured codec, every resulting
# event is decorated and pushed onto +output_queue+, and the message is then
# deleted. When nothing is received the worker sleeps for
# +@thread_sleep_time+.
#
# Any error other than LogStash::ShutdownSignal is logged; if the message
# received in *this* pass has a delivery count above +@deliverycount+ it is
# deleted so that it stops being redelivered, and the worker sleeps before
# polling again.
#
# @param output_queue [#<<] destination for decoded events
# @param pid [Object] worker identifier, only used in the error log line
# @raise [LogStash::ShutdownSignal] re-raised untouched
def process(output_queue, pid)
  # Get a new instance of a service
  if @access_key_name
    # SAS key used
    signer = Azure::ServiceBus::Auth::SharedAccessSigner.new
    sb_host = "https://#{Azure.sb_namespace}.servicebus.windows.net"
    azure_service_bus = Azure::ServiceBus::ServiceBusService.new(sb_host, { signer: signer })
  else
    # ACS key
    azure_service_bus = Azure::ServiceBus::ServiceBusService.new
  end
  until stop?
    # `message` is a method-level local, so it must be cleared on every pass.
    # Otherwise a failure inside the receive call would leave the previous
    # (already processed and deleted) message visible to the rescue branch
    # below, which could then try to delete it a second time.
    message = nil
    begin
      # check if we have a message in the subscription
      message = azure_service_bus.receive_subscription_message(@topic, @subscription, { :peek_lock => true, :timeout => 1 })
      if message
        # the codec yields each decoded event to the block
        codec.decode(message.body) do |event|
          decorate(event)
          output_queue << event
        end
        # delete the message after reading it
        azure_service_bus.delete_subscription_message(message)
      else
        # topic is probably empty. sleep.
        Stud.stoppable_sleep(@thread_sleep_time) { stop? }
      end
    rescue LogStash::ShutdownSignal => e
      raise e
    rescue => e
      @logger.error("Oh My, An error occurred. Thread id:" + pid.to_s, :exception => e)
      # Drop a message that keeps failing once it has been delivered more
      # often than the configured limit.
      if message && message.delivery_count > @deliverycount
        azure_service_bus.delete_subscription_message(message)
      end
      Stud.stoppable_sleep(@thread_sleep_time) { stop? }
    end
  end
end

#register ⇒ Object



30
31
32
33
34
35
36
37
38
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 30

# Pushes this input's credentials into the Azure configuration.
#
# The same @access_key value is written to both the plain access-key setting
# and the SAS-key setting; @access_key_name is written as the SAS key name.
#
# @return whatever Azure.configure returns
def register
  Azure.configure do |settings|
    settings.sb_namespace    = @namespace
    settings.sb_access_key   = @access_key
    settings.sb_sas_key_name = @access_key_name
    settings.sb_sas_key      = @access_key
  end
end

#run(output_queue) ⇒ Object



79
80
81
82
83
84
85
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 79

# Starts @threads worker threads, numbered from 0, each running
# #process against the shared output queue, then blocks until every worker
# has finished.
#
# @param output_queue [Object] queue handed to every worker
# @return [Array<Thread>] the joined worker threads
def run(output_queue)
  workers = 0.upto(@threads - 1).map do |worker_id|
    Thread.new { process(output_queue, worker_id) }
  end
  workers.each(&:join)
end

#teardown ⇒ Object



88
89
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 88

# Intentionally empty: this method performs no work and returns nil.
# NOTE(review): presumably a shutdown hook invoked by the plugin host; confirm against the base class.
def teardown
end