Class: WaterDrop::Polling::Latch

Inherits:
Object
  • Object
show all
Defined in:
lib/waterdrop/polling/latch.rb

Overview

A thread-safe latch for synchronizing producer close operations

When a producer is closed, two threads are involved:

  1. The caller thread (user code calling producer.close)

  2. The poller thread (background thread running IO.select)

The close sequence:

  1. Caller calls producer.close -> unregister_from_poller -> Poller#unregister

  2. Poller#unregister signals via control pipe and calls state.wait_for_close (blocks on latch)

  3. Poller thread receives control signal, drains queue, calls state.close

  4. state.close releases the latch via release!

  5. Caller’s wait_for_close returns, unregister completes

This ensures the producer is fully drained and removed from the poller before returning control to the caller, preventing race conditions.

Instance Method Summary collapse

Constructor Details

#initializeLatch



21
22
23
24
25
# File 'lib/waterdrop/polling/latch.rb', line 21

def initialize
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @released = false
end

Instance Method Details

#release!Object

Releases the latch and wakes any waiting threads



28
29
30
31
32
33
# File 'lib/waterdrop/polling/latch.rb', line 28

def release!
  @mutex.synchronize do
    @released = true
    @cv.broadcast
  end
end

#released?Boolean



44
45
46
# File 'lib/waterdrop/polling/latch.rb', line 44

def released?
  @released
end

#waitObject

Waits until the latch is released Returns immediately if already released



37
38
39
40
41
# File 'lib/waterdrop/polling/latch.rb', line 37

def wait
  @mutex.synchronize do
    @cv.wait(@mutex) until @released
  end
end