Class: Kafka::TopicProducer

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/kafka_producer_ext.rb

Instance Method Summary collapse

Constructor Details

#initialize(topic, cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:) ⇒ TopicProducer

Returns a new instance of TopicProducer.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 59

# Builds a producer bound to a single topic.
#
# Stores the supplied collaborators and settings on the instance, registers
# `topic` with the cluster as a target topic, and creates the empty message
# containers used by the other methods.
#
# @param topic the topic every produced message is tagged with
# @param cluster object that receives `add_target_topics` (and later `disconnect`)
# @param logger stored as-is
# @param instrumenter stored as-is
# @param compressor stored as-is
# @param ack_timeout stored as-is
# @param required_acks `:all` is stored as -1; any other value is stored unchanged
# @param max_retries stored as-is
# @param retry_backoff stored as-is
# @param max_buffer_size stored as-is
# @param max_buffer_bytesize stored as-is
def initialize(topic, cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
  # Collaborators.
  @topic = topic
  @cluster = cluster
  @logger = logger
  @instrumenter = instrumenter
  @compressor = compressor

  # Acknowledgement settings; the symbolic `:all` is translated to -1.
  @required_acks =
    if required_acks == :all
      -1
    else
      required_acks
    end
  @ack_timeout = ack_timeout

  # Retry settings.
  @max_retries = max_retries
  @retry_backoff = retry_backoff

  # Buffer limits.
  @max_buffer_size = max_buffer_size
  @max_buffer_bytesize = max_buffer_bytesize

  # Tell the cluster which topic this producer targets.
  @cluster.add_target_topics(Set[topic])

  # A buffer organized by topic/partition.
  @buffer = MessageBuffer.new

  # Messages added by `#produce` but not yet assigned a partition.
  @pending_message_queue = PendingMessageQueue.new
end

Instance Method Details

#buffer_bytesize ⇒ Object



112
113
114
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 112

# Combined `bytesize` of the pending message queue and the buffer.
#
# @return the sum of `@pending_message_queue.bytesize` and `@buffer.bytesize`
def buffer_bytesize
  pending_bytes = @pending_message_queue.bytesize
  buffered_bytes = @buffer.bytesize

  pending_bytes + buffered_bytes
end

#buffer_size ⇒ Integer

Returns the number of messages currently held in the buffer.

Returns:

  • (Integer)

    buffer size.



108
109
110
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 108

# Number of messages currently held, counting both the pending message queue
# and the buffer.
#
# @return [Integer] the sum of `@pending_message_queue.size` and `@buffer.size`
def buffer_size
  pending_count = @pending_message_queue.size
  buffered_count = @buffer.size

  pending_count + buffered_count
end

#clear_buffer ⇒ nil

Deletes all buffered messages.

Returns:

  • (nil)


119
120
121
122
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 119

# Deletes all buffered messages by clearing both the buffer and the pending
# message queue.
#
# @return [nil]
def clear_buffer
  @buffer.clear
  @pending_message_queue.clear

  # Return nil explicitly so the result matches the documented contract
  # instead of leaking whatever the queue's `clear` happens to return.
  nil
end

#deliver_messages ⇒ Object



98
99
100
101
102
103
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 98

# Hands off to `deliver_messages_with_retries`, but only when at least one
# message is held (`buffer_size` is non-zero); with nothing buffered there is
# no work to do.
#
# @return [nil] when nothing is buffered; otherwise the result of
#   `deliver_messages_with_retries`
def deliver_messages
  deliver_messages_with_retries unless buffer_size == 0
end

#produce(value, key, partition, partition_key) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 81

# Queues a message for this producer's topic.
#
# Wraps the arguments, the producer's topic and the current time in a
# `PendingMessage` and writes it to the pending message queue.
#
# @param value passed through to the pending message
# @param key passed through to the pending message
# @param partition passed through to the pending message
# @param partition_key passed through to the pending message
# @return [nil]
def produce(value, key, partition, partition_key)
  # Capture the creation time before the message object is built.
  created_at = Time.now

  pending = PendingMessage.new(value, key, @topic, partition, partition_key, created_at)
  @pending_message_queue.write(pending)

  nil
end

#shutdown ⇒ nil

Closes all connections to the brokers.

Returns:

  • (nil)


127
128
129
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 127

# Closes all connections to the brokers by disconnecting the cluster.
#
# @return [nil]
def shutdown
  @cluster.disconnect

  # Return nil explicitly so the result matches the documented contract
  # rather than exposing whatever `disconnect` returns.
  nil
end