Class: StatsD::Instrument::BatchedUDPSink::Dispatcher
- Inherits:
-
Object
- Object
- StatsD::Instrument::BatchedUDPSink::Dispatcher
- Defined in:
- lib/statsd/instrument/batched_udp_sink.rb
Constant Summary collapse
# Class used to instantiate the datagram buffer (see `BUFFER_CLASS.new`).
#
# A plain ::Array is selected when RUBY_ENGINE is undefined or equals "ruby".
# On any other engine the `concurrent-ruby` gem is activated and
# Concurrent::Array is selected instead.
#
# @raise [Gem::MissingSpecError] when `concurrent-ruby` cannot be activated on
#   an engine other than "ruby".
BUFFER_CLASS =
  if !::Object.const_defined?(:RUBY_ENGINE) || RUBY_ENGINE == "ruby"
    ::Array
  else
    begin
      gem("concurrent-ruby")
    rescue Gem::MissingSpecError
      # Gem::MissingSpecError#initialize takes (name, requirement, extra_message = nil).
      # `raise Gem::MissingSpecError, "text"` would call it with a single argument
      # and blow up with ArgumentError, hiding the real problem. Build the
      # exception explicitly and pass the hint as the extra message.
      raise Gem::MissingSpecError.new(
        "concurrent-ruby",
        Gem::Requirement.default,
        "statsd-instrument depends on `concurrent-ruby` on #{RUBY_ENGINE}",
      )
    end
    require "concurrent/array"
    Concurrent::Array
  end
Instance Method Summary collapse
- #<<(datagram) ⇒ Object
-
#initialize(host, port, flush_interval, flush_threshold, buffer_capacity, thread_priority, max_packet_size) ⇒ Dispatcher
constructor
A new instance of Dispatcher.
- #shutdown(wait = @flush_interval * 2) ⇒ Object
Constructor Details
#initialize(host, port, flush_interval, flush_threshold, buffer_capacity, thread_priority, max_packet_size) ⇒ Dispatcher
Returns a new instance of Dispatcher.
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 77

# Stores the sink configuration, creates the datagram buffer and the
# monitor/condition pair, then starts the background thread running `dispatch`.
#
# @param host destination host, stored as @host
# @param port destination port, stored as @port
# @param flush_interval stored as @flush_interval; `shutdown` waits twice this
#   value by default
# @param flush_threshold `<<` wakes the dispatcher thread each time the buffer
#   size is a multiple of this value
# @param buffer_capacity `<<` pauses the calling thread while the buffer holds
#   at least this many datagrams
# @param thread_priority stored as @thread_priority (not used in the code shown here)
# @param max_packet_size stored as @max_packet_size (not used in the code shown here)
# @return [Dispatcher] a new instance of Dispatcher
def initialize(host, port, flush_interval, flush_threshold, buffer_capacity, thread_priority, max_packet_size)
  @host = host
  @port = port
  @interrupted = false
  @flush_interval = flush_interval
  @flush_threshold = flush_threshold
  @buffer_capacity = buffer_capacity
  @thread_priority = thread_priority
  @max_packet_size = max_packet_size
  @buffer = BUFFER_CLASS.new
  @pid = Process.pid
  @monitor = Monitor.new
  @condition = @monitor.new_cond
  # Start the thread last. It runs `dispatch` on this object and may be
  # scheduled immediately, so every instance variable must already be set;
  # otherwise it could observe @monitor/@condition/@pid as nil.
  # NOTE(review): `dispatch` is defined elsewhere — presumably it waits on
  # @condition; confirm against its body.
  @dispatcher_thread = Thread.new { dispatch }
end
Instance Method Details
#<<(datagram) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 93

# Queues a datagram for the dispatcher thread.
#
# When `thread_healthcheck` is falsy the datagram is not buffered and `flush`
# is called instead. Otherwise the datagram is appended to the buffer, the
# dispatcher is woken up at every multiple of the flush threshold, and the
# calling thread is paused for as long as the buffer is at capacity and the
# dispatcher thread is still alive.
#
# @param datagram the datagram to enqueue
# @return [self] so that calls can be chained
def <<(datagram)
  unless thread_healthcheck
    flush
    return self
  end

  @buffer << datagram

  # Signalling on every push would be wasteful while the thread is already
  # flushing, so only do it when the size hits a multiple of `flush_threshold`.
  wakeup_thread if (@buffer.size % @flush_threshold) == 0

  # A SizedQueue would be perfect, except that it doesn't have a timeout
  # Ref: https://bugs.ruby-lang.org/issues/18774
  if @buffer.size >= @buffer_capacity
    StatsD.logger.warn do
      "[#{self.class.name}] Max buffer size reached (#{@buffer_capacity}), pausing " \
        "thread##{Thread.current.object_id}"
    end

    paused_at = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
    @monitor.synchronize do
      # Re-check every 10ms: stop waiting once there is room again or the
      # dispatcher thread has died.
      @condition.wait(0.01) while @buffer.size >= @buffer_capacity && @dispatcher_thread.alive?
    end
    elapsed_ms = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) - paused_at

    StatsD.logger.warn do
      "[#{self.class.name}] thread##{Thread.current.object_id} resumed after #{elapsed_ms.round(2)}ms"
    end
  end

  self
end
#shutdown(wait = @flush_interval * 2) ⇒ Object
128 129 130 131 132 133 134 135 |
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 128

# Marks the dispatcher as interrupted and waits for its thread to finish.
#
# If the dispatcher thread is missing or no longer alive, `flush` is called
# from the current thread instead.
#
# @param wait timeout handed to Thread#join; defaults to twice the flush interval
# @return the result of Thread#join when the thread is alive, otherwise the
#   result of `flush`
def shutdown(wait = @flush_interval * 2)
  @interrupted = true

  worker = @dispatcher_thread
  return worker.join(wait) if worker&.alive?

  flush
end