Class: WaterDrop::Polling::Poller

Inherits:
Object
Includes:
Karafka::Core::Helpers::Time, Singleton
Defined in:
lib/waterdrop/polling/poller.rb

Overview

Note:

Newly registered producers may experience a delay of up to 1 second before their first poll cycle, because the poller thread rebuilds its IO list only when IO.select times out. This is acceptable because producers are expected to be long-lived, and the initial connection overhead to Kafka typically exceeds this delay anyway.

Global poller singleton that manages a single polling thread for all FD-mode producers. It replaces librdkafka’s native background polling threads with a single Ruby thread that uses IO.select for efficient multiplexing.

Spawning one thread per producer is acceptable for one or two producers, but in systems with many producers (for example, several transactional ones) the per-thread cost keeps growing.

This implementation is event-driven rather than relying on blocking calls that release the GVL.
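To make the single-thread multiplexing idea concrete, here is a minimal sketch, assuming plain `IO.pipe` pairs stand in for each producer's notification file descriptor (an assumption; the real poller works with rdkafka clients and its own `State` objects). One thread waits on all read ends with `IO.select` and a 1-second timeout, and handles whichever ones become readable.

```ruby
# Hypothetical sketch: one thread multiplexes several pipes via IO.select.
# Each pipe stands in for one producer's notification FD (assumption).
readers = []
writers = []
3.times do
  r, w = IO.pipe
  readers << r
  writers << w
end

events = Queue.new

poller = Thread.new do
  handled = 0
  while handled < 3
    # Block until at least one IO is readable, or give up after 1 second
    ready = IO.select(readers, nil, nil, 1)
    # Timeout: per the Note, the real poller rebuilds its IO list at this point
    next unless ready

    ready[0].each do |io|
      events << [readers.index(io), io.read_nonblock(1)]
      handled += 1
    end
  end
end

# Simulate each "producer" signalling activity with a single byte
writers.each_with_index { |w, i| w.write(i.to_s) }
poller.join
(readers + writers).each(&:close)

results = Array.new(3) { events.pop }.sort
```

A single `IO.select` call covers every registered descriptor, so adding producers adds entries to the watched list rather than adding threads.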

Constant Summary

ID_MUTEX =

Mutex for thread-safe ID generation. It is initialized at class load time to avoid the race conditions that lazy initialization would introduce.

Mutex.new

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize ⇒ Poller

Returns a new instance of Poller.



# File 'lib/waterdrop/polling/poller.rb', line 50

def initialize
  @id = self.class.next_id
  @mutex = Mutex.new
  @producers = {}
  @thread = nil
  @shutdown = false
  @pid = Process.pid

  # Cached collections - rebuilt only when producers change
  @cached_ios = []
  @cached_io_to_state = {}
  @cached_states = []
  @cached_result = nil
  @ios_dirty = true
end
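The constructor keeps cached collections plus an `@ios_dirty` flag so that derived data is recomputed only after the producer set changes. The following is a minimal, single-threaded sketch of that dirty-flag pattern; the `IoCache` class and its methods are hypothetical, and the real poller performs these mutations under `@mutex`.

```ruby
# Hypothetical sketch of the dirty-flag caching pattern behind @ios_dirty:
# the derived IO list is rebuilt only after the producer set changes.
# Single-threaded for brevity; the real class guards state with a mutex.
class IoCache
  attr_reader :rebuilds

  def initialize
    @producers = {}
    @cached_ios = []
    @ios_dirty = true
    @rebuilds = 0
  end

  def add(id, io)
    @producers[id] = io
    @ios_dirty = true
  end

  def remove(id)
    @producers.delete(id)
    @ios_dirty = true
  end

  # Returns the cached list, rebuilding it only when marked dirty
  def ios
    if @ios_dirty
      @cached_ios = @producers.values
      @ios_dirty = false
      @rebuilds += 1
    end
    @cached_ios
  end
end

cache = IoCache.new
cache.add(:a, :io_a)
cache.add(:b, :io_b)
3.times { cache.ios } # rebuilds once, then reuses the cached array
cache.remove(:a)
cache.ios             # rebuilds again after the change
```

The hot path (one call per poll loop iteration) then costs a flag check instead of allocating a new array every time.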

Instance Attribute Details

#id ⇒ Integer (readonly)

Returns unique identifier for this poller instance.

Returns:

  • (Integer)

    unique identifier for this poller instance



# File 'lib/waterdrop/polling/poller.rb', line 48

def id
  @id
end

Class Method Details

.next_id ⇒ Integer

Generates incremental IDs for poller instances (starting from 0)

Returns:

  • (Integer)

    next poller ID



# File 'lib/waterdrop/polling/poller.rb', line 38

def next_id
  ID_MUTEX.synchronize do
    id = @id_counter
    @id_counter += 1
    id
  end
end
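A minimal sketch of the same scheme, showing that an eagerly created constant mutex yields unique, gap-free IDs under concurrent calls. The `IdSource` class is hypothetical, and initializing `@id_counter` to 0 at class level is an assumption (the extract above does not show where the counter is initialized, only that IDs start from 0).

```ruby
# Hypothetical sketch mirroring ID_MUTEX + next_id. The mutex is a constant
# created at class load time, so no thread can race on creating it lazily.
class IdSource
  ID_MUTEX = Mutex.new
  @id_counter = 0 # assumption: class-level counter starting at 0

  class << self
    def next_id
      ID_MUTEX.synchronize do
        id = @id_counter
        @id_counter += 1
        id
      end
    end
  end
end

# 8 threads each request 1000 IDs concurrently
threads = Array.new(8) { Thread.new { Array.new(1000) { IdSource.next_id } } }
ids = threads.flat_map(&:value)
```

With a lazily created mutex (for example `@mutex ||= Mutex.new`), two threads could each build their own mutex on first use and then increment the counter without mutual exclusion.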

Instance Method Details

#alive? ⇒ Boolean

Checks if the poller thread is alive

Returns:

  • (Boolean)

    true if the poller thread is running



# File 'lib/waterdrop/polling/poller.rb', line 75

def alive?
  @thread&.alive? || false
end
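The `|| false` suffix matters because safe navigation on a `nil` thread returns `nil`, not `false`. A small sketch of the three cases (never started, finished, still running); `alive_status` is a hypothetical helper that reuses the same expression.

```ruby
# Sketch of why alive? appends `|| false`: `nil&.alive?` yields nil,
# and `|| false` turns that into a strict Boolean.
def alive_status(thread)
  thread&.alive? || false
end

never_started = nil
finished = Thread.new { :done }.tap(&:join)
running = Thread.new { sleep } # sleeps until killed

results = [
  alive_status(never_started),
  alive_status(finished),
  alive_status(running)
]

running.kill
running.join
```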

#count ⇒ Integer

Returns the number of registered producers

Returns:

  • (Integer)

    number of producers



# File 'lib/waterdrop/polling/poller.rb', line 81

def count
  @mutex.synchronize { @producers.size }
end

#in_poller_thread? ⇒ Boolean

Checks whether the current thread is the poller thread. Used to detect when close is called from within a callback, so the caller can avoid a deadlock.

Returns:

  • (Boolean)

    true if current thread is the poller thread



# File 'lib/waterdrop/polling/poller.rb', line 69

def in_poller_thread?
  Thread.current == @thread
end
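A minimal sketch of the thread-identity check, assuming a hypothetical `Worker` that runs callbacks on its own background thread. A callback executing on that thread sees `true`, while the same check from the caller's thread sees `false`.

```ruby
# Hypothetical sketch: a worker compares Thread.current with its own thread
# to tell whether a call originates from one of its callbacks.
class Worker
  def initialize
    @jobs = Queue.new
    @thread = Thread.new do
      # A nil job acts as the stop signal
      while (job = @jobs.pop)
        job.call
      end
    end
  end

  def in_worker_thread?
    Thread.current == @thread
  end

  def run(&block)
    @jobs << block
  end

  def stop
    @jobs << nil
    @thread.join
  end
end

worker = Worker.new
seen = Queue.new
worker.run { seen << worker.in_worker_thread? } # executes on the worker thread
worker.stop

from_callback = seen.pop
from_caller = worker.in_worker_thread?
```

Code such as `unregister` can use a check like this to skip blocking waits that only the worker thread itself could satisfy.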

#register(producer, client) ⇒ Object

Registers a producer with the poller

Parameters:

  • producer (WaterDrop::Producer)

    the producer instance

  • client (Rdkafka::Producer)

    the rdkafka client



# File 'lib/waterdrop/polling/poller.rb', line 113

def register(producer, client)
  ensure_same_process!

  state = State.new(
    producer.id,
    client,
    producer.monitor,
    producer.config.polling.fd.max_time,
    producer.config.polling.fd.periodic_poll_interval
  )

  @mutex.synchronize do
    @producers[producer.id] = state
    @ios_dirty = true
    # Reset the shutdown flag in case the thread is exiting but has not finished yet.
    # This prevents a race where the exiting thread closes the newly registered producer
    @shutdown = false
    ensure_thread_running!
  end

  producer.monitor.instrument(
    "poller.producer_registered",
    producer_id: producer.id
  )
end
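The registration flow can be sketched as follows: store the state under a mutex, mark the cache dirty, clear any pending shutdown, and start the background thread only if none is running. `MiniPoller`, its `stop` method, and the sleep-based loop are hypothetical simplifications; the real thread runs an `IO.select` loop and closes producers on exit.

```ruby
# Hypothetical sketch of the register flow with lazy thread start.
class MiniPoller
  attr_reader :threads_started

  def initialize
    @mutex = Mutex.new
    @producers = {}
    @thread = nil
    @shutdown = false
    @ios_dirty = true
    @threads_started = 0
  end

  def register(id, state)
    @mutex.synchronize do
      @producers[id] = state
      @ios_dirty = true
      @shutdown = false # cancel a shutdown the thread may not have acted on yet
      ensure_thread_running!
    end
  end

  def count
    @mutex.synchronize { @producers.size }
  end

  def stop
    @mutex.synchronize { @shutdown = true }
    @thread&.join
  end

  private

  # Caller must hold @mutex
  def ensure_thread_running!
    return if @thread&.alive?

    @threads_started += 1
    @thread = Thread.new do
      # Stand-in for the polling loop: idle until shutdown is requested
      sleep 0.01 until @mutex.synchronize { @shutdown }
    end
  end
end

poller = MiniPoller.new
poller.register(1, :state_a)
poller.register(2, :state_b) # reuses the already running thread
poller.stop
```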

#shutdown! ⇒ Object

Note:

This is primarily for testing to reset singleton state between tests

Shuts down the poller and resets state



# File 'lib/waterdrop/polling/poller.rb', line 87

def shutdown!
  @mutex.synchronize { @shutdown = true }

  thread = @thread
  if thread&.alive?
    thread.join(5)
    thread.kill if thread.alive?
  end

  @mutex.synchronize do
    @producers.each_value { |state| state.close unless state.closed? }
    @producers.clear
    @thread = nil
    @shutdown = false
    @ios_dirty = true
    @cached_ios = []
    @cached_io_to_state = {}
    @cached_states = []
    @cached_result = nil
    @poll_timeout_s = nil
  end
end
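`shutdown!` gives the thread a bounded grace period through `Thread#join(timeout)`, which returns `nil` if the thread is still alive when the timeout expires, and only then kills it. A small sketch of that pattern; `stop_thread` is a hypothetical helper, and unlike the original it also joins after `kill` so the caller knows termination has completed.

```ruby
# Sketch of the bounded-join pattern: wait up to grace_seconds for the
# thread to exit on its own, then force-kill it if it is still alive.
def stop_thread(thread, grace_seconds)
  return :not_running unless thread&.alive?

  # Thread#join(timeout) returns nil if the thread is still running afterwards
  if thread.join(grace_seconds)
    :exited
  else
    thread.kill
    thread.join # wait for the killed thread to finish terminating
    :killed
  end
end

cooperative = Thread.new { sleep 0.2 } # exits well within the grace period
stuck = Thread.new { sleep }           # never exits on its own

results = [
  stop_thread(cooperative, 1),
  stop_thread(stuck, 0.1),
  stop_thread(nil, 1)
]
```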

#unregister(producer) ⇒ Object

Unregisters a producer from the poller. This method blocks until the producer is fully removed from the poller, which prevents race conditions when a disconnect and reconnect happen in quick succession. This matches the threaded polling behavior, which drains without a timeout.

Parameters:

  • producer (WaterDrop::Producer)

    the producer instance



# File 'lib/waterdrop/polling/poller.rb', line 144

def unregister(producer)
  state, thread = @mutex.synchronize { [@producers[producer.id], @thread] }

  return unless state

  # Signal the poller thread to handle removal
  state.signal_close

  # Wait for the state to be fully closed by the poller thread
  # This prevents race conditions where a new registration with the same
  # producer_id could be deleted by a pending close signal
  # Skip waiting if called from within the poller thread itself (e.g., from a callback)
  # to avoid deadlock - the poller thread can't wait for itself
  # The cleanup will happen after the callback returns
  state.wait_for_close unless Thread.current == thread

  producer.monitor.instrument(
    "poller.producer_unregistered",
    producer_id: producer.id
  )
end
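The close handshake that `unregister` relies on can be sketched with a mutex and condition variable. `CloseHandshake` is a hypothetical class: its `signal_close`, `wait_for_close`, and `closed?` names mirror the calls made on `state` above, but this implementation is an assumption, not the library's `State`. The caller requests a close and blocks until the poller thread confirms it, unless the caller is the poller thread itself.

```ruby
# Hypothetical sketch of a signal/wait close handshake.
class CloseHandshake
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @close_requested = false
    @closed = false
  end

  def signal_close
    @mutex.synchronize { @close_requested = true }
  end

  def close_requested?
    @mutex.synchronize { @close_requested }
  end

  # Called by the poller thread once its cleanup is done
  def close
    @mutex.synchronize do
      @closed = true
      @cond.broadcast
    end
  end

  def closed?
    @mutex.synchronize { @closed }
  end

  def wait_for_close
    @mutex.synchronize do
      # Loop guards against spurious wakeups
      @cond.wait(@mutex) until @closed
    end
  end
end

state = CloseHandshake.new
poller_thread = Thread.new do
  sleep 0.01 until state.close_requested?
  state.close # cleanup happens on the poller thread
end

state.signal_close
# Waiting from the poller thread itself would deadlock, so skip it there
state.wait_for_close unless Thread.current == poller_thread
poller_thread.join
```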