Class: Racecar::Consumer
- Inherits:
-
Object
- Object
- Racecar::Consumer
- Defined in:
- lib/racecar/consumer.rb
Defined Under Namespace
Classes: Subscription
Class Attribute Summary collapse
-
.consumer ⇒ Object
Returns the value of attribute consumer.
-
.group_id ⇒ Object
Returns the value of attribute group_id.
-
.max_wait_time ⇒ Object
Returns the value of attribute max_wait_time.
-
.parallel_workers ⇒ Object
Returns the value of attribute parallel_workers.
-
.producer ⇒ Object
Returns the value of attribute producer.
Class Method Summary collapse
-
.subscribes_to(*topics, start_from_beginning: true, max_bytes_per_partition: 1048576, additional_config: {}) ⇒ nil
Adds one or more topic subscriptions.
- .subscriptions ⇒ Object
Instance Method Summary collapse
- #configure(producer:, consumer:, instrumenter: NullInstrumenter, config: Racecar.config) ⇒ Object
-
#deliver! ⇒ Object
Blocks until all messages produced so far have been successfully published.
- #teardown ⇒ Object
Class Attribute Details
.consumer ⇒ Object
Returns the value of attribute consumer.
12 13 14 |
# File 'lib/racecar/consumer.rb', line 12 def consumer @consumer end |
.group_id ⇒ Object
Returns the value of attribute group_id.
11 12 13 |
# File 'lib/racecar/consumer.rb', line 11 def group_id @group_id end |
.max_wait_time ⇒ Object
Returns the value of attribute max_wait_time.
10 11 12 |
# File 'lib/racecar/consumer.rb', line 10 def max_wait_time @max_wait_time end |
.parallel_workers ⇒ Object
Returns the value of attribute parallel_workers.
12 13 14 |
# File 'lib/racecar/consumer.rb', line 12 def parallel_workers @parallel_workers end |
.producer ⇒ Object
Returns the value of attribute producer.
12 13 14 |
# File 'lib/racecar/consumer.rb', line 12 def producer @producer end |
Class Method Details
.subscribes_to(*topics, start_from_beginning: true, max_bytes_per_partition: 1048576, additional_config: {}) ⇒ nil
Adds one or more topic subscriptions.
Can be called multiple times in order to subscribe to more topics.
30 31 32 33 34 35 36 37 38 39 |
# File 'lib/racecar/consumer.rb', line 30 def subscribes_to( *topics, start_from_beginning: true, max_bytes_per_partition: 1048576, additional_config: {} ) topics.each do |topic| subscriptions << Subscription.new(topic, start_from_beginning, max_bytes_per_partition, additional_config) end end |
.subscriptions ⇒ Object
14 15 16 |
# File 'lib/racecar/consumer.rb', line 14 def subscriptions @subscriptions ||= [] end |
Instance Method Details
#configure(producer:, consumer:, instrumenter: NullInstrumenter, config: Racecar.config) ⇒ Object
42 43 44 45 46 47 48 49 50 |
# File 'lib/racecar/consumer.rb', line 42 def configure(producer:, consumer:, instrumenter: NullInstrumenter, config: Racecar.config) @producer = producer @delivery_handles = [] @consumer = consumer @instrumenter = instrumenter @config = config end |
#deliver! ⇒ Object
Blocks until all messages produced so far have been successfully published. If message delivery finally fails, a Racecar::MessageDeliveryError is raised. The delivery failed for the reason in the exception. The error can be broker side (e.g. downtime, configuration issue) or specific to the message being sent. The caller must handle the latter cases or run into head of line blocking.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/racecar/consumer.rb', line 59 def deliver! @delivery_handles ||= [] if @delivery_handles.any? instrumentation_payload = { delivered_message_count: @delivery_handles.size } @instrumenter.instrument('deliver_messages', instrumentation_payload) do @delivery_handles.each do |handle| # rdkafka-ruby checks every wait_timeout seconds if the message was # successfully delivered, up to max_wait_timeout seconds before raising # Rdkafka::AbstractHandle::WaitTimeoutError. librdkafka will (re)try to # deliver all messages in the background, until "config.message_timeout" # (message.timeout.ms) is exceeded. Phrased differently, rdkafka-ruby's # WaitTimeoutError is just informative. # The raising can be avoided if max_wait_timeout below is greater than # config.message_timeout, but config is not available here (without # changing the interface). handle.wait(max_wait_timeout: 60, wait_timeout: 0.1) rescue Rdkafka::AbstractHandle::WaitTimeoutError => e partition = MessageDeliveryError.partition_from_delivery_handle(handle) # ideally we could use the logger passed to the Runner, but it is not # available here. The runner sets it for Rdkafka, though, so we can use # that instead. @config.logger.debug "Still trying to deliver message to (partition #{partition})... (will try up to Racecar.config.message_timeout)" retry rescue Rdkafka::RdkafkaError => e raise MessageDeliveryError.new(e, handle) end end end @delivery_handles.clear end |
#teardown ⇒ Object
52 |
# File 'lib/racecar/consumer.rb', line 52 def teardown; end |