Class: Rdkafka::Producer::Client
- Inherits:
-
Object
- Object
- Rdkafka::Producer::Client
- Defined in:
- lib/rdkafka/producer/client.rb
Instance Method Summary collapse
- #close(object_id = nil) ⇒ Object
- #closed? ⇒ Boolean
- #finalizer ⇒ Object
-
#initialize(native) ⇒ Client
constructor
A new instance of Client.
- #native ⇒ Object
Constructor Details
#initialize(native) ⇒ Client
Returns a new instance of Client.
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/rdkafka/producer/client.rb', line 4 def initialize(native) @native = native # Start thread to poll client for delivery callbacks @polling_thread = Thread.new do loop do Rdkafka::Bindings.rd_kafka_poll(native, 250) # Exit thread if closing and the poll queue is empty if Thread.current[:closing] && Rdkafka::Bindings.rd_kafka_outq_len(native) == 0 break end end end @polling_thread.abort_on_exception = true @polling_thread[:closing] = false end |
Instance Method Details
#close(object_id = nil) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/rdkafka/producer/client.rb', line 33 def close(object_id=nil) return unless @native # Indicate to polling thread that we're closing @polling_thread[:closing] = true # Wait for the polling thread to finish up @polling_thread.join Rdkafka::Bindings.rd_kafka_destroy(@native) @native = nil end |
#closed? ⇒ Boolean
29 30 31 |
# File 'lib/rdkafka/producer/client.rb', line 29 def closed? @native.nil? end |
#finalizer ⇒ Object
25 26 27 |
# File 'lib/rdkafka/producer/client.rb', line 25 def finalizer ->(_) { close } end |
#native ⇒ Object
21 22 23 |
# File 'lib/rdkafka/producer/client.rb', line 21 def native @native end |