Class: StatsD::Instrument::BatchedUDPSink::Dispatcher
- Inherits:
-
Object
- Object
- StatsD::Instrument::BatchedUDPSink::Dispatcher
- Defined in:
- lib/statsd/instrument/batched_udp_sink.rb
Instance Method Summary collapse
- #<<(datagram) ⇒ Object
- #flush(blocking:) ⇒ Object
-
#initialize(host, port, buffer_capacity, thread_priority, max_packet_size) ⇒ Dispatcher
constructor
A new instance of Dispatcher.
- #shutdown(wait = 2) ⇒ Object
Constructor Details
#initialize(host, port, buffer_capacity, thread_priority, max_packet_size) ⇒ Dispatcher
Returns a new instance of Dispatcher.
81 82 83 84 85 86 87 88 89 90 |
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 81

# Sets up the dispatcher state and starts the background dispatcher thread.
#
# @param host forwarded to UDPSink.new
# @param port forwarded to UDPSink.new
# @param buffer_capacity stored and passed to Buffer.new
# @param thread_priority stored in @thread_priority (not applied in this method)
# @param max_packet_size stored in @max_packet_size; #flush reads it when batching datagrams
def initialize(host, port, buffer_capacity, thread_priority, max_packet_size)
  @udp_sink = UDPSink.new(host, port)
  @interrupted = false
  @thread_priority = thread_priority
  @max_packet_size = max_packet_size
  @buffer_capacity = buffer_capacity
  @buffer = Buffer.new(buffer_capacity)
  # The thread is started only after the sink and buffer exist; it runs #dispatch.
  @dispatcher_thread = Thread.new { dispatch }
  # Remember the pid of the process that created this dispatcher.
  @pid = Process.pid
end
Instance Method Details
#<<(datagram) ⇒ Object
92 93 94 95 96 97 98 99 100 |
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 92

# Queues a datagram for the dispatcher thread, or sends it synchronously
# through the UDP sink when it cannot be queued.
#
# @param datagram the datagram to emit
# @return [self] so that calls can be chained
def <<(datagram)
  if !thread_healthcheck || !@buffer.push_nonblock(datagram)
    # The buffer is full or the thread can't be respawned,
    # we'll send the datagram synchronously
    @udp_sink << datagram
  end

  self
end
#flush(blocking:) ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 111

# Drains the buffer, packing datagrams into NEWLINE-separated packets and
# handing each packet to the UDP sink.
#
# @param blocking [Boolean] when true, waits on @buffer.pop for the first
#   datagram of each packet; when false, uses @buffer.pop_nonblock and returns
#   as soon as the buffer has nothing to offer
# @return [nil]
def flush(blocking:)
  packet = "".b
  next_datagram = nil
  until @buffer.closed? && @buffer.empty? && next_datagram.nil?
    # A datagram that did not fit in the previous packet is used first.
    next_datagram ||= blocking ? @buffer.pop : @buffer.pop_nonblock
    # nil means the queue was closed (blocking) or there is no datagram in the buffer (non-blocking).
    break if next_datagram.nil?

    packet << next_datagram
    next_datagram = nil
    if packet.bytesize <= @max_packet_size
      while (next_datagram = @buffer.pop_nonblock)
        # Stop once the separator plus the datagram would not fit; the datagram
        # stays in next_datagram and opens the next packet.
        break unless @max_packet_size - packet.bytesize - 1 > next_datagram.bytesize

        packet << NEWLINE << next_datagram
      end
    end

    @udp_sink << packet
    # The same string object is reused for every packet.
    packet.clear
  end
end
#shutdown(wait = 2) ⇒ Object
102 103 104 105 106 107 108 109 |
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 102

# Stops the dispatcher: marks it interrupted, closes the buffer, waits for the
# dispatcher thread, then flushes whatever is still buffered.
#
# @param wait the limit passed to Thread#join for the dispatcher thread (default 2)
# @return the result of the final non-blocking #flush
def shutdown(wait = 2)
  @interrupted = true
  @buffer.close
  # Only join a thread that exists and is still running.
  @dispatcher_thread.join(wait) if @dispatcher_thread&.alive?
  # Drain what is left in the buffer from the calling thread, without blocking.
  flush(blocking: false)
end