Class: StatsD::Instrument::BatchedUDPSink::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/statsd/instrument/batched_udp_sink.rb

Constant Summary collapse

BUFFER_CLASS =
if !::Object.const_defined?(:RUBY_ENGINE) || RUBY_ENGINE == "ruby"
  ::Array
else
  begin
    gem("concurrent-ruby")
  rescue Gem::MissingSpecError
    raise Gem::MissingSpecError, "statsd-instrument depends on `concurrent-ruby` on #{RUBY_ENGINE}"
  end
  require "concurrent/array"
  Concurrent::Array
end

Instance Method Summary collapse

Constructor Details

#initialize(host, port, flush_interval) ⇒ Dispatcher

Returns a new instance of Dispatcher.



53
54
55
56
57
58
59
60
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 53

def initialize(host, port, flush_interval)
  @host = host
  @port = port
  @interrupted = false
  @flush_interval = flush_interval
  @buffer = BUFFER_CLASS.new
  @dispatcher_thread = Thread.new { dispatch }
end

Instance Method Details

#<<(datagram) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 62

def <<(datagram)
  unless @dispatcher_thread&.alive?
    # If the dispatcher thread is dead, we assume it is because
    # the process was forked. So to avoid ending datagrams twice
    # we clear the buffer.
    @buffer.clear
    @dispatcher_thread = Thread.new { dispatch }
  end
  @buffer << datagram
  self
end

#shutdownObject



74
75
76
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 74

def shutdown
  @interrupted = true
end