Class: LogStash::Inputs::Udp

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/udp.rb

Overview

Read messages as events over the network via UDP. The only required configuration item is `port`, which specifies the UDP port Logstash will listen on for event streams.

Instance Method Summary collapse

Constructor Details

#initialize(params) ⇒ Udp

Returns a new instance of Udp.



44
45
46
47
# File 'lib/logstash/inputs/udp.rb', line 44

# Builds the input by delegating to the parent constructor, then turns off
# reverse lookups on sockets.
#
# @param params [Object] constructor arguments, forwarded unchanged to the superclass
def initialize(params)
  super(params)
  # This is a class-level flag on BasicSocket, not a per-instance setting.
  BasicSocket.do_not_reverse_lookup = true
end

Instance Method Details

#close ⇒ Object



81
82
83
84
85
# File 'lib/logstash/inputs/udp.rb', line 81

# Closes the UDP socket held in @udp, if there is one and it is still open.
#
# Any StandardError raised by the socket's own close call is passed to
# ignore_close_and_log instead of propagating to the caller.
#
# @return [Object, nil] nil when there was nothing to close
def close
  return unless @udp && !@udp.closed?

  begin
    @udp.close
  rescue StandardError => error
    ignore_close_and_log(error)
  end
end

#register ⇒ Object



49
50
51
52
# File 'lib/logstash/inputs/udp.rb', line 49

# Resets the socket reference and caches the :errors metric namespace.
#
# @return [Object] whatever metric.namespace(:errors) returns
def register
  # No socket is held after registration; @udp starts out as nil.
  @udp = nil
  errors_namespace = metric.namespace(:errors)
  @metric_errors = errors_namespace
end

#run(output_queue) ⇒ Object

Starts the worker threads, then runs the UDP listener, restarting it after errors until the input is stopped.



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/logstash/inputs/udp.rb', line 54

# Starts the worker threads, then runs the UDP listener on the calling thread.
#
# If the listener raises a StandardError, the error is logged and counted,
# and the listener is restarted after a pause unless stop? is true. When the
# listener finally exits (normally or not), every worker is sent an
# [:END, nil] marker and joined.
#
# @param output_queue [Object] stored in @output_queue and passed to udp_listener
def run(output_queue)
  @output_queue = output_queue

  # Bounded queue; presumably the hand-off from the listener to the workers.
  @input_to_worker = SizedQueue.new(@queue_size)
  metric.gauge(:queue_size, @queue_size)
  metric.gauge(:workers, @workers)

  # Each worker thread gets its 1-based number and its own clone of the codec.
  @input_workers = (1..@workers).map do |worker_number|
    @logger.debug("Starting UDP worker thread", :worker => worker_number)
    Thread.new(worker_number, @codec.clone) do |number, worker_codec|
      inputworker(number, worker_codec)
    end
  end

  begin
    udp_listener(output_queue)
  rescue => error
    @logger.error("UDP listener died", :exception => error, :backtrace => error.backtrace)
    @metric_errors.increment(:listener)
    # Pause before restarting; presumably the block lets the sleep end early on stop.
    Stud.stoppable_sleep(5) { stop? }
    retry unless stop?
  ensure
    # One end marker per worker, then wait for all of them to finish.
    @input_workers.each { @input_to_worker.push([:END, nil]) }
    @input_workers.each(&:join)
  end
end

#stop ⇒ Object



87
88
89
90
91
# File 'lib/logstash/inputs/udp.rb', line 87

# Stops the input by closing the UDP socket in @udp when one exists and is
# still open.
#
# A StandardError raised while closing is passed to ignore_close_and_log
# rather than raised to the caller.
#
# @return [Object, nil] nil when there was no open socket
def stop
  return unless @udp && !@udp.closed?

  begin
    @udp.close
  rescue StandardError => error
    ignore_close_and_log(error)
  end
end