Class: StatsD::Instrument::BatchedUDPSink::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/statsd/instrument/batched_udp_sink.rb

Instance Method Summary collapse

Constructor Details

#initialize(host, port, buffer_capacity, thread_priority, max_packet_size) ⇒ Dispatcher

Returns a new instance of Dispatcher.



81
82
83
84
85
86
87
88
89
90
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 81

def initialize(host, port, buffer_capacity, thread_priority, max_packet_size)
  @udp_sink = UDPSink.new(host, port)
  @interrupted = false
  @thread_priority = thread_priority
  @max_packet_size = max_packet_size
  @buffer_capacity = buffer_capacity
  @buffer = Buffer.new(buffer_capacity)
  @dispatcher_thread = Thread.new { dispatch }
  @pid = Process.pid
end

Instance Method Details

#<<(datagram) ⇒ Object



92
93
94
95
96
97
98
99
100
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 92

def <<(datagram)
  if !thread_healthcheck || !@buffer.push_nonblock(datagram)
    # The buffer is full or the thread can't be respaned,
    # we'll send the datagram synchronously
    @udp_sink << datagram
  end

  self
end

#flush(blocking:) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 111

def flush(blocking:)
  packet = "".b
  next_datagram = nil
  until @buffer.closed? && @buffer.empty? && next_datagram.nil?
    if blocking
      next_datagram ||= @buffer.pop
      break if next_datagram.nil? # queue was closed
    else
      next_datagram ||= @buffer.pop_nonblock
      break if next_datagram.nil? # no datagram in buffer
    end

    packet << next_datagram
    next_datagram = nil
    if packet.bytesize <= @max_packet_size
      while (next_datagram = @buffer.pop_nonblock)
        if @max_packet_size - packet.bytesize - 1 > next_datagram.bytesize
          packet << NEWLINE << next_datagram
        else
          break
        end
      end
    end

    @udp_sink << packet
    packet.clear
  end
end

#shutdown(wait = 2) ⇒ Object



102
103
104
105
106
107
108
109
# File 'lib/statsd/instrument/batched_udp_sink.rb', line 102

def shutdown(wait = 2)
  @interrupted = true
  @buffer.close
  if @dispatcher_thread&.alive?
    @dispatcher_thread.join(wait)
  end
  flush(blocking: false)
end