Class: Kafka::AsyncProducer::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/async_producer.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue:, producer:, delivery_threshold:) ⇒ Worker

Returns a new instance of Worker.


151
152
153
154
155
# File 'lib/kafka/async_producer.rb', line 151

def initialize(queue:, producer:, delivery_threshold:)
  @queue = queue
  @producer = producer
  @delivery_threshold = delivery_threshold
end

Instance Method Details

#runObject


157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/kafka/async_producer.rb', line 157

def run
  loop do
    operation, payload = @queue.pop

    case operation
    when :produce
      produce(*payload)
      deliver_messages if threshold_reached?
    when :deliver_messages
      deliver_messages
    when :shutdown      # Deliver any pending messages first.

      deliver_messages

      # Stop the run loop.
      break
    else
      raise "Unknown operation #{operation.inspect}"
    end
  end
ensure
  @producer.shutdown
end