Class: Kafka::TopicProducer
- Inherits: Object
- Object
- Kafka::TopicProducer
- Defined in:
- lib/fluent/plugin/kafka_producer_ext.rb
Instance Method Summary
- #buffer_bytesize ⇒ Object
-
#buffer_size ⇒ Integer
Returns the number of messages currently held in the buffer.
-
#clear_buffer ⇒ nil
Deletes all buffered messages.
- #deliver_messages ⇒ Object
-
#initialize(topic, cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:) ⇒ TopicProducer
constructor
A new instance of TopicProducer.
- #produce(value, key, partition, partition_key) ⇒ Object
-
#shutdown ⇒ nil
Closes all connections to the brokers.
Constructor Details
#initialize(topic, cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:) ⇒ TopicProducer
Returns a new instance of TopicProducer.
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 60
#
# Builds a producer bound to a single topic.
#
# Each keyword argument is stored on the instance unchanged, with one
# exception: a +required_acks+ of +:all+ is stored as -1. The topic is
# registered with the cluster through +add_target_topics+, and two empty
# containers are created: a MessageBuffer and a PendingMessageQueue.
#
# @param topic the topic this producer writes to
# @param cluster object that must respond to +add_target_topics+
# @param logger stored as-is; not used inside this method
# @param instrumenter stored as-is; not used inside this method
# @param compressor stored as-is; not used inside this method
# @param ack_timeout stored as-is (units not visible here; TODO confirm)
# @param required_acks +:all+ is stored as -1; any other value is kept
# @param max_retries stored as-is
# @param retry_backoff stored as-is (units not visible here; TODO confirm)
# @param max_buffer_size stored as-is
# @param max_buffer_bytesize stored as-is
# @return [TopicProducer] a new instance of TopicProducer
def initialize(topic, cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
  @cluster = cluster
  @logger = logger
  @instrumenter = instrumenter
  @required_acks = required_acks == :all ? -1 : required_acks
  @ack_timeout = ack_timeout
  @max_retries = max_retries
  @retry_backoff = retry_backoff
  @max_buffer_size = max_buffer_size
  @max_buffer_bytesize = max_buffer_bytesize
  @compressor = compressor

  @topic = topic
  @cluster.add_target_topics(Set.new([topic]))

  # A buffer organized by topic/partition.
  @buffer = MessageBuffer.new

  # Messages added by `#produce` but not yet assigned a partition.
  @pending_message_queue = PendingMessageQueue.new
end
Instance Method Details
#buffer_bytesize ⇒ Object
114 115 116 |
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 114
#
# Sums the +bytesize+ reported by the pending message queue and by the
# buffer.
#
# @return the combined +bytesize+ of both containers
def buffer_bytesize
  @pending_message_queue.bytesize + @buffer.bytesize
end
#buffer_size ⇒ Integer
Returns the number of messages currently held in the buffer.
110 111 112 |
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 110
#
# Returns the number of messages currently held in the buffer, counting
# both the pending message queue and the buffer itself.
#
# @return [Integer] sum of the two containers' +size+
def buffer_size
  @pending_message_queue.size + @buffer.size
end
#clear_buffer ⇒ nil
Deletes all buffered messages.
121 122 123 124 |
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 121
#
# Deletes all buffered messages by clearing the buffer and then the
# pending message queue.
#
# @return [nil]
def clear_buffer
  @buffer.clear
  @pending_message_queue.clear

  # Return nil explicitly so the documented contract holds no matter
  # what the queue's `clear` happens to return.
  nil
end
#deliver_messages ⇒ Object
100 101 102 103 104 105 |
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 100
#
# Delivers the buffered messages, doing nothing when the buffer is empty.
#
# NOTE(review): the extracted listing lost both the method name and one
# body statement (the gutter lists six source lines, 100-105). The
# delegation to `deliver_messages_with_retries` is restored on the
# assumption that this class mirrors ruby-kafka's producer -- confirm
# against the original file.
#
# @return [nil] when the buffer is empty; otherwise whatever
#   `deliver_messages_with_retries` returns
def deliver_messages
  # There's no need to do anything if the buffer is empty.
  return if buffer_size.zero?

  deliver_messages_with_retries
end
#produce(value, key, partition, partition_key) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 82
#
# Wraps the arguments in a PendingMessage addressed to this producer's
# topic and writes it to the pending message queue.
#
# The message is stamped with the current time. Its size is the byte
# length of the key's string form plus that of the value's string form,
# so a nil key or value contributes zero bytes.
#
# @param value the message payload
# @param key the message key
# @param partition passed through to the PendingMessage unchanged
# @param partition_key passed through to the PendingMessage unchanged
# @return [nil]
def produce(value, key, partition, partition_key)
  create_time = Time.now

  message = PendingMessage.new(
    value,
    key,
    @topic,
    partition,
    partition_key,
    create_time,
    key.to_s.bytesize + value.to_s.bytesize
  )

  @pending_message_queue.write(message)

  nil
end
#shutdown ⇒ nil
Closes all connections to the brokers.
129 130 131 |
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 129
#
# Closes all connections to the brokers by calling +disconnect+ on the
# cluster.
#
# @return [nil]
def shutdown
  @cluster.disconnect

  # Return nil explicitly so the documented contract does not depend on
  # what the cluster's `disconnect` returns.
  nil
end