Class: WaterDrop::Polling::Poller
- Inherits:
- Object
  - WaterDrop::Polling::Poller
- Includes:
- Karafka::Core::Helpers::Time, Singleton
- Defined in:
- lib/waterdrop/polling/poller.rb
Overview
Note: Newly registered producers may experience up to a 1-second delay before their first poll cycle, because the poller thread only rebuilds its IO list when IO.select times out. This is acceptable because producers are expected to be long-lived, and the initial connection overhead to Kafka typically exceeds this delay anyway.
Global poller singleton that manages a single polling thread for all FD-mode producers. It replaces librdkafka’s native background polling threads with a single Ruby thread that uses IO.select for efficient multiplexing.
Spawning one thread per producer is acceptable for one or two producers, but in a system with many producers (transactional ones, for example) the per-thread cost keeps adding up.
This implementation is event-driven rather than relying on blocking calls that release the GVL.
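The multiplexing idea can be illustrated in plain Ruby. This is a minimal sketch, not WaterDrop's implementation: it assumes each producer exposes a readable IO (here, IO.pipe pairs stand in for the descriptors the real poller obtains from librdkafka) and shows one thread waiting on all of them with IO.select, with a 1-second timeout marking the point where a poller could refresh its IO list.

```ruby
# Minimal sketch: one Ruby thread multiplexes several readable IOs with IO.select.
# The pipes are stand-ins (an assumption) for per-producer file descriptors.
readers = []
writers = []
3.times do
  reader, writer = IO.pipe
  readers << reader
  writers << writer
end

handled = []
poll_thread = Thread.new do
  pending = readers.dup
  until pending.empty?
    # Wait up to 1 second. A nil result means the timeout elapsed with no
    # events, which is where a real poller could rebuild its IO list.
    ready, = IO.select(pending, nil, nil, 1)
    next unless ready

    ready.each do |io|
      handled << io.read_nonblock(64)
      pending.delete(io)
    end
  end
end

writers.each_with_index { |writer, i| writer.write("event-#{i}") }
poll_thread.join
(readers + writers).each(&:close)
```

A single thread handles all three sources, and it only wakes up when one of them has data or the timeout fires.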
Constant Summary
- ID_MUTEX = Mutex.new
  Mutex for thread-safe ID generation. It is initialized at class load time to avoid the race conditions that lazy initialization would introduce.
Instance Attribute Summary
- #id ⇒ Integer (readonly)
  Unique identifier for this poller instance.
Class Method Summary
- .next_id ⇒ Integer
  Generates incremental IDs for poller instances (starting from 0).
Instance Method Summary
- #alive? ⇒ Boolean
  Checks if the poller thread is alive.
- #count ⇒ Integer
  Returns the number of registered producers.
- #in_poller_thread? ⇒ Boolean
  Checks if the current thread is the poller thread. Used to detect when close is called from within a callback, to avoid deadlock.
- #initialize ⇒ Poller (constructor)
  A new instance of Poller.
- #register(producer, client) ⇒ Object
  Registers a producer with the poller.
- #shutdown! ⇒ Object
  Shuts down the poller and resets state.
- #unregister(producer) ⇒ Object
  Unregisters a producer from the poller. This method blocks until the producer is fully removed from the poller, which prevents race conditions when a disconnect and reconnect happen in quick succession. This matches the threaded polling behavior, which drains without a timeout.
Constructor Details
#initialize ⇒ Poller
Returns a new instance of Poller.
    # File 'lib/waterdrop/polling/poller.rb', line 50

    def initialize
      @id = self.class.next_id
      @mutex = Mutex.new
      @producers = {}
      @thread = nil
      @shutdown = false
      @pid = Process.pid

      # Cached collections - rebuilt only when producers change
      @cached_ios = []
      @cached_io_to_state = {}
      @cached_states = []
      @cached_result = nil
      @ios_dirty = true
    end
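The cached fields above suggest a "rebuild only when dirty" pattern: the IO list is recomputed only after producers are added or removed, not on every poll iteration. The class below is a hypothetical illustration of that pattern; IoCache, add, remove, ios and rebuilds are illustrative names, not part of WaterDrop.

```ruby
# Hypothetical sketch of dirty-flag caching, modeled on @cached_ios / @ios_dirty.
class IoCache
  attr_reader :rebuilds

  def initialize
    @mutex = Mutex.new
    @sources = {}
    @cached_ios = []
    @ios_dirty = true
    @rebuilds = 0
  end

  def add(id, io)
    @mutex.synchronize do
      @sources[id] = io
      @ios_dirty = true # invalidate; the next read rebuilds the list
    end
  end

  def remove(id)
    @mutex.synchronize do
      @ios_dirty = true if @sources.delete(id)
    end
  end

  # Returns the cached IO list, rebuilding it only after a change.
  def ios
    @mutex.synchronize do
      if @ios_dirty
        @cached_ios = @sources.values.freeze
        @ios_dirty = false
        @rebuilds += 1
      end
      @cached_ios
    end
  end
end

cache = IoCache.new
r1, w1 = IO.pipe
r2, w2 = IO.pipe
cache.add(:a, r1)
cache.ios
cache.ios # served from the cache, no rebuild
cache.add(:b, r2)
cache.ios # rebuilt once after the change
[r1, w1, r2, w2].each(&:close)
```

The hot path (reading the list) stays cheap, and the cost of rebuilding is paid only when membership changes.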
Instance Attribute Details
#id ⇒ Integer (readonly)
Returns unique identifier for this poller instance.
    # File 'lib/waterdrop/polling/poller.rb', line 48

    def id
      @id
    end
Class Method Details
.next_id ⇒ Integer
Generates incremental IDs for poller instances (starting from 0)
    # File 'lib/waterdrop/polling/poller.rb', line 38

    def next_id
      ID_MUTEX.synchronize do
        id = @id_counter
        @id_counter += 1
        id
      end
    end
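A mutex-guarded counter like next_id hands out unique, gap-free IDs even under concurrent calls. The sketch below reproduces the pattern in a standalone class; IdSource is a hypothetical name, and it assumes the counter starts at 0 in the class body, since the listing above does not show where @id_counter is initialized.

```ruby
# Hypothetical standalone version of the next_id pattern.
class IdSource
  ID_MUTEX = Mutex.new # created at load time, like Poller::ID_MUTEX
  @id_counter = 0      # assumption: eager initialization of the counter

  class << self
    def next_id
      ID_MUTEX.synchronize do
        id = @id_counter
        @id_counter += 1
        id
      end
    end
  end
end

# 8 threads each request 100 IDs concurrently.
threads = Array.new(8) { Thread.new { Array.new(100) { IdSource.next_id } } }
ids = threads.flat_map(&:value)
```

Because the read and the increment happen inside one synchronize block, no two callers can observe the same counter value.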
Instance Method Details
#alive? ⇒ Boolean
Checks if the poller thread is alive
    # File 'lib/waterdrop/polling/poller.rb', line 75

    def alive?
      @thread&.alive? || false
    end
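The trailing `|| false` in alive? matters because safe navigation on a nil @thread returns nil rather than false. A small standalone illustration, where thread_alive? is a hypothetical helper using the same expression:

```ruby
# Same expression as Poller#alive?: nil-safe, always returns true or false.
def thread_alive?(thread)
  thread&.alive? || false
end

gate = Queue.new
worker = Thread.new { gate.pop } # blocks until something is pushed

before_start = thread_alive?(nil) # no thread yet: false, not nil
while_running = thread_alive?(worker)
gate << :go
worker.join
after_finish = thread_alive?(worker)
```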
#count ⇒ Integer
Returns the number of registered producers
    # File 'lib/waterdrop/polling/poller.rb', line 81

    def count
      @mutex.synchronize { @producers.size }
    end
#in_poller_thread? ⇒ Boolean
Checks if the current thread is the poller thread. Used to detect when close is called from within a callback, to avoid deadlock.
    # File 'lib/waterdrop/polling/poller.rb', line 69

    def in_poller_thread?
      Thread.current == @thread
    end
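in_poller_thread? exists because a thread cannot wait for its own completion. The sketch below shows the same guard in a hypothetical Worker class (not WaterDrop's API): close blocks until the worker finishes when called from outside, but returns immediately when invoked from one of the worker's own callbacks, where blocking would deadlock.

```ruby
# Hypothetical sketch of the "don't wait for yourself" guard.
class Worker
  def initialize
    @jobs = Queue.new
    @done = Queue.new
    @thread = Thread.new do
      while (job = @jobs.pop) != :stop
        job.call
      end
      @done << true # signals that the worker has finished
    end
  end

  # Same check as Poller#in_poller_thread?
  def in_worker_thread?
    Thread.current == @thread
  end

  def submit(&block)
    @jobs << block
  end

  def join
    @thread.join
  end

  # Requests a stop; blocks for completion only when called from outside.
  def close
    @jobs << :stop
    return :deferred if in_worker_thread?

    @done.pop
    :closed
  end
end

outside = Worker.new
outside_result = outside.close

inside = Worker.new
inside_results = Queue.new
inside.submit { inside_results << inside.close } # close from a callback
inside.join
inside_result = inside_results.pop
```

Without the guard, the in-callback close would wait on @done forever, since only the worker itself can push to it.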
#register(producer, client) ⇒ Object
Registers a producer with the poller
    # File 'lib/waterdrop/polling/poller.rb', line 113

    def register(producer, client)
      ensure_same_process!

      state = State.new(
        producer.id,
        client,
        producer.monitor,
        producer.config.polling.fd.max_time,
        producer.config.polling.fd.periodic_poll_interval
      )

      @mutex.synchronize do
        @producers[producer.id] = state
        @ios_dirty = true
        # Reset shutdown flag in case thread is exiting but hasn't yet
        # This prevents race where new producer is closed by exiting thread
        @shutdown = false
        ensure_thread_running!
      end

      producer.monitor.instrument(
        "poller.producer_registered",
        producer_id: producer.id
      )
    end
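register does its bookkeeping under the mutex and then calls ensure_thread_running!, whose body is not shown here. The following is a minimal sketch under the assumption that this method lazily starts the single polling thread only when none is alive; MiniPoller, stop and the sleep-based loop are hypothetical stand-ins for the real IO.select loop.

```ruby
# Hypothetical sketch: many registrations share one lazily started thread.
class MiniPoller
  attr_reader :thread

  def initialize
    @mutex = Mutex.new
    @producers = {}
    @thread = nil
    @shutdown = false
  end

  def register(id, state)
    @mutex.synchronize do
      @producers[id] = state
      @shutdown = false # cancel a pending shutdown, as register does
      ensure_thread_running!
    end
  end

  def stop
    @mutex.synchronize { @shutdown = true }
    @thread&.join
  end

  private

  # Assumed behavior; caller must hold @mutex.
  def ensure_thread_running!
    return if @thread&.alive?

    @thread = Thread.new do
      # Placeholder loop; a real poller would block in IO.select here.
      sleep 0.01 until @mutex.synchronize { @shutdown }
    end
  end
end

poller = MiniPoller.new
poller.register(:a, :state_a)
first_thread = poller.thread
poller.register(:b, :state_b)
same_thread = poller.thread.equal?(first_thread) # second register reuses it
poller.stop
```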
#shutdown! ⇒ Object
Note: This is primarily intended for tests, to reset the singleton's state between them.
Shuts down the poller and resets state
    # File 'lib/waterdrop/polling/poller.rb', line 87

    def shutdown!
      @mutex.synchronize { @shutdown = true }

      thread = @thread

      if thread&.alive?
        thread.join(5)
        thread.kill if thread.alive?
      end

      @mutex.synchronize do
        @producers.each_value { |state| state.close unless state.closed? }
        @producers.clear
        @thread = nil
        @shutdown = false
        @ios_dirty = true
        @cached_ios = []
        @cached_io_to_state = {}
        @cached_states = []
        @cached_result = nil
        @poll_timeout_s = nil
      end
    end
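shutdown! bounds its wait: Thread#join(timeout) returns nil if the thread is still running when the timeout expires, and Thread#kill is used only as a last resort. The helper below is a hypothetical sketch of that sequence; stop_thread and its return symbols are illustrative, and the short timeouts replace the 5 seconds used in the source so the example runs quickly.

```ruby
# Hypothetical sketch of a bounded "join, then kill" shutdown.
def stop_thread(thread, timeout)
  return :not_running unless thread&.alive?

  thread.join(timeout) # returns nil if still running after timeout
  return :joined unless thread.alive?

  thread.kill
  thread.join # wait until the kill has taken effect
  :killed
end

cooperative = Thread.new { sleep 0.2 } # finishes well within its timeout
stuck = Thread.new { sleep }           # never finishes on its own

results = {
  cooperative: stop_thread(cooperative, 2),
  stuck: stop_thread(stuck, 0.1),
  missing: stop_thread(nil, 0.1)
}
```

The timeout keeps a misbehaving poll loop from hanging shutdown indefinitely, while well-behaved threads still exit cleanly.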
#unregister(producer) ⇒ Object
Unregisters a producer from the poller. This method blocks until the producer is fully removed from the poller, which prevents race conditions when a disconnect and reconnect happen in quick succession. This matches the threaded polling behavior, which drains without a timeout.
    # File 'lib/waterdrop/polling/poller.rb', line 144

    def unregister(producer)
      state, thread = @mutex.synchronize { [@producers[producer.id], @thread] }

      return unless state

      # Signal the poller thread to handle removal
      state.signal_close

      # Wait for the state to be fully closed by the poller thread
      # This prevents race conditions where a new registration with the same
      # producer_id could be deleted by a pending close signal
      # Skip waiting if called from within the poller thread itself (e.g., from a callback)
      # to avoid deadlock - the poller thread can't wait for itself
      # The cleanup will happen after the callback returns
      state.wait_for_close unless Thread.current == thread

      producer.monitor.instrument(
        "poller.producer_unregistered",
        producer_id: producer.id
      )
    end
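unregister relies on a handshake: the caller signals, the poller thread performs the cleanup, and the caller waits until it is done. signal_close and wait_for_close appear in the source, but their implementation does not; the CloseHandshake class below is a hypothetical sketch of one way to build them with Mutex and ConditionVariable, waiting without a timeout as the docstring describes.

```ruby
# Hypothetical sketch of a signal/perform/wait close handshake.
class CloseHandshake
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @close_requested = false
    @closed = false
  end

  def signal_close
    @mutex.synchronize { @close_requested = true }
  end

  def close_requested?
    @mutex.synchronize { @close_requested }
  end

  # Called by the polling thread once resources have been released.
  def mark_closed
    @mutex.synchronize do
      @closed = true
      @cond.broadcast
    end
  end

  def closed?
    @mutex.synchronize { @closed }
  end

  # Blocks with no timeout; the loop guards against spurious wakeups.
  def wait_for_close
    @mutex.synchronize do
      @cond.wait(@mutex) until @closed
    end
  end
end

state = CloseHandshake.new
poll_thread = Thread.new do
  sleep 0.001 until state.close_requested?
  state.mark_closed # cleanup would happen here, on the polling thread
end

state.signal_close
state.wait_for_close
poll_thread.join
```

Checking @closed under the same mutex used by mark_closed means the waiter cannot miss the broadcast, even if cleanup finishes before wait_for_close is entered.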