Class: StatsD::Instrument::BatchedUDPSink::DispatcherStats

Inherits:
Object
  • Object
show all
Defined in:
lib/statsd/instrument/batched_udp_sink.rb

Instance Method Summary collapse

Constructor Details

#initialize(interval) ⇒ DispatcherStats

Returns a new instance of DispatcherStats.



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 84

def initialize(interval)
  # The number of times the batched udp sender needed to
  # send a statsd line synchronously, due to the buffer
  # being full.
  @synchronous_sends = 0
  # The number of times we send a batch of statsd lines,
  # of any size.
  @batched_sends = 0
  # The average buffer length, measured at the beginning of
  # each batch.
  @avg_buffer_length = 0
  # The average per-batch byte size of the packet sent to
  # the underlying UDPSink.
  @avg_batched_packet_size = 0
  # The average number of statsd lines per batch.
  @avg_batch_length = 0

  @mutex = Mutex.new

  @interval = interval
  @since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

Instance Method Details

#increment_batched_sends(buffer_len, packet_size, batch_len) ⇒ Object



135
136
137
138
139
140
141
142
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 135

def increment_batched_sends(buffer_len, packet_size, batch_len)
  @mutex.synchronize do
    @batched_sends += 1
    @avg_buffer_length += (buffer_len - @avg_buffer_length) / @batched_sends
    @avg_batched_packet_size += (packet_size - @avg_batched_packet_size) / @batched_sends
    @avg_batch_length += (batch_len - @avg_batch_length) / @batched_sends
  end
end

#increment_synchronous_sendsObject



131
132
133
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 131

def increment_synchronous_sends
  @mutex.synchronize { @synchronous_sends += 1 }
end

#maybe_flush!(force: false) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 107

def maybe_flush!(force: false)
  return if !force && Process.clock_gettime(Process::CLOCK_MONOTONIC) - @since < @interval

  synchronous_sends = 0
  batched_sends = 0
  avg_buffer_length = 0
  avg_batched_packet_size = 0
  avg_batch_length = 0
  @mutex.synchronize do
    synchronous_sends, @synchronous_sends = @synchronous_sends, synchronous_sends
    batched_sends, @batched_sends = @batched_sends, batched_sends
    avg_buffer_length, @avg_buffer_length = @avg_buffer_length, avg_buffer_length
    avg_batched_packet_size, @avg_batched_packet_size = @avg_batched_packet_size, avg_batched_packet_size
    avg_batch_length, @avg_batch_length = @avg_batch_length, avg_batch_length
    @since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  StatsD.increment("statsd_instrument.batched_udp_sink.synchronous_sends", synchronous_sends)
  StatsD.increment("statsd_instrument.batched_udp_sink.batched_sends", batched_sends)
  StatsD.gauge("statsd_instrument.batched_udp_sink.avg_buffer_length", avg_buffer_length)
  StatsD.gauge("statsd_instrument.batched_udp_sink.avg_batched_packet_size", avg_batched_packet_size)
  StatsD.gauge("statsd_instrument.batched_udp_sink.avg_batch_length", avg_batch_length)
end