Class: WaterDrop::Polling::State
- Inherits: Object (WaterDrop::Polling::State < Object)
- Includes: Karafka::Core::Helpers::Time
- Defined in: lib/waterdrop/polling/state.rb
Overview
Holds the state for a registered producer in the poller. Each producer has its own State instance that tracks:
- The producer ID and client reference
- Queue pipe for IO.select monitoring (shared with librdkafka for efficiency)
- Configuration (max poll time)
- Last poll time for staleness detection
- Closing flag for shutdown signaling
Instance Attribute Summary
- #io ⇒ IO (readonly): The queue pipe reader for IO.select monitoring.
- #monitor ⇒ Object (readonly): The producer’s monitor for instrumentation.
- #producer_id ⇒ String (readonly): Producer ID.
Instance Method Summary
- #close ⇒ Object: Closes all resources and signals any waiters.
- #closed? ⇒ Boolean: Whether this state has been closed.
- #closing? ⇒ Boolean: Whether this producer is being closed.
- #drain ⇒ Object: Drains the queue pipe. Called before polling to clear any pending signals.
- #initialize(producer_id, client, monitor, max_poll_time, periodic_poll_interval) ⇒ State (constructor): Creates a new state for a producer.
- #mark_polled! ⇒ Object: Marks this producer as having been polled. Called after polling to track staleness.
- #needs_periodic_poll? ⇒ Boolean: Checks if this producer needs a periodic poll. Used to ensure OAuth/stats callbacks fire even when another producer is busy. Includes internal throttling to avoid excessive checks.
- #poll ⇒ Boolean: Polls the producer’s event queue.
- #queue_empty? ⇒ Boolean: Checks if the producer’s event queue is empty.
- #signal_close ⇒ Object: Signals the poller to remove this producer. Called from any thread when the producer is being closed. Sets the closing flag BEFORE signaling to ensure the poller sees it.
- #signal_continue ⇒ Object: Signals that there’s more work to do (the time limit was hit but the queue is not empty). This wakes up IO.select immediately instead of waiting for the timeout.
- #wait_for_close ⇒ Object: Waits for this state to be closed. Used by unregister to ensure synchronous cleanup before returning. This matches the threaded polling behavior, which drains without a timeout.
Constructor Details
#initialize(producer_id, client, monitor, max_poll_time, periodic_poll_interval) ⇒ State
Creates a new state for a producer.
# File 'lib/waterdrop/polling/state.rb', line 31
def initialize(producer_id, client, monitor, max_poll_time, periodic_poll_interval)
  @producer_id = producer_id
  @client = client
  @monitor = monitor
  @max_poll_time = max_poll_time
  @periodic_poll_interval = periodic_poll_interval
  # Initialize to 0 so first check always triggers (no nil handling needed)
  @last_poll_time = 0
  @last_stale_check = 0
  @last_stale_result = false

  # Closing flag - set by signal_close, checked by poller
  @closing = false

  # Latch for synchronizing close operations
  @close_latch = Latch.new

  # Queue pipe for all signaling (librdkafka events + continue + close)
  # Reusing one pipe reduces FDs and IO.select overhead
  @queue_pipe = QueuePipe.new(@client)

  # Cache reader reference for hot path performance
  @io = @queue_pipe.reader
end
Instance Attribute Details
#io ⇒ IO (readonly)
Returns the queue pipe reader for IO.select monitoring.
# File 'lib/waterdrop/polling/state.rb', line 19
def io
  @io
end
#monitor ⇒ Object (readonly)
Returns the producer’s monitor for instrumentation.
# File 'lib/waterdrop/polling/state.rb', line 22
def monitor
  @monitor
end
#producer_id ⇒ String (readonly)
Returns producer ID.
# File 'lib/waterdrop/polling/state.rb', line 16
def producer_id
  @producer_id
end
Instance Method Details
#close ⇒ Object
Closes all resources and signals any waiters.
# File 'lib/waterdrop/polling/state.rb', line 131
def close
  return if closed?

  @queue_pipe.close
  @close_latch.release!
end
#closed? ⇒ Boolean
Returns whether this state has been closed.
# File 'lib/waterdrop/polling/state.rb', line 146
def closed?
  @close_latch.released?
end
#closing? ⇒ Boolean
Returns whether this producer is being closed.
# File 'lib/waterdrop/polling/state.rb', line 126
def closing?
  @closing
end
#drain ⇒ Object
Drains the queue pipe. Called before polling to clear any pending signals.
# File 'lib/waterdrop/polling/state.rb', line 58
def drain
  @queue_pipe.drain
end
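The body of QueuePipe#drain is not shown on this page. As a rough illustration of what draining a signaling pipe can look like, here is a minimal sketch using only Ruby's standard IO.pipe and read_nonblock. The drain_pipe helper and its chunk_size parameter are hypothetical and are not part of WaterDrop.

```ruby
# Hypothetical helper, not part of WaterDrop: empties a pipe's read end
# without blocking, so a later IO.select only wakes up for new signals.
def drain_pipe(reader, chunk_size = 1024)
  drained = 0

  loop do
    chunk = reader.read_nonblock(chunk_size, exception: false)
    # :wait_readable means the pipe is empty; nil means the writer closed it
    break if chunk == :wait_readable || chunk.nil?

    drained += chunk.bytesize
  end

  drained
end

reader, writer = IO.pipe
3.times { writer.write_nonblock("x") }

bytes = drain_pipe(reader)
# After draining, a zero-timeout IO.select finds nothing readable
pending = IO.select([reader], nil, nil, 0)
```

Draining before polling means several queued wake-ups collapse into a single poll pass instead of triggering one pass per byte.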
#mark_polled! ⇒ Object
Marks this producer as having been polled. Called after polling to track staleness.
# File 'lib/waterdrop/polling/state.rb', line 93
def mark_polled!
  @last_poll_time = monotonic_now
end
#needs_periodic_poll? ⇒ Boolean
Checks if this producer needs a periodic poll. Used to ensure OAuth/stats callbacks fire even when another producer is busy. Includes internal throttling to avoid excessive checks.
# File 'lib/waterdrop/polling/state.rb', line 101
def needs_periodic_poll?
  now = monotonic_now

  # Throttle: return cached result if checked recently
  return @last_stale_result if (now - @last_stale_check) < STALE_CHECK_THROTTLE_MS

  @last_stale_check = now
  @last_stale_result = (now - @last_poll_time) >= @periodic_poll_interval
end
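To see how the throttle interacts with mark_polled!, the following sketch reproduces the same logic in a standalone class with an injected clock. StaleTracker, its THROTTLE_MS value of 100, and the clock lambda are assumptions made for illustration; the real STALE_CHECK_THROTTLE_MS value is not given on this page, and times are assumed to be in milliseconds.

```ruby
# Hypothetical stand-in for the throttled check; the clock is injected so the
# behaviour can be exercised without sleeping. Times are assumed to be in ms.
class StaleTracker
  # Assumed throttle window; the real constant's value is not shown here
  THROTTLE_MS = 100

  def initialize(periodic_poll_interval, clock)
    @periodic_poll_interval = periodic_poll_interval
    @clock = clock
    @last_poll_time = 0
    @last_stale_check = 0
    @last_stale_result = false
  end

  def mark_polled!
    @last_poll_time = @clock.call
  end

  def needs_periodic_poll?
    now = @clock.call
    # Within the throttle window, reuse the cached answer
    return @last_stale_result if (now - @last_stale_check) < THROTTLE_MS

    @last_stale_check = now
    @last_stale_result = (now - @last_poll_time) >= @periodic_poll_interval
  end
end

now = 10_000
tracker = StaleTracker.new(1_000, -> { now })

first = tracker.needs_periodic_poll?   # never polled yet, so it counts as stale
tracker.mark_polled!
now += 50
cached = tracker.needs_periodic_poll?  # inside the throttle window: cached answer
now += 100
fresh = tracker.needs_periodic_poll?   # window elapsed: answer is recomputed
```

In this sketch the second call still reports the cached result even though the producer was just polled, which is the trade-off the throttle makes: the answer can lag by up to one throttle window in exchange for fewer comparisons on the hot path.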
#poll ⇒ Boolean
Polls the producer’s event queue.
# File 'lib/waterdrop/polling/state.rb', line 64
def poll
  drained = true
  deadline = monotonic_now + @max_poll_time

  @client.events_poll_nb_each do |count|
    if count.zero?
      :stop
    elsif monotonic_now >= deadline
      drained = false
      :stop
    end
  end

  drained
end
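The return value distinguishes "queue emptied" (true) from "time budget exhausted" (false). The sketch below exercises both paths with a fake client. FakeClient, bounded_poll, and the injected clock are hypothetical; the yield-a-count, stop-on-:stop contract of events_poll_nb_each is inferred from the method body above, not taken from the client's documentation.

```ruby
# Hypothetical stand-in for the client: yields the number of events handled per
# pass and stops when the block returns :stop (contract inferred from #poll).
class FakeClient
  def initialize(batches)
    @batches = batches
  end

  def events_poll_nb_each
    loop do
      count = @batches.shift || 0
      break if yield(count) == :stop
    end
  end
end

# Same shape as #poll, with the clock passed in so the deadline is testable
def bounded_poll(client, max_poll_time, clock)
  drained = true
  deadline = clock.call + max_poll_time

  client.events_poll_nb_each do |count|
    if count.zero?
      :stop
    elsif clock.call >= deadline
      drained = false
      :stop
    end
  end

  drained
end

# Clock that never advances: the queue empties before the deadline
emptied = bounded_poll(FakeClient.new([5, 3]), 10, -> { 0 })

# Clock that advances 6 ms per reading: the deadline passes with work left
ticks = 0
timed_out = bounded_poll(FakeClient.new([5, 5, 5, 5]), 10, -> { ticks += 6 })
```

A false result is the case where a caller would be expected to use signal_continue, so the remaining events are picked up on the next pass without starving other producers.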
#queue_empty? ⇒ Boolean
Checks if the producer’s event queue is empty.
# File 'lib/waterdrop/polling/state.rb', line 82
def queue_empty?
  @client.queue_size.zero?
end
#signal_close ⇒ Object
Signals the poller to remove this producer. Called from any thread when the producer is being closed. Sets the closing flag BEFORE signaling to ensure the poller sees it.
# File 'lib/waterdrop/polling/state.rb', line 114
def signal_close
  @closing = true
  @queue_pipe.signal
end
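The ordering matters because the pipe write is what wakes the poller: if the flag were set afterwards, the poller could wake up, see closing? as false, and go back to waiting. A minimal sketch of the handshake, assuming a plain IO.pipe in place of QueuePipe (CloseSignal and its methods are hypothetical names used only here):

```ruby
# Hypothetical miniature of the close handshake: one pipe, one flag.
class CloseSignal
  attr_reader :reader

  def initialize
    @reader, @writer = IO.pipe
    @closing = false
  end

  def closing?
    @closing
  end

  # Flag first, wake-up second: by the time IO.select returns for this write,
  # the flag is already set for the waiting thread to read
  def signal_close
    @closing = true
    @writer.write_nonblock("c")
  end
end

signal = CloseSignal.new

poller = Thread.new do
  # Blocks until something is written to the pipe (5 s safety timeout)
  ready = IO.select([signal.reader], nil, nil, 5)
  [!ready.nil?, signal.closing?]
end

signal.signal_close
woke_up, saw_closing = poller.value
```

The sketch behaves the same whether the write happens before or after the poller thread reaches IO.select, since a byte already sitting in the pipe makes the select return immediately.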
#signal_continue ⇒ Object
Signals that there’s more work to do (the time limit was hit but the queue is not empty). This wakes up IO.select immediately instead of waiting for the timeout.
# File 'lib/waterdrop/polling/state.rb', line 121
def signal_continue
  @queue_pipe.signal
end
#wait_for_close ⇒ Object
Waits for this state to be closed. Used by unregister to ensure synchronous cleanup before returning. This matches the threaded polling behavior, which drains without a timeout.
# File 'lib/waterdrop/polling/state.rb', line 141
def wait_for_close
  @close_latch.wait
end
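The Latch class itself is not documented on this page. One common way to build a one-shot latch with the same three calls (release!, released?, wait) is a Mutex plus a ConditionVariable, sketched below. SimpleLatch is a hypothetical name, and this is not necessarily how WaterDrop implements its Latch.

```ruby
# Hypothetical one-shot latch mirroring the calls State makes on its Latch
# (release!, released?, wait); WaterDrop's own implementation is not shown here.
class SimpleLatch
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @released = false
  end

  def release!
    @mutex.synchronize do
      @released = true
      @cond.broadcast
    end
  end

  def released?
    @mutex.synchronize { @released }
  end

  # Blocks without a timeout until release! has been called
  def wait
    @mutex.synchronize do
      @cond.wait(@mutex) until @released
    end
  end
end

latch = SimpleLatch.new
before = latch.released?

# Stands in for the poller thread finishing cleanup and closing the state
closer = Thread.new { latch.release! }

latch.wait
closer.join
after = latch.released?
```

Because the flag is checked under the mutex before waiting, a release! that happens before wait is not lost, and calling release! more than once is harmless.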