Method: Kafka::Consumer#pause
- Defined in:
- lib/kafka/consumer.rb
#pause(topic, partition, timeout: nil, max_timeout: nil, exponential_backoff: false) ⇒ nil
Pause processing of a specific topic partition.
When a specific message causes the processor code to fail, it can be a good idea to simply pause the partition until the error can be resolved, allowing the rest of the partitions to continue being processed.
If the timeout argument is passed, the partition will automatically be
resumed when the timeout expires. If exponential_backoff is enabled, each
subsequent pause will cause the timeout to double until a message from the
partition has been successfully processed.
152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/kafka/consumer.rb', line 152 def pause(topic, partition, timeout: nil, max_timeout: nil, exponential_backoff: false) if max_timeout && !exponential_backoff raise ArgumentError, "`max_timeout` only makes sense when `exponential_backoff` is enabled" end pause_for(topic, partition).pause!( timeout: timeout, max_timeout: max_timeout, exponential_backoff: exponential_backoff, ) end |