Module: WaterDrop::Producer::Transactions

Included in:
WaterDrop::Producer
Defined in:
lib/waterdrop/producer/transactions.rb

Overview

Transaction-related producer functionality.


Instance Method Details

#transaction ⇒ Object

Creates a transaction.

Karafka transactions work similarly to SQL database transactions, though there are some crucial differences. When you start a transaction, all messages produced within it are either delivered together or fail together. The difference is that messages from within a single transaction can be delivered and will have a delivery handle, but if the transaction is aborted they will then be compacted before the LSO (last stable offset) moves forward. This means that, on abort, not every delivery handle for async dispatches will emit a queue purge error. Sync dispatches will not emit one at all, because their delivery has already happened, but those messages will never become visible to transactional consumers.
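The visibility rule above can be illustrated with a toy model. This is a simplification, not Kafka's actual log or control-record format: `Record` and `read_committed` are hypothetical names, each record is tagged with the transaction that wrote it, and the reader, in the spirit of a transactional consumer, stops at the first record of a still-open transaction (standing in for the LSO) and skips records of aborted transactions.

```ruby
# Toy model only (hypothetical, not Kafka's real log format): each record carries
# the id of the transaction that wrote it; a separate map holds each outcome.
Record = Struct.new(:offset, :txn, :payload)

# Hypothetical reader: stops at the first record of a still-open transaction
# (a stand-in for the LSO) and skips records written by aborted transactions.
def read_committed(log, status)
  visible = []
  log.each do |record|
    state = status.fetch(record.txn)
    break if state == :open
    visible << record.payload if state == :committed
  end
  visible
end

log = [
  Record.new(0, :t1, 'a'),
  Record.new(1, :t2, 'b'), # delivered and acknowledged, but t2 is aborted below
  Record.new(2, :t1, 'c'),
  Record.new(3, :t3, 'd')  # t3 has not finished yet
]
status = { t1: :committed, t2: :aborted, t3: :open }

p read_committed(log, status) # => ["a", "c"]
```

Record `b` was written and acknowledged, yet it never reaches the reader, which matches the point that a delivery handle alone does not imply visibility.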

Transactions are thread-safe, but they lock a mutex. This means that for high-throughput transactional message production from multiple threads (for example, in Karafka), it may be much better to use a few producer instances that can work in parallel.
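A minimal sketch of the "few instances in parallel" idea, assuming a simple checkout pool built on Ruby's core `Queue`. `FakeProducer` and `ProducerPool` are hypothetical stand-ins, not WaterDrop classes; the fake only reproduces a per-instance mutex around `transaction`. Because each thread checks out its own producer, threads never queue up behind a single shared transaction lock.

```ruby
# Hypothetical stand-in for a transactional producer: like #transaction,
# it serializes work through a per-instance mutex.
class FakeProducer
  attr_reader :dispatched

  def initialize
    @mutex = Mutex.new
    @dispatched = 0
  end

  def transaction
    @mutex.synchronize do
      yield
      @dispatched += 1
    end
  end
end

# Hypothetical checkout pool: a thread borrows one producer at a time, so
# threads holding different producers never contend on the same mutex.
class ProducerPool
  def initialize(size)
    @all = Array.new(size) { FakeProducer.new }
    @queue = Queue.new
    @all.each { |producer| @queue << producer }
  end

  def with
    producer = @queue.pop # blocks until some producer is free
    yield producer
  ensure
    @queue << producer if producer
  end

  def total_dispatched
    @all.sum(&:dispatched)
  end
end

pool = ProducerPool.new(3)
threads = Array.new(6) do
  Thread.new do
    10.times { pool.with { |producer| producer.transaction { sleep(0.001) } } }
  end
end
threads.each(&:join)

p pool.total_dispatched # => 60
```

With real transactional producers, each instance in such a pool would need its own `transactional.id`, since Kafka fences out an older producer when another one initializes transactions with the same id.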

Please note that if a producer is configured as transactional, it cannot produce messages outside of transactions, which is why by default all dispatches are wrapped in a transaction: one transaction per single dispatch and, for `produce_many`, a single transaction wrapping all message dispatches (not one per message).
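The wrapping behaviour can be sketched with a hypothetical `FakeTransactionalProducer` that borrows the re-entrancy check used by `#transaction` (`return yield if @transaction_mutex.owned?`) and counts how many transactions actually begin. Its `produce_sync` and `produce_many_sync` are simplified fakes that mirror only the wrapping, not WaterDrop's real signatures or delivery logic.

```ruby
# Hypothetical fake: mimics only the "wrap every dispatch" behaviour and
# counts how many transactions are really started.
class FakeTransactionalProducer
  attr_reader :transactions_started, :messages

  def initialize
    @transaction_mutex = Mutex.new
    @transactions_started = 0
    @messages = []
  end

  def transaction
    # Re-entrant: if this thread is already inside a transaction, just run the block
    return yield if @transaction_mutex.owned?

    @transaction_mutex.synchronize do
      @transactions_started += 1
      yield
    end
  end

  # Every single dispatch is wrapped, so it works inside or outside an explicit block
  def produce_sync(message)
    transaction { @messages << message }
  end

  # One transaction around the whole batch, not one per message
  def produce_many_sync(messages)
    transaction { messages.each { |message| produce_sync(message) } }
  end
end

producer = FakeTransactionalProducer.new
producer.produce_sync({ topic: 'a', payload: '1' })            # starts 1 transaction
producer.produce_many_sync([{ topic: 'a', payload: '2' }] * 3) # starts 1 more
producer.transaction do                                        # starts 1 more
  producer.produce_sync({ topic: 'a', payload: '3' })
  producer.produce_sync({ topic: 'a', payload: '4' })
end

p producer.transactions_started # => 3
p producer.messages.size        # => 6
```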

Examples:

Simple transaction

producer.transaction do
  producer.produce_async(topic: 'topic', payload: 'data')
end

Aborted transaction - produced messages won’t be visible to consumers

producer.transaction do
  producer.produce_sync(topic: 'topic', payload: 'data')
  throw(:abort)
end

Use the block result (the last delivery handle) to wait for all messages to be acknowledged

handler = producer.transaction do
            producer.produce_async(topic: 'topic', payload: 'data')
          end

handler.wait

Returns:

  • Block result



# File 'lib/waterdrop/producer/transactions.rb', line 51

def transaction
  # This will safely allow us to support one operation transactions so a transactional
  # producer can work without the transactional block if needed
  return yield if @transaction_mutex.owned?

  @transaction_mutex.synchronize do
    transactional_instrument(:finished) do
      with_transactional_error_handling(:begin) do
        transactional_instrument(:started) { client.begin_transaction }
      end

      result = nil
      commit = false

      catch(:abort) do
        result = yield
        commit = true
      end

      commit || raise(WaterDrop::Errors::AbortTransaction)

      with_transactional_error_handling(:commit) do
        transactional_instrument(:committed) { client.commit_transaction }
      end

      result
    # We need to handle any interrupt including critical in order not to have the transaction
    # running. This will also handle things like `IRB::Abort`
    #
    # rubocop:disable Lint/RescueException
    rescue Exception => e
      # rubocop:enable Lint/RescueException
      with_transactional_error_handling(:abort) do
        transactional_instrument(:aborted) { client.abort_transaction }
      end

      raise unless e.is_a?(WaterDrop::Errors::AbortTransaction)
    end
  end
end
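To make the control flow of the method above easier to follow, here is a stripped-down sketch of the same commit/abort logic. `RecordingClient`, `AbortTransaction` and `run_transaction` are hypothetical stand-ins (`AbortTransaction` mirrors `WaterDrop::Errors::AbortTransaction`), and instrumentation and error wrapping are left out. In this sketch a normal block commits and returns its value, `throw(:abort)` aborts and returns `nil` without raising, and any other exception aborts and is re-raised.

```ruby
# Hypothetical stand-ins: the client records calls instead of talking to Kafka.
class AbortTransaction < StandardError; end

class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def begin_transaction
    @calls << :begin
  end

  def commit_transaction
    @calls << :commit
  end

  def abort_transaction
    @calls << :abort
  end
end

def run_transaction(client)
  client.begin_transaction
  result = nil
  commit = false

  # throw(:abort) anywhere in the block jumps out of catch, leaving commit == false
  catch(:abort) do
    result = yield
    commit = true
  end

  commit || raise(AbortTransaction)
  client.commit_transaction
  result
# Rescuing Exception mirrors the original, so even interrupts abort the transaction
rescue Exception => e
  client.abort_transaction
  # A deliberate abort is swallowed (the method returns nil); anything else propagates
  raise unless e.is_a?(AbortTransaction)
end

client = RecordingClient.new
ok = run_transaction(client) { :ok }
aborted = run_transaction(client) { throw(:abort) }
begin
  run_transaction(client) { raise ArgumentError, 'boom' }
rescue ArgumentError => e
  reraised = e.message
end

p [ok, aborted, reraised] # => [:ok, nil, "boom"]
p client.calls            # => [:begin, :commit, :begin, :abort, :begin, :abort]
```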

#transaction? ⇒ Boolean

Returns true if the current thread is inside an active transaction.

Returns:

  • (Boolean)

    true if we are in an active transaction



# File 'lib/waterdrop/producer/transactions.rb', line 93

def transaction?
  @transaction_mutex.owned?
end
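`transaction?` relies on `Mutex#owned?`, which reports whether the current thread holds the lock. The plain-Ruby sketch below (no WaterDrop involved) shows why the answer is per-thread: while one thread is inside `synchronize`, another thread still sees `false`.

```ruby
mutex = Mutex.new

inside, other = mutex.synchronize do
  # This thread holds the lock; a freshly spawned thread does not
  [mutex.owned?, Thread.new { mutex.owned? }.value]
end
after = mutex.owned? # lock released once synchronize returns

p [inside, other, after] # => [true, false, false]
```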

#transaction_mark_as_consumed(consumer, message, offset_metadata = nil) ⇒ Object

Marks given message as consumed inside of a transaction.

Parameters:

  • consumer (#consumer_group_metadata_pointer)

    any consumer from which we can obtain the librdkafka consumer group metadata pointer

  • message (Karafka::Messages::Message)

    Karafka message

  • offset_metadata (String) (defaults to: nil)

    offset metadata or nil if none

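Raises:

  • (Errors::TransactionRequiredError)

    when called outside of an active transaction

  • (Errors::TransactionalOffsetInvalidError)

    when the consumer, message or offset metadata fail contract validation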



# File 'lib/waterdrop/producer/transactions.rb', line 110

def transaction_mark_as_consumed(consumer, message, offset_metadata = nil)
  raise Errors::TransactionRequiredError unless @transaction_mutex.owned?

  CONTRACT.validate!(
    {
      consumer: consumer,
      message: message,
      offset_metadata: offset_metadata
    },
    Errors::TransactionalOffsetInvalidError
  )

  details = { message: message, offset_metadata: offset_metadata }

  transactional_instrument(:marked_as_consumed, details) do
    tpl = Rdkafka::Consumer::TopicPartitionList.new
    partition = Rdkafka::Consumer::Partition.new(
      message.partition,
      # +1 because this is the next offset from which processing will start
      message.offset + 1,
      0,
      offset_metadata
    )

    tpl.add_topic_and_partitions_with_offsets(message.topic, [partition])

    with_transactional_error_handling(:store_offset) do
      client.send_offsets_to_transaction(
        consumer,
        tpl,
        @config.max_wait_timeout
      )
    end
  end
end
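A typical use of this method is a consume-transform-produce loop in which the output messages and the input offset are committed atomically. The sketch below runs without Kafka by using hypothetical fakes (`Message`, `FakeTxProducer`, `TransactionRequiredError`) that only mirror the shape of the documented calls; it also reproduces the `offset + 1` rule, so the stored offset is the next one to read.

```ruby
# Hypothetical fakes: they mirror the shape of the documented calls but keep
# produced messages and offsets in memory instead of calling librdkafka.
Message = Struct.new(:topic, :partition, :offset, :payload)

class TransactionRequiredError < StandardError; end

class FakeTxProducer
  attr_reader :produced, :committed_offsets

  def initialize
    @transaction_mutex = Mutex.new
    @produced = []
    @committed_offsets = {}
  end

  def transaction
    return yield if @transaction_mutex.owned?

    @transaction_mutex.synchronize do
      @pending_messages = []
      @pending_offsets = {}
      committed = false
      catch(:abort) do
        yield
        committed = true
      end
      # Messages and offsets become visible together, or not at all
      if committed
        @produced.concat(@pending_messages)
        @committed_offsets.merge!(@pending_offsets)
      end
    end
  end

  def produce_async(topic:, payload:)
    transaction { @pending_messages << [topic, payload] }
  end

  def transaction_mark_as_consumed(_consumer, message, _offset_metadata = nil)
    raise TransactionRequiredError unless @transaction_mutex.owned?

    # Store the next offset to read, matching the +1 in the real method
    @pending_offsets[[message.topic, message.partition]] = message.offset + 1
  end
end

producer = FakeTxProducer.new
consumer = :fake_consumer # the real method needs an object exposing the group metadata pointer

producer.transaction do
  msg = Message.new('input', 0, 41, 'hello')
  producer.produce_async(topic: 'output', payload: msg.payload.upcase)
  producer.transaction_mark_as_consumed(consumer, msg)
end

producer.transaction do
  msg = Message.new('input', 0, 42, 'bad')
  producer.produce_async(topic: 'output', payload: msg.payload)
  producer.transaction_mark_as_consumed(consumer, msg)
  throw(:abort) # neither the output message nor the offset is kept
end

p producer.produced                             # => [["output", "HELLO"]]
p producer.committed_offsets.fetch(['input', 0]) # => 42
```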

#transactional? ⇒ Boolean

Returns whether this producer is transactional.

Returns:

  • (Boolean)

    true if this producer is transactional



# File 'lib/waterdrop/producer/transactions.rb', line 98

def transactional?
  return @transactional if instance_variable_defined?(:'@transactional')

  @transactional = config.kafka.to_h.key?(:'transactional.id')
end
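`transactional?` only checks whether `transactional.id` is present in the `kafka` config hash, and caches the answer. The `instance_variable_defined?` guard presumably exists because a plain `@transactional ||= ...` would not cache a `false` result and would repeat the lookup on every call. The hypothetical `TransactionalCheck` class below contrasts the two approaches; it is not part of WaterDrop.

```ruby
# Hypothetical class contrasting two memoization styles for a boolean answer.
class TransactionalCheck
  attr_reader :lookups

  def initialize(kafka_config)
    @kafka_config = kafka_config
    @lookups = 0
  end

  # Same pattern as the documented method: caches the answer even when it is false
  def transactional?
    return @transactional if instance_variable_defined?(:@transactional)

    @lookups += 1
    @transactional = @kafka_config.key?(:'transactional.id')
  end

  # Naive variant: `||=` re-evaluates every time the cached value is false
  def transactional_naive?
    @naive ||= begin
      @lookups += 1
      @kafka_config.key?(:'transactional.id')
    end
  end
end

plain = TransactionalCheck.new({ 'bootstrap.servers': 'localhost:9092' })
3.times { plain.transactional? }
p plain.lookups # => 1

naive = TransactionalCheck.new({ 'bootstrap.servers': 'localhost:9092' })
3.times { naive.transactional_naive? }
p naive.lookups # => 3

tx = TransactionalCheck.new({ 'bootstrap.servers': 'localhost:9092', 'transactional.id': 'example-tx' })
p tx.transactional? # => true
```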