Class: LogStash::Inputs::Azuretopicthreadable
- Inherits: Base (Object < Base < LogStash::Inputs::Azuretopicthreadable)
- Defined in: lib/logstash/inputs/azuretopicthreadable.rb
Overview
Reads events from Azure Service Bus topics.
Defined Under Namespace
Classes: Interrupted
Instance Method Summary
- #initialize(*args) ⇒ Azuretopicthreadable (constructor): A new instance of Azuretopicthreadable.
- #process(output_queue, pid) ⇒ Object: Worker loop that receives messages from the topic subscription and pushes the decoded events onto output_queue.
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #teardown ⇒ Object
Constructor Details
#initialize(*args) ⇒ Azuretopicthreadable
Returns a new instance of Azuretopicthreadable.
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 25

    def initialize(*args)
      super(*args)
    end
Instance Method Details
#process(output_queue, pid) ⇒ Object
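Worker loop run by each thread. It receives messages from the topic subscription in peek-lock mode, decodes each message body into events, pushes the events onto output_queue, and deletes the message once it has been read. If an error occurs and the message has been delivered more than @deliverycount times, the message is deleted.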
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 40

    def process(output_queue, pid)
      # Get a new instance of a service
      if @access_key_name
        # SAS key used
        signer = Azure::ServiceBus::Auth::SharedAccessSigner.new
        sb_host = "https://#{Azure.sb_namespace}.servicebus.windows.net"
        azure_service_bus = Azure::ServiceBus::ServiceBusService.new(sb_host, { signer: signer })
      else
        # ACS key
        azure_service_bus = Azure::ServiceBus::ServiceBusService.new
      end
      while !stop?
        begin
          # check if we have a message in the subscription
          message = azure_service_bus.receive_subscription_message(@topic, @subscription, { :peek_lock => true, :timeout => 1 })
          if message
            # decoding returns a yield
            codec.decode(message.body) do |event|
              decorate(event)
              output_queue << event
            end
            # delete the message after reading it
            azure_service_bus.delete_subscription_message(message)
          else
            # topic is probably empty. sleep.
            Stud.stoppable_sleep(@thread_sleep_time) { stop? }
          end
        rescue LogStash::ShutdownSignal => e
          raise e
        rescue => e
          @logger.error("Oh My, An error occurred. Thread id:" + pid.to_s, :exception => e)
          if message and message.delivery_count > @deliverycount
            azure_service_bus.delete_subscription_message(message)
          end
          Stud.stoppable_sleep(@thread_sleep_time) { stop? }
        end
      end
    end
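The receive, decode, delete pattern in #process can be tried out without an Azure account. The following is a minimal sketch, not the plugin's code: FakeMessage, FakeBus, and drain are hypothetical names invented for illustration, the "decoding" is just an upcase, and the loop exits on an empty subscription where the plugin would sleep and retry.

```ruby
# Hypothetical message type: only the two fields the loop reads.
FakeMessage = Struct.new(:body, :delivery_count)

# Hypothetical in-memory stand-in for a Service Bus subscription client.
class FakeBus
  attr_reader :deleted

  def initialize(messages)
    @pending = messages.dup
    @deleted = []
  end

  # Returns the next pending message, or nil when the subscription is empty.
  def receive
    @pending.shift
  end

  # Records that the consumer asked for the message to be removed for good.
  def delete(message)
    @deleted << message
  end
end

# Simplified version of the worker loop: decode, forward, then delete.
def drain(bus, output, max_deliveries)
  loop do
    message = bus.receive
    break if message.nil? # assumption: stop here; the plugin sleeps and polls again

    begin
      raise ArgumentError, "cannot decode" if message.body.nil?
      output << message.body.upcase
      bus.delete(message)
    rescue StandardError
      # Drop a failing message only once it has been delivered too many times.
      bus.delete(message) if message.delivery_count > max_deliveries
    end
  end
end

bus = FakeBus.new([
  FakeMessage.new("a", 1),
  FakeMessage.new(nil, 11), # fails and exceeds the limit: deleted
  FakeMessage.new(nil, 2),  # fails but is under the limit: left for redelivery
  FakeMessage.new("b", 1)
])
output = []
drain(bus, output, 10)
puts output.inspect
puts bus.deleted.size
```

Deleting only after a successful decode is what makes peek-lock useful here: a message that fails stays on the subscription and can be redelivered, until its delivery count passes the configured limit.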
#register ⇒ Object
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 30

    def register
      # Configure credentials
      Azure.configure do |config|
        config.sb_namespace = @namespace
        config.sb_access_key = @access_key
        config.sb_sas_key_name = @access_key_name
        config.sb_sas_key = @access_key
      end
    end
#run(output_queue) ⇒ Object
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 79

    def run(output_queue)
      threads = []
      (0..(@threads-1)).each do |pid|
        threads << Thread.new { process(output_queue, pid) }
      end
      threads.each { |thr| thr.join }
    end
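The fan-out in #run is a plain start-then-join pattern over a shared queue. As a rough illustration only, the sketch below reproduces that shape with a hypothetical fake_process that pushes a few strings and returns; run_workers and both of its extra parameters are made up for the example and are not part of the plugin.

```ruby
require "thread"

# Hypothetical stand-in for #process: pushes a few tagged items and returns.
def fake_process(output_queue, pid, items_per_worker)
  items_per_worker.times { |i| output_queue << "worker-#{pid}-item-#{i}" }
end

# Same shape as #run: start one thread per worker id, then wait for all of them.
def run_workers(output_queue, thread_count, items_per_worker)
  threads = []
  thread_count.times do |pid|
    threads << Thread.new { fake_process(output_queue, pid, items_per_worker) }
  end
  threads.each(&:join)
end

queue = Queue.new # thread-safe, so workers can push concurrently
run_workers(queue, 3, 2)

collected = []
collected << queue.pop until queue.empty?
puts collected.size
```

Because the real #process loops until stop? is true, the join calls in #run block for the lifetime of the input; in this sketch the workers finish on their own, so the joins return quickly.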
#teardown ⇒ Object
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 88

    def teardown
    end