Class: LogStash::Inputs::Azuretopicthreadable
- Inherits: Base
  - Object
  - Base
  - LogStash::Inputs::Azuretopicthreadable
- Defined in:
- lib/logstash/inputs/azuretopicthreadable.rb
Overview
Reads events from Azure Service Bus topic subscriptions, using a configurable number of worker threads.
Defined Under Namespace
Classes: Interrupted
Instance Method Summary
- #initialize(*args) ⇒ Azuretopicthreadable (constructor)
  A new instance of Azuretopicthreadable.
- #process(output_queue, pid) ⇒ Object
  Polls the topic subscription for messages, decodes each one into events, and pushes them onto output_queue.
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #teardown ⇒ Object
Constructor Details
#initialize(*args) ⇒ Azuretopicthreadable
Returns a new instance of Azuretopicthreadable.
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 25
def initialize(*args)
  super(*args)
end
Instance Method Details
#process(output_queue, pid) ⇒ Object
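Runs the polling loop for one worker thread; pid is the thread index used in log messages. Until the input is stopped, each iteration checks the subscription for a message with a peek lock, decodes the message body into events that are decorated and pushed onto output_queue, deletes the message after reading it, and then sleeps for @thread_sleep_time.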
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 38
def process(output_queue, pid)
  # Get a new instance of a service
  azure_service_bus = Azure::ServiceBus::ServiceBusService.new
  while !stop?
    begin
      # check if we have a message in the subscription
      message = azure_service_bus.receive_subscription_message(@topic, @subscription, { :peek_lock => true, :timeout => 1 })
      if message
        # decoding returns a yield
        codec.decode(message.body) do |event|
          decorate(event)
          output_queue << event
        end # codec.decode
        # delete the message after reading it
        azure_service_bus.delete_subscription_message(message)
      end
    rescue LogStash::ShutdownSignal => e
      raise e
    rescue => e
      @logger.error("Oh My, An error occurred. Thread id:" + pid.to_s, :exception => e)
      if message and message.delivery_count > @deliverycount
        azure_service_bus.delete_subscription_message(message)
      end
    end
    sleep(@thread_sleep_time)
  end
end
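The receive, process, and settle pattern in #process can be tried without an Azure account. The following is a minimal sketch, not the plugin's code: FakeMessage, FakeSubscription, and drain are hypothetical stand-ins written for illustration, and upcasing the body stands in for codec.decode and decorate. It assumes that a message left unsettled after a failure is simply delivered again, and that it is dropped once its delivery count exceeds a threshold.

```ruby
# Hypothetical in-memory stand-in for a Service Bus subscription (not the Azure SDK).
FakeMessage = Struct.new(:body, :delivery_count)

class FakeSubscription
  attr_reader :deleted

  def initialize(messages)
    @pending = messages.dup
    @deleted = []
  end

  # Peek-lock style receive: return the next message without removing it
  # and count the delivery attempt. Returns nil when nothing is pending.
  def receive
    message = @pending.first
    message.delivery_count += 1 if message
    message
  end

  # Settle a message so that it is never delivered again.
  def delete(message)
    @pending.reject! { |m| m.equal?(message) }
    @deleted << message
  end

  def empty?
    @pending.empty?
  end
end

# Drain the subscription: delete a message once it has been processed, and on
# failure delete it only after it was delivered more than max_deliveries times.
def drain(subscription, output_queue, max_deliveries)
  until subscription.empty?
    message = nil
    begin
      message = subscription.receive
      if message
        raise ArgumentError, "cannot decode" if message.body.start_with?("bad")
        output_queue << message.body.upcase # stands in for codec.decode + decorate
        subscription.delete(message)
      end
    rescue StandardError
      if message && message.delivery_count > max_deliveries
        subscription.delete(message)
      end
    end
  end
end

queue = Queue.new
bad = FakeMessage.new("bad payload", 0)
subscription = FakeSubscription.new([FakeMessage.new("a", 0), bad, FakeMessage.new("b", 0)])
drain(subscription, queue, 2)
```

In this sketch the undecodable message is retried until its third delivery and then removed, so it cannot block the messages queued behind it.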
#register ⇒ Object
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 30
def register
  # Configure credentials
  Azure.configure do |config|
    config.sb_namespace = @namespace
    config.sb_access_key = @access_key
  end
end
#run(output_queue) ⇒ Object
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 67
def run(output_queue)
  threads = []
  (0..(@threads-1)).each do |pid|
    threads << Thread.new { process(output_queue, pid) }
  end
  threads.each { |thr| thr.join }
end
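The fan-out in #run can be illustrated in isolation. This is a sketch under stated assumptions rather than the plugin's implementation: work and run_workers are hypothetical names, and work stands in for #process by pushing three tagged items instead of polling a subscription. Each thread receives its own index, and the caller blocks on join until every worker has finished.

```ruby
# Hypothetical worker standing in for #process: pushes three tagged items.
def work(output_queue, pid)
  3.times { |i| output_queue << "worker #{pid} item #{i}" }
end

# Start thread_count workers that share one queue, then wait for all of them.
def run_workers(output_queue, thread_count)
  threads = Array.new(thread_count) do |pid|
    Thread.new { work(output_queue, pid) }
  end
  threads.each(&:join) # block until every worker has finished
end

queue = Queue.new
run_workers(queue, 4)
puts queue.size # prints 12
```

Queue is thread-safe, so the workers can push concurrently without extra locking; the order of items across workers is not deterministic.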
#teardown ⇒ Object
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 76
def teardown
end