Method: Kafka::Consumer#pause

Defined in:
lib/kafka/consumer.rb

#pause(topic, partition, timeout: nil, max_timeout: nil, exponential_backoff: false) ⇒ nil

Pause processing of a specific topic partition.

When a specific message causes the processor code to fail, it can be a good idea to simply pause the partition until the error can be resolved, allowing the rest of the partitions to continue being processed.

If the timeout argument is passed, the partition will automatically be resumed when the timeout expires. If exponential_backoff is enabled, each subsequent pause will cause the timeout to double until a message from the partition has been successfully processed.

Parameters:

  • topic (String)
  • partition (Integer)
  • timeout (nil, Integer) (defaults to: nil)

    the number of seconds to pause the partition for, or nil if the partition should not be automatically resumed.

  • max_timeout (nil, Integer) (defaults to: nil)

    the maximum number of seconds to pause for, or nil if no maximum should be enforced.

  • exponential_backoff (Boolean) (defaults to: false)

    whether to enable exponential backoff.

Returns:

  • (nil)


152
153
154
155
156
157
158
159
160
161
162
# File 'lib/kafka/consumer.rb', line 152

def pause(topic, partition, timeout: nil, max_timeout: nil, exponential_backoff: false)
  if max_timeout && !exponential_backoff
    raise ArgumentError, "`max_timeout` only makes sense when `exponential_backoff` is enabled"
  end

  pause_for(topic, partition).pause!(
    timeout: timeout,
    max_timeout: max_timeout,
    exponential_backoff: exponential_backoff,
  )
end