Module: Deimos::ActiveRecordConsume::BatchConsumption

Included in:
Deimos::ActiveRecordConsumer
Defined in:
lib/deimos/active_record_consume/batch_consumption.rb

Overview

Methods for consuming batches of messages and saving them to the database in bulk ActiveRecord operations.

Instance Method Summary

Instance Method Details

#consume_batch(payloads, metadata) ⇒ Object

Handle a batch of Kafka messages. Batches are split into “slices”, which are groups of independent messages that can be processed together in a single database operation. If two messages in a batch have the same key, they cannot be processed in the same operation because they would interfere with each other, so they are split into separate slices.

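Parameters:

  • payloads (Array)

    The message payloads in the batch.

  • metadata (Hash)

    Information about the batch, including the message keys and the topic.
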
# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 20

def consume_batch(payloads, metadata)
  # Pair each payload with its key from the batch metadata
  messages = payloads.
    zip(metadata[:keys]).
    map { |p, k| Deimos::Message.new(p, nil, key: k) }

  tags = %W(topic:#{metadata[:topic]})

  Deimos.instrument('ar_consumer.consume_batch', tags) do
    # The entire batch should be treated as one transaction so that if
    # any message fails, the whole thing is rolled back or retried
    # if there is deadlock
    Deimos::Utils::DeadlockRetry.wrap(tags) do
      if @compacted || self.class.config[:no_keys]
        update_database(compact_messages(messages))
      else
        uncompacted_update(messages)
      end
    end
  end
end
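
The slicing idea described above can be illustrated with a minimal sketch. This is not the Deimos implementation: `Msg` and `slice_by_key` are hypothetical names introduced here, and the sketch only assumes that each message exposes a `key`. It groups messages by key and then takes the i-th message of every group to form slice i, so no slice contains two messages with the same key and messages sharing a key keep their relative order across slices.

```ruby
# Hypothetical stand-in for a keyed message; this is NOT Deimos::Message.
Msg = Struct.new(:key, :payload)

# Hypothetical helper: split messages into slices such that no slice
# holds two messages with the same key. Messages that share a key land
# in successive slices, in their original order.
def slice_by_key(messages)
  groups = messages.group_by(&:key).values
  depth = groups.map(&:length).max || 0 # 0 when the batch is empty
  (0...depth).map do |i|
    # Take the i-th message of each key group, skipping shorter groups
    groups.map { |group| group[i] }.compact
  end
end

batch = [
  Msg.new(1, 'a'),
  Msg.new(2, 'b'),
  Msg.new(1, 'c') # same key as the first message
]
slices = slice_by_key(batch)
slices.each_with_index do |slice, i|
  puts "slice #{i}: #{slice.map(&:payload).inspect}"
end
```

Each resulting slice could then be written in one bulk operation, with slices applied in order so that a later message for a key overwrites an earlier one.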

#record_key(key) ⇒ Hash

Get the unique key for the ActiveRecord instance from the incoming key. Override this method (calling super) to customize the set of attributes that uniquely identifies each record in the database.

Parameters:

  • key (String)

    The encoded key.

Returns:

  • (Hash)

    The key attributes.



# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 46

def record_key(key)
  if key.nil?
    {}
  elsif key.is_a?(Hash)
    @key_converter.convert(key)
  else
    { @klass.primary_key => key }
  end
end
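
As a rough illustration of overriding `record_key` with `super`, the sketch below uses a hypothetical `SketchConsumer` class that mirrors the branching shown above without depending on Deimos. The class names, the `tenant_id` attribute, and its value are assumptions made for the example. In a real consumer, hash keys would go through `@key_converter` and the primary key would come from `@klass`.

```ruby
# Hypothetical stand-in mirroring the record_key logic shown above;
# it is NOT the real Deimos::ActiveRecordConsumer.
class SketchConsumer
  def initialize(primary_key)
    @primary_key = primary_key
  end

  def record_key(key)
    if key.nil?
      {}
    elsif key.is_a?(Hash)
      key # the real method converts hashes via @key_converter
    else
      { @primary_key => key }
    end
  end
end

# Override with super to add a hypothetical scoping attribute, so that
# records are identified by the incoming key plus tenant_id.
class TenantConsumer < SketchConsumer
  def record_key(key)
    super.merge('tenant_id' => 42)
  end
end

consumer = TenantConsumer.new('id')
scalar_key = consumer.record_key(7)
hash_key = consumer.record_key({ 'widget_id' => 3 })
nil_key = consumer.record_key(nil)
puts scalar_key.inspect
puts hash_key.inspect
puts nil_key.inspect
```

Calling `super` keeps the default handling of nil, hash, and scalar keys, so the override only has to add the extra attributes.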