Class: Racecar::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/racecar/consumer.rb

Defined Under Namespace

Classes: Subscription

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.consumer ⇒ Object

Returns the value of attribute consumer.



12
13
14
# File 'lib/racecar/consumer.rb', line 12

# Reader for the +consumer+ attribute.
#
# @return [Object, nil] whatever is currently stored in @consumer; nil when
#   nothing has been assigned yet.
def consumer
  @consumer
end

.fetch_messages ⇒ Object

Returns the value of attribute fetch_messages.



12
13
14
# File 'lib/racecar/consumer.rb', line 12

# Reader for the +fetch_messages+ attribute.
#
# @return [Object, nil] whatever is currently stored in @fetch_messages; nil
#   when nothing has been assigned yet.
def fetch_messages
  @fetch_messages
end

.group_id ⇒ Object

Returns the value of attribute group_id.



11
12
13
# File 'lib/racecar/consumer.rb', line 11

# Reader for the +group_id+ attribute.
#
# @return [Object, nil] whatever is currently stored in @group_id; nil when
#   nothing has been assigned yet.
def group_id
  @group_id
end

.max_wait_time ⇒ Object

Returns the value of attribute max_wait_time.



10
11
12
# File 'lib/racecar/consumer.rb', line 10

# Reader for the +max_wait_time+ attribute.
#
# @return [Object, nil] whatever is currently stored in @max_wait_time; nil
#   when nothing has been assigned yet.
#   NOTE(review): unit (seconds vs. milliseconds) is not visible here — confirm.
def max_wait_time
  @max_wait_time
end

.parallel_workers ⇒ Object

Returns the value of attribute parallel_workers.



12
13
14
# File 'lib/racecar/consumer.rb', line 12

# Reader for the +parallel_workers+ attribute.
#
# @return [Object, nil] whatever is currently stored in @parallel_workers; nil
#   when nothing has been assigned yet.
def parallel_workers
  @parallel_workers
end

.producer ⇒ Object

Returns the value of attribute producer.



12
13
14
# File 'lib/racecar/consumer.rb', line 12

# Reader for the +producer+ attribute.
#
# @return [Object, nil] whatever is currently stored in @producer; nil when
#   nothing has been assigned yet.
def producer
  @producer
end

Class Method Details

.on_partitions_assigned(rebalance_event) ⇒ Object

Rebalance hooks for subclasses to override



42
# File 'lib/racecar/consumer.rb', line 42

# Rebalance hook for the "partitions assigned" event. The default
# implementation is intentionally empty so that subclasses can override it.
#
# @param rebalance_event [Object] ignored by this default implementation
# @return [nil]
def on_partitions_assigned(rebalance_event)
  # no-op by default
end

.on_partitions_revoked(rebalance_event) ⇒ Object



43
# File 'lib/racecar/consumer.rb', line 43

# Rebalance hook for the "partitions revoked" event. The default
# implementation is intentionally empty so that subclasses can override it.
#
# @param rebalance_event [Object] ignored by this default implementation
# @return [nil]
def on_partitions_revoked(rebalance_event)
  # no-op by default
end

.subscribes_to(*topics, start_from_beginning: true, max_bytes_per_partition: 1048576, additional_config: {}) ⇒ nil

Adds one or more topic subscriptions.

Can be called multiple times in order to subscribe to more topics.

Parameters:

  • topics (String)

    one or more topics to subscribe to.

  • start_from_beginning (Boolean) (defaults to: true)

    whether to start from the beginning or the end of each partition.

  • max_bytes_per_partition (Integer) (defaults to: 1048576)

    the maximum number of bytes to fetch from each partition at a time.

  • additional_config (Hash) (defaults to: {})

    Configuration properties for consumer. See github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Returns:

  • (nil)


30
31
32
33
34
35
36
37
38
39
# File 'lib/racecar/consumer.rb', line 30

# Adds one or more topic subscriptions. May be called repeatedly to
# subscribe to additional topics; each call appends to +subscriptions+.
#
# @param topics [String] one or more topics to subscribe to
# @param start_from_beginning [Boolean] whether to start from the beginning
#   or the end of each partition
# @param max_bytes_per_partition [Integer] the maximum number of bytes to
#   fetch from each partition at a time
# @param additional_config [Hash] extra consumer configuration properties,
#   passed through unchanged to each Subscription
# @return [nil]
def subscribes_to(
  *topics,
  start_from_beginning: true,
  max_bytes_per_partition: 1048576,
  additional_config: {}
)
  topics.each do |topic|
    subscriptions << Subscription.new(topic, start_from_beginning, max_bytes_per_partition, additional_config)
  end

  # Return nil explicitly: without this the method leaks the +topics+ array
  # (the value of +each+), contradicting the documented nil return.
  nil
end

.subscriptions ⇒ Object



14
15
16
# File 'lib/racecar/consumer.rb', line 14

# Lazily-initialised list of subscriptions. The same Array instance is
# returned on every call, so callers can append to it in place.
#
# @return [Array] the memoized subscription list (empty on first access)
def subscriptions
  return @subscriptions if defined?(@subscriptions) && @subscriptions

  @subscriptions = []
end

Instance Method Details

#configure(producer:, consumer:, instrumenter: NullInstrumenter, config: Racecar.config) ⇒ Object



46
47
48
49
50
51
52
53
54
# File 'lib/racecar/consumer.rb', line 46

# Stores the collaborators this consumer works with and resets the list of
# pending delivery handles to an empty Array.
#
# @param producer [Object] stored in @producer
# @param consumer [Object] stored in @consumer
# @param instrumenter [Object] stored in @instrumenter (defaults to NullInstrumenter)
# @param config [Object] stored in @config (defaults to Racecar.config)
# @return [Object] the +config+ argument (value of the final assignment)
def configure(producer:, consumer:, instrumenter: NullInstrumenter, config: Racecar.config)
  # Start with no outstanding deliveries.
  @delivery_handles = []

  # Kafka-facing collaborators.
  @consumer = consumer
  @producer = producer

  # Cross-cutting collaborators; @config stays last so the return value is +config+.
  @instrumenter = instrumenter
  @config = config
end

#deliver! ⇒ Object

Blocks until all messages produced so far have been successfully published. If delivery ultimately fails, a Racecar::MessageDeliveryError is raised; the exception carries the reason for the failure. The failure can be broker-side (e.g. downtime, a configuration issue) or specific to the message being sent. The caller must handle the latter case or run into head-of-line blocking.



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/racecar/consumer.rb', line 63

# Waits for every pending delivery handle and then empties the list of
# pending handles.
#
# When a handle raises Rdkafka::RdkafkaError the error is wrapped in a
# MessageDeliveryError and re-raised; in that case the pending list is left
# untouched because the final +clear+ is never reached.
#
# @raise [MessageDeliveryError] when waiting on a handle raises Rdkafka::RdkafkaError
# @return [Array] the (now empty) list of delivery handles
def deliver!
  @delivery_handles ||= []

  # Nothing truthy queued: skip instrumentation and just reset the list.
  return @delivery_handles.clear if @delivery_handles.none?

  payload = { delivered_message_count: @delivery_handles.size }

  @instrumenter.instrument('deliver_messages', payload) do
    @delivery_handles.each do |pending|
      begin
        # rdkafka-ruby polls every wait_timeout seconds, for at most
        # max_wait_timeout seconds, and then raises WaitTimeoutError.
        # librdkafka keeps (re)trying delivery in the background until
        # "config.message_timeout" (message.timeout.ms) is exceeded, so the
        # timeout raised here is only informative.
        # NOTE(review): an earlier comment said config was unavailable here,
        # yet @config is read below for logging — confirm whether
        # max_wait_timeout could be derived from it to avoid the timeout.
        pending.wait(max_wait_timeout: 60, wait_timeout: 0.1)
      rescue Rdkafka::AbstractHandle::WaitTimeoutError
        # Not a failure yet: log progress and wait on the same handle again.
        # The retry has no counter of its own; presumably it ends once
        # librdkafka gives up and the handle raises RdkafkaError — confirm.
        partition = MessageDeliveryError.partition_from_delivery_handle(pending)
        @config.logger.debug "Still trying to deliver message to (partition #{partition})... (will try up to Racecar.config.message_timeout)"
        retry
      rescue Rdkafka::RdkafkaError => e
        raise MessageDeliveryError.new(e, pending)
      end
    end
  end

  @delivery_handles.clear
end

#teardown ⇒ Object



56
# File 'lib/racecar/consumer.rb', line 56

# Default implementation does nothing and returns nil.
# NOTE(review): presumably a cleanup hook for subclasses to override — confirm.
#
# @return [nil]
def teardown
  # no-op by default
end