Class: WaterDrop::Polling::QueuePipe
- Inherits: Object
  - Object
  - WaterDrop::Polling::QueuePipe
- Defined in: lib/waterdrop/polling/queue_pipe.rb
Overview
A pipe connected to librdkafka’s queue event notification system. When events (delivery reports, statistics) arrive, librdkafka writes to the pipe, allowing IO.select to wake up immediately.
This pipe is also used by WaterDrop to signal:
- Continue: when poll hits its time limit but more events remain
- Close: when the producer is being closed (combined with the @closing flag in State)
Reusing the same pipe reduces the number of file descriptors and the IO.select monitoring overhead.
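The wake-up mechanism described above can be sketched with nothing but Ruby's standard IO API. This is an illustration under stated assumptions, not WaterDrop's polling loop: the local names (reader, writer, idle, woke_up, idle_again) are made up for the example, and the write stands in for either librdkafka or a continue/close signal.

```ruby
# Standalone sketch: plain Ruby IO only, no WaterDrop and no librdkafka.
reader, writer = IO.pipe

# Nothing has been written yet, so IO.select times out and returns nil.
idle = IO.select([reader], nil, nil, 0.01)

# Any writer (an event notification or a continue/close signal) makes the
# read end readable, which wakes IO.select.
writer.write_nonblock("W", exception: false)
ready, = IO.select([reader], nil, nil, 1)
woke_up = !ready.nil? && ready.include?(reader)

# Clear the notification so the next IO.select waits again.
reader.read_nonblock(1_048_576, exception: false)
idle_again = IO.select([reader], nil, nil, 0.01)

reader.close
writer.close
```

Because both kinds of wake-up arrive on the same descriptor, a poller only needs to watch one IO object per producer.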
Instance Attribute Summary
- #reader ⇒ IO (readonly)
  The readable end of the pipe for use with IO.select.
Instance Method Summary
- #close ⇒ Object
  Closes both ends of the pipe.
- #drain ⇒ Object
  Drains all pending bytes from the pipe. Called after IO.select returns to clear the notification. Uses a single large read to drain in one syscall (pipe buffers are typically 64 KB).
- #initialize(client) ⇒ QueuePipe (constructor)
  Creates a new queue pipe and connects it to the client’s event queue.
- #signal ⇒ Object
  Signals by writing a byte to the pipe. Used to wake IO.select for continue/close signals. Thread-safe and non-blocking; silently ignores errors.
Constructor Details
#initialize(client) ⇒ QueuePipe
Creates a new queue pipe and connects it to the client’s event queue.

    # File 'lib/waterdrop/polling/queue_pipe.rb', line 21
    def initialize(client)
      @reader, @writer = IO.pipe

      # Tell librdkafka to write to our pipe when events arrive on the main queue
      client.enable_queue_io_events(@writer.fileno)
    end
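To make the wiring concrete, here is a minimal sketch of what handing the writer's file descriptor to the client amounts to. FakeClient and its emit_event method are hypothetical stand-ins invented for this example; only the enable_queue_io_events call name comes from the constructor above. The byte written is arbitrary and is not meant to reflect what librdkafka actually sends.

```ruby
# Hypothetical stand-in for the real client: it only remembers the descriptor.
class FakeClient
  def enable_queue_io_events(fd)
    @fd = fd
  end

  # Simulates the native side writing a notification byte to the registered
  # descriptor. autoclose: false keeps this wrapper from closing the pipe.
  def emit_event
    io = IO.for_fd(@fd, "w", autoclose: false)
    io.syswrite("1")
  end
end

reader, writer = IO.pipe
client = FakeClient.new
client.enable_queue_io_events(writer.fileno)

# An "event" arrives: the read end becomes readable without Ruby code
# touching the writer object directly.
client.emit_event
ready, = IO.select([reader], nil, nil, 1)
payload = reader.read_nonblock(16, exception: false)

reader.close
writer.close
```

The point of the sketch is that the notification travels through the raw descriptor, so the Ruby side only ever needs the reader for IO.select.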
Instance Attribute Details
#reader ⇒ IO (readonly)
Returns the readable end of the pipe for use with IO.select.

    # File 'lib/waterdrop/polling/queue_pipe.rb', line 16
    def reader
      @reader
    end
Instance Method Details
#close ⇒ Object
Closes both ends of the pipe.

    # File 'lib/waterdrop/polling/queue_pipe.rb', line 47
    def close
      close_io(@reader)
      close_io(@writer)
    end
#drain ⇒ Object
Drains all pending bytes from the pipe. Called after IO.select returns to clear the notification. Uses a single large read to drain in one syscall (pipe buffers are typically 64 KB).

    # File 'lib/waterdrop/polling/queue_pipe.rb', line 40
    def drain
      @reader.read_nonblock(1_048_576, exception: false)
    rescue IOError, Errno::EBADF
      # Pipe closed during drain
    end
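The behaviour of a single large non-blocking read can be checked in isolation. The following is a sketch on a bare IO.pipe, with illustrative local names (drained, empty, closed_error) that are not part of WaterDrop; it shows why one read is enough and why the method rescues IOError.

```ruby
reader, writer = IO.pipe

# Three notifications pile up before the poller gets a chance to run.
3.times { writer.write_nonblock("W", exception: false) }

# One large non-blocking read returns everything that is pending.
drained = reader.read_nonblock(1_048_576, exception: false)

# With nothing left to read, exception: false yields :wait_readable
# instead of raising.
empty = reader.read_nonblock(1_048_576, exception: false)

# Reading from an already closed IO raises, which the rescue clause covers.
reader.close
closed_error =
  begin
    reader.read_nonblock(1, exception: false)
    nil
  rescue IOError, Errno::EBADF => e
    e.class
  end

writer.close
```

Several pending notifications collapse into one read, so the number of bytes in the pipe does not need to match the number of events handled.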
#signal ⇒ Object
Signals by writing a byte to the pipe. Used to wake IO.select for continue/close signals. Thread-safe and non-blocking; silently ignores errors.

    # File 'lib/waterdrop/polling/queue_pipe.rb', line 31
    def signal
      @writer.write_nonblock("W", exception: false)
    rescue IOError, Errno::EBADF
      # Pipe closed
    end
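As a rough illustration of the "non-blocking; silently ignores errors" contract, the sketch below applies the same pattern to a bare pipe. safe_signal is a hypothetical helper written for this example, not a WaterDrop method, and the pipe is deliberately filled to show that a full buffer does not block the caller.

```ruby
# Hypothetical helper mirroring the shape of #signal for a bare IO object.
def safe_signal(writer)
  writer.write_nonblock("W", exception: false)
rescue IOError, Errno::EBADF
  nil # Pipe already closed; there is nothing left to wake
end

reader, writer = IO.pipe

# Normal case: one byte is written and the byte count is returned.
written = safe_signal(writer)

# Nobody drains the pipe here, so it eventually fills up. The write then
# reports :wait_writable instead of blocking the calling thread.
full = nil
loop do
  full = safe_signal(writer)
  break if full == :wait_writable
end

# After close, the IOError is swallowed and the call is a no-op.
writer.close
after_close = safe_signal(writer)

reader.close
```

A dropped signal on a full pipe is harmless in this scheme: unread bytes already keep the read end readable, so IO.select still wakes up.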