Class: Kafka::AsyncProducer::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/async_producer.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue:, producer:, delivery_threshold:) ⇒ Worker

Returns a new instance of Worker.



173
174
175
176
177
# File 'lib/kafka/async_producer.rb', line 173

# Stores the collaborators the worker operates on.
#
# @param queue [#pop] source of `[operation, payload]` pairs consumed by {#run}
# @param producer [#shutdown] producer that is shut down when {#run} exits
# @param delivery_threshold the value kept in `@delivery_threshold`
#   (presumably read by `threshold_reached?` — not visible here, confirm)
def initialize(queue:, producer:, delivery_threshold:)
  @queue, @producer = queue, producer
  @delivery_threshold = delivery_threshold
end

Instance Method Details

#run ⇒ Object



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/kafka/async_producer.rb', line 179

# Processes operations popped from the queue until told to shut down.
#
# Each queue entry is an `[operation, payload]` pair:
#
# * `:produce`          - forwards the payload to `produce`, then delivers
#                         if `threshold_reached?` is true.
# * `:deliver_messages` - delivers immediately.
# * `:shutdown`         - delivers whatever is pending, then leaves the loop.
#
# The producer is shut down whenever this method exits, whether through a
# `:shutdown` operation or because an exception propagated out of the loop.
#
# @return [nil] once a `:shutdown` operation has been processed
# @raise [RuntimeError] if an unrecognised operation is popped
def run
  loop do
    command, args = @queue.pop

    case command
    when :produce
      produce(*args)
      deliver_messages if threshold_reached?
    when :deliver_messages, :shutdown
      # Both operations flush pending messages; only :shutdown then ends
      # the run loop.
      deliver_messages
      break if command == :shutdown
    else
      raise "Unknown operation #{command.inspect}"
    end
  end
ensure
  @producer.shutdown
end