Class: LogStash::Inputs::Udp
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::Udp
- Defined in:
- lib/logstash/inputs/udp.rb
Overview
Read messages as events over the network via UDP. The only required configuration item is `port`, which specifies the UDP port Logstash will listen on for event streams.
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(params) ⇒ Udp
constructor
A new instance of Udp.
- #register ⇒ Object
-
#run(output_queue) ⇒ Object
Starts the worker threads and runs the UDP listener, restarting it after errors until the input is stopped.
- #stop ⇒ Object
Constructor Details
#initialize(params) ⇒ Udp
Returns a new instance of Udp.
44 45 46 47 |
# File 'lib/logstash/inputs/udp.rb', line 44
#
# Builds a new Udp input.
#
# @param params [Object] passed through unchanged to the parent initializer
#   (presumably the plugin configuration hash -- confirm against Base)
def initialize(params)
  super(params)
  # Class-level setting on BasicSocket (not limited to this instance):
  # disables reverse name lookups on socket addresses.
  BasicSocket.do_not_reverse_lookup = true
end
Instance Method Details
#close ⇒ Object
81 82 83 84 85 |
# File 'lib/logstash/inputs/udp.rb', line 81
#
# Closes the UDP socket when one exists and is still open. A StandardError
# raised while closing is handed to +ignore_close_and_log+ instead of being
# propagated to the caller.
def close
  return if !@udp || @udp.closed?

  begin
    @udp.close
  rescue => e
    ignore_close_and_log(e)
  end
end
#register ⇒ Object
49 50 51 52 |
# File 'lib/logstash/inputs/udp.rb', line 49
#
# Initialises instance state: clears the socket reference and stores the
# +:errors+ metric namespace in +@metric_errors+.
def register
  @udp = nil
  @metric_errors = metric.namespace(:errors)
end
#run(output_queue) ⇒ Object
Starts the worker threads and runs the UDP listener, restarting it after errors until the input is stopped.
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/logstash/inputs/udp.rb', line 54
#
# Starts the worker threads, then runs the UDP listener on the calling
# thread. If the listener raises a StandardError it is logged, counted and
# restarted after a pause, unless the input has been stopped. On the way
# out, every worker is signalled to finish and joined.
#
# @param output_queue [Object] stored in +@output_queue+ and passed to
#   +udp_listener+ (presumably the pipeline queue -- confirm with the caller)
def run(output_queue)
  @output_queue = output_queue
  @input_to_worker = SizedQueue.new(@queue_size)
  metric.gauge(:queue_size, @queue_size)
  metric.gauge(:workers, @workers)

  @input_workers = (1..@workers).map do |worker_id|
    @logger.debug("Starting UDP worker thread", :worker => worker_id)
    # The id and a clone of the codec are passed as thread arguments, so the
    # thread body works on its own values rather than on shared locals.
    Thread.new(worker_id, @codec.clone) { |id, codec| inputworker(id, codec) }
  end

  begin
    udp_listener(output_queue)
  rescue => e
    @logger.error("UDP listener died", :exception => e, :backtrace => e.backtrace)
    @metric_errors.increment(:listener)
    # Pause before restarting; the block is given so a stop request can
    # interrupt the wait (5 is presumably seconds -- confirm against Stud).
    Stud.stoppable_sleep(5) { stop? }
    retry unless stop?
  ensure
    # Push one :END marker per worker, then wait for all of them to exit.
    @input_workers.size.times { @input_to_worker.push([:END, nil]) }
    @input_workers.each(&:join)
  end
end
#stop ⇒ Object
87 88 89 90 91 |
# File 'lib/logstash/inputs/udp.rb', line 87
#
# Stop hook: closes the UDP socket if it exists and has not been closed yet.
# A StandardError raised by the close is routed to +ignore_close_and_log+
# rather than raised to the caller.
def stop
  return if !@udp || @udp.closed?

  begin
    @udp.close
  rescue => e
    ignore_close_and_log(e)
  end
end