Class: WaterDrop::Polling::QueuePipe

Inherits:
Object
Defined in:
lib/waterdrop/polling/queue_pipe.rb

Overview

A pipe connected to librdkafka’s queue event notification system. When events (delivery reports, statistics) arrive, librdkafka writes to the pipe, allowing IO.select to wake up immediately.

This pipe is also used by WaterDrop to signal:

  • Continue: when a poll hits its time limit but more events remain

  • Close: when the producer is being closed (combined with the @closing flag in State)

Reusing the same pipe reduces the number of file descriptors and the IO.select monitoring overhead.
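
The shared-pipe idea can be illustrated without librdkafka. The following is a minimal sketch, not WaterDrop's implementation: `shared_pipe_demo` is a hypothetical helper, and a plain Ruby thread stands in for librdkafka's event notifier. It shows that one IO.select call and one read are enough to observe and clear both kinds of wake-up.

```ruby
# Hypothetical helper (not part of WaterDrop): two wake-up sources share
# one pipe. A plain thread stands in for librdkafka's event notifier.
def shared_pipe_demo
  reader, writer = IO.pipe

  # Stand-in for librdkafka writing a byte when an event is enqueued
  Thread.new { writer.write_nonblock("E", exception: false) }.join

  # Stand-in for a WaterDrop "continue" signal on the same pipe
  writer.write_nonblock("W", exception: false)

  # One IO.select call covers both sources; gives up after 1 second
  ready = IO.select([reader], nil, nil, 1)

  # One read clears both notification bytes
  drained = reader.read_nonblock(1_048_576, exception: false)

  [!ready.nil?, drained]
ensure
  reader&.close
  writer&.close
end

woke_up, drained = shared_pipe_demo
```

Because the bytes carry no meaning beyond "something happened", the poller does not need to know which source woke it; it drains the pipe and then checks its own state.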

Constructor Details

#initialize(client) ⇒ QueuePipe

Creates a new queue pipe and connects it to the client’s event queue.

Parameters:

  • client (Rdkafka::Producer)

    the rdkafka client

Raises:

  • (StandardError)

    if enable_queue_io_events fails



# File 'lib/waterdrop/polling/queue_pipe.rb', line 21

def initialize(client)
  @reader, @writer = IO.pipe

  # Tell librdkafka to write to our pipe when events arrive on the main queue
  client.enable_queue_io_events(@writer.fileno)
end

Instance Attribute Details

#reader ⇒ IO (readonly)

Returns the readable end of the pipe for use with IO.select.

Returns:

  • (IO)

    the readable end of the pipe for use with IO.select



# File 'lib/waterdrop/polling/queue_pipe.rb', line 16

def reader
  @reader
end

Instance Method Details

#close ⇒ Object

Closes both ends of the pipe.



# File 'lib/waterdrop/polling/queue_pipe.rb', line 47

def close
  close_io(@reader)
  close_io(@writer)
end

#drain ⇒ Object

Drains all pending bytes from the pipe. Called after IO.select returns, to clear the notification. Uses a single large read so the pipe is emptied in one syscall (pipe buffers are typically 64KB).



# File 'lib/waterdrop/polling/queue_pipe.rb', line 40

def drain
  @reader.read_nonblock(1_048_576, exception: false)
rescue IOError, Errno::EBADF
  # Pipe closed during drain
end
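
The drain behavior can be checked on a plain pipe. This is a sketch under the assumption that nothing else writes to the pipe in the meantime; `drain_demo` is a hypothetical helper, not a WaterDrop method. It shows that one read_nonblock call collects several queued bytes, and that a second call on the now-empty pipe returns :wait_readable instead of raising, because of exception: false.

```ruby
# Hypothetical helper (not part of WaterDrop): drains a plain pipe twice.
def drain_demo
  reader, writer = IO.pipe

  # Queue up several notification bytes before anyone reads
  3.times { writer.write_nonblock("W", exception: false) }

  # A single read removes everything that is pending
  first = reader.read_nonblock(1_048_576, exception: false)

  # The pipe is empty now; with exception: false this returns a symbol
  second = reader.read_nonblock(1_048_576, exception: false)

  [first, second]
ensure
  reader&.close
  writer&.close
end

first, second = drain_demo
```

The return value of the read is not needed by the caller; what matters is that the pipe no longer reports as readable, so the next IO.select blocks until a new notification arrives.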

#signal ⇒ Object

Signals by writing a byte to the pipe. Used to wake IO.select for continue/close signals. Thread-safe and non-blocking; a full or already closed pipe is silently ignored.



# File 'lib/waterdrop/polling/queue_pipe.rb', line 31

def signal
  @writer.write_nonblock("W", exception: false)
rescue IOError, Errno::EBADF
  # Pipe closed
end
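
The wake-up and the error handling can both be seen on a plain pipe. This is an illustrative sketch: `signal_demo` is a hypothetical helper, and the 5-second timeout is an arbitrary safety net. One thread blocks in IO.select, another writes a byte to wake it, and a later write to the closed writer raises IOError, which is rescued in the same way as in the method above.

```ruby
# Hypothetical helper (not part of WaterDrop): wakes a waiting thread,
# then shows that signaling a closed pipe is harmless once rescued.
def signal_demo
  reader, writer = IO.pipe

  # This thread waits in IO.select until the pipe becomes readable
  waiter = Thread.new { IO.select([reader], nil, nil, 5) }

  # Writing one byte from the current thread wakes the waiter
  writer.write_nonblock("W", exception: false)
  woke = !waiter.value.nil?

  # exception: false only covers the would-block case; a closed
  # stream still raises IOError, so the rescue is required
  writer.close
  after_close =
    begin
      writer.write_nonblock("W", exception: false)
    rescue IOError, Errno::EBADF
      :ignored
    end

  [woke, after_close]
ensure
  reader&.close
  writer&.close
end

woke, after_close = signal_demo
```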