Class: WaterDrop::Polling::State

Inherits:
Object
Includes:
Karafka::Core::Helpers::Time
Defined in:
lib/waterdrop/polling/state.rb

Overview

Holds the state for a registered producer in the poller. Each producer has its own State instance that tracks:

  • The producer ID and client reference

  • Queue pipe for IO.select monitoring (shared with librdkafka for efficiency)

  • Configuration (max poll time)

  • Last poll time for staleness detection

  • Closing flag for shutdown signaling

Constructor Details

#initialize(producer_id, client, monitor, max_poll_time, periodic_poll_interval) ⇒ State

Creates a new state for a producer

Parameters:

  • producer_id (String)

    unique producer ID

  • client (Rdkafka::Producer)

    the rdkafka producer client

  • monitor (Object)

    the producer’s monitor for error reporting

  • max_poll_time (Integer)

    max time in ms to poll per cycle

  • periodic_poll_interval (Integer)

    max time in ms before this producer needs periodic poll

Raises:

  • (StandardError)

    if queue pipe setup fails (FD mode requires this to work)



# File 'lib/waterdrop/polling/state.rb', line 31

def initialize(producer_id, client, monitor, max_poll_time, periodic_poll_interval)
  @producer_id = producer_id
  @client = client
  @monitor = monitor
  @max_poll_time = max_poll_time
  @periodic_poll_interval = periodic_poll_interval
  # Initialize to 0 so first check always triggers (no nil handling needed)
  @last_poll_time = 0
  @last_stale_check = 0
  @last_stale_result = false

  # Closing flag - set by signal_close, checked by poller
  @closing = false

  # Latch for synchronizing close operations
  @close_latch = Latch.new

  # Queue pipe for all signaling (librdkafka events + continue + close)
  # Reusing one pipe reduces FDs and IO.select overhead
  @queue_pipe = QueuePipe.new(@client)

  # Cache reader reference for hot path performance
  @io = @queue_pipe.reader
end

Instance Attribute Details

#io ⇒ IO (readonly)

Returns the queue pipe reader for IO.select monitoring.

Returns:

  • (IO)

    the queue pipe reader for IO.select monitoring



# File 'lib/waterdrop/polling/state.rb', line 19

def io
  @io
end

#monitor ⇒ Object (readonly)

Returns the producer’s monitor for instrumentation.

Returns:

  • (Object)

    the producer’s monitor for instrumentation



# File 'lib/waterdrop/polling/state.rb', line 22

def monitor
  @monitor
end

#producer_id ⇒ String (readonly)

Returns producer ID.

Returns:

  • (String)

    producer ID



# File 'lib/waterdrop/polling/state.rb', line 16

def producer_id
  @producer_id
end

Instance Method Details

#close ⇒ Object

Closes all resources and signals any waiters



# File 'lib/waterdrop/polling/state.rb', line 131

def close
  return if closed?

  @queue_pipe.close
  @close_latch.release!
end

#closed? ⇒ Boolean

Returns whether this state has been closed.

Returns:

  • (Boolean)

    whether this state has been closed



# File 'lib/waterdrop/polling/state.rb', line 146

def closed?
  @close_latch.released?
end

#closing? ⇒ Boolean

Returns whether this producer is being closed.

Returns:

  • (Boolean)

    whether this producer is being closed



# File 'lib/waterdrop/polling/state.rb', line 126

def closing?
  @closing
end

#drain ⇒ Object

Drains the queue pipe. Called before polling to clear any pending signals.



# File 'lib/waterdrop/polling/state.rb', line 58

def drain
  @queue_pipe.drain
end
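
The signal-and-drain cycle on a pipe can be illustrated with plain Ruby IO objects. This is a minimal sketch, not WaterDrop's QueuePipe: `wake` and `drain_pipe` are hypothetical helpers, and the pipe here is an ordinary `IO.pipe` rather than one shared with librdkafka.

```ruby
# Hypothetical helper: writing one byte makes the read end readable,
# which wakes any IO.select call that is watching it.
def wake(writer)
  writer.write_nonblock("x")
rescue IO::WaitWritable
  # The pipe buffer is full, so a wake-up is already pending.
  nil
end

# Hypothetical helper: read and discard everything pending so that the
# next IO.select blocks until a new signal arrives.
def drain_pipe(reader)
  loop { reader.read_nonblock(1024) }
rescue IO::WaitReadable, EOFError
  nil
end

reader, writer = IO.pipe

wake(writer)
wake(writer)                          # several signals may pile up
IO.select([reader], nil, nil, 0.1)    # reports the reader as ready
drain_pipe(reader)                    # clears all pending bytes at once
IO.select([reader], nil, nil, 0)      # nothing pending after draining
```

Draining before polling means that several queued signals collapse into a single poll pass instead of triggering one wake-up each.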

#mark_polled! ⇒ Object

Marks this producer as having been polled. Called after polling to track staleness.



# File 'lib/waterdrop/polling/state.rb', line 93

def mark_polled!
  @last_poll_time = monotonic_now
end

#needs_periodic_poll? ⇒ Boolean

Checks if this producer needs a periodic poll. Used to ensure OAuth/stats callbacks fire even when another producer is busy. Includes internal throttling to avoid excessive checks.

Returns:

  • (Boolean)

    true if the producer needs a periodic poll



# File 'lib/waterdrop/polling/state.rb', line 101

def needs_periodic_poll?
  now = monotonic_now

  # Throttle: return cached result if checked recently
  return @last_stale_result if (now - @last_stale_check) < STALE_CHECK_THROTTLE_MS

  @last_stale_check = now
  @last_stale_result = (now - @last_poll_time) >= @periodic_poll_interval
end
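
The interaction between the throttle and the staleness check is easier to follow with a controllable clock. The sketch below is a simplified, hypothetical model: `StalenessTracker` and the injected `clock` are illustration-only names, and `THROTTLE_MS = 100` is an assumed value, since the value of `STALE_CHECK_THROTTLE_MS` is not shown in this listing.

```ruby
# Hypothetical, simplified model of the throttled staleness check.
# The clock is injected so the logic can be exercised without sleeping.
class StalenessTracker
  # Assumed value for illustration only.
  THROTTLE_MS = 100

  def initialize(periodic_poll_interval, clock)
    @periodic_poll_interval = periodic_poll_interval
    @clock = clock
    # Zero means "never polled", so the first real check reports stale.
    @last_poll_time = 0
    @last_check = 0
    @last_result = false
  end

  def mark_polled!
    @last_poll_time = @clock.call
  end

  def needs_periodic_poll?
    now = @clock.call

    # Within the throttle window, reuse the previous answer.
    return @last_result if (now - @last_check) < THROTTLE_MS

    @last_check = now
    @last_result = (now - @last_poll_time) >= @periodic_poll_interval
  end
end

time = [1_000]
tracker = StalenessTracker.new(500, -> { time[0] })

tracker.needs_periodic_poll? # => true, never polled so far
tracker.mark_polled!
time[0] = 1_050
tracker.needs_periodic_poll? # => true, cached answer inside the window
time[0] = 1_100
tracker.needs_periodic_poll? # => false, recomputed after the window
```

In this model the cached answer is not reset by `mark_polled!`, so a stale result can be reported for up to one throttle window after a poll. The cost is at most one extra poll, in exchange for cheaper checks on the hot path.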

#poll ⇒ Boolean

Polls the producer’s event queue

Returns:

  • (Boolean)

    true if no more events to process, false if stopped due to time limit



# File 'lib/waterdrop/polling/state.rb', line 64

def poll
  drained = true
  deadline = monotonic_now + @max_poll_time

  @client.events_poll_nb_each do |count|
    if count.zero?
      :stop
    elsif monotonic_now >= deadline
      drained = false
      :stop
    end
  end

  drained
end
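
The two exit conditions of the polling loop can be exercised without a Kafka client. In the sketch below, `FakeClient` is a hypothetical stand-in whose `events_poll_nb_each` yields an event count per iteration and stops when the block returns `:stop`; that contract is inferred from the listing above, not taken from the rdkafka documentation. `bounded_poll` and the injected `clock` are likewise illustration-only names.

```ruby
# Hypothetical stand-in for the client: yields one count per iteration
# and stops as soon as the block returns :stop.
class FakeClient
  def initialize(batches)
    @batches = batches.dup
  end

  def events_poll_nb_each
    loop do
      count = @batches.shift || 0
      break if yield(count) == :stop
    end
  end
end

# Same shape as the poll method above, with the clock passed in.
def bounded_poll(client, max_poll_time_ms, clock)
  drained = true
  deadline = clock.call + max_poll_time_ms

  client.events_poll_nb_each do |count|
    if count.zero?
      :stop               # queue is empty, nothing left to do
    elsif clock.call >= deadline
      drained = false     # out of time with events still pending
      :stop
    end
  end

  drained
end

# The clock never advances, so the loop runs until the queue is empty.
bounded_poll(FakeClient.new([3, 2]), 10, -> { 0 }) # => true

# The clock jumps past the deadline while events are still pending.
ticks = [0, 5, 20]
bounded_poll(FakeClient.new([3, 2, 1]), 10, -> { ticks.shift }) # => false
```

A `false` result corresponds to the case described for `signal_continue`: the time budget ran out while the queue still had events.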

#queue_empty? ⇒ Boolean

Checks if the producer’s event queue is empty

Returns:

  • (Boolean)

    true if queue is empty



# File 'lib/waterdrop/polling/state.rb', line 82

def queue_empty?
  @client.queue_size.zero?
end

#signal_close ⇒ Object

Signals the poller to remove this producer. Called from any thread when the producer is being closed. Sets the closing flag BEFORE signaling to ensure the poller sees it.



# File 'lib/waterdrop/polling/state.rb', line 114

def signal_close
  @closing = true
  @queue_pipe.signal
end
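
Why the flag must be set before the signal can be seen in a small model of both sides. This is a sketch under stated assumptions: `ClosableEntry` and `poller_step` are hypothetical names, and the wake-up channel is an ordinary `IO.pipe`, not the actual queue pipe.

```ruby
# Hypothetical model of an entry with a closing flag and a wake-up pipe.
class ClosableEntry
  attr_reader :io

  def initialize
    @io, @writer = IO.pipe
    @closing = false
  end

  def closing?
    @closing
  end

  # Flag first, wake-up second: a poller woken by the byte written below
  # should already observe closing? == true.
  def signal_close
    @closing = true
    @writer.write_nonblock("x")
  end
end

# Hypothetical poller step: wait for a wake-up, then decide what to do.
def poller_step(entry, timeout)
  ready, = IO.select([entry.io], nil, nil, timeout)
  return :timeout unless ready

  entry.closing? ? :remove : :poll
end

entry = ClosableEntry.new
poller = Thread.new { poller_step(entry, 2) }
entry.signal_close
poller.value # => :remove
```

With the two lines of `signal_close` swapped, the poller could wake up, see the flag still unset, treat the wake-up as ordinary work, and then block again without ever noticing the close request.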

#signal_continue ⇒ Object

Signals that there is more work to do (the time limit was hit but the queue is not empty). This wakes up IO.select immediately instead of waiting for the timeout.



# File 'lib/waterdrop/polling/state.rb', line 121

def signal_continue
  @queue_pipe.signal
end

#wait_for_close ⇒ Object

Waits for this state to be closed. Used by unregister to ensure synchronous cleanup before returning. This matches the threaded polling behavior, which drains without a timeout.



# File 'lib/waterdrop/polling/state.rb', line 141

def wait_for_close
  @close_latch.wait
end
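
The `Latch` used by `close`, `closed?` and `wait_for_close` is not shown in this listing. A one-shot latch with the same three operations could look like the sketch below; `SimpleLatch` is a hypothetical name, and building it on `Mutex` and `ConditionVariable` is an assumption, not a description of the actual class.

```ruby
# Hypothetical one-shot latch: once released, it stays released and
# every current or future waiter returns immediately.
class SimpleLatch
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @released = false
  end

  def release!
    @mutex.synchronize do
      @released = true
      @cond.broadcast   # wake every thread blocked in #wait
    end
  end

  def released?
    @mutex.synchronize { @released }
  end

  # Blocks without a timeout until release! has been called.
  def wait
    @mutex.synchronize do
      # Loop to guard against spurious wake-ups.
      @cond.wait(@mutex) until @released
    end
  end
end

latch = SimpleLatch.new
waiter = Thread.new { latch.wait; :closed }
latch.release!
waiter.value # => :closed
```

Because the released state is sticky, a thread that calls `wait` after the release returns at once, so the order in which the closing and waiting threads run does not matter.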