Class: Rdkafka::Producer::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/rdkafka/producer/client.rb

Instance Method Summary collapse

Constructor Details

#initialize(native) ⇒ Client

Returns a new instance of Client.



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/rdkafka/producer/client.rb', line 4

def initialize(native)
  @native = native

  # Start thread to poll client for delivery callbacks
  @polling_thread = Thread.new do
    loop do
      Rdkafka::Bindings.rd_kafka_poll(native, 250)
      # Exit thread if closing and the poll queue is empty
      if Thread.current[:closing] && Rdkafka::Bindings.rd_kafka_outq_len(native) == 0
        break
      end
    end
  end
  @polling_thread.abort_on_exception = true
  @polling_thread[:closing] = false
end

Instance Method Details

#close(object_id = nil) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/rdkafka/producer/client.rb', line 33

def close(object_id=nil)
  return unless @native

  # Indicate to polling thread that we're closing
  @polling_thread[:closing] = true
  # Wait for the polling thread to finish up
  @polling_thread.join

  Rdkafka::Bindings.rd_kafka_destroy(@native)

  @native = nil
end

#closed?Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/rdkafka/producer/client.rb', line 29

def closed?
  @native.nil?
end

#finalizerObject



25
26
27
# File 'lib/rdkafka/producer/client.rb', line 25

def finalizer
  ->(_) { close }
end

#nativeObject



21
22
23
# File 'lib/rdkafka/producer/client.rb', line 21

def native
  @native
end