Class: Karafka::Pro::Processing::Filters::Delayer

Inherits:
Base
  • Object
Defined in:
lib/karafka/pro/processing/filters/delayer.rb

Overview

A filter that delays processing by pausing until messages are old enough to be processed.

Instance Attribute Summary

Attributes inherited from Base

#cursor

Instance Method Summary

Methods inherited from Base

#applied?

Constructor Details

#initialize(delay) ⇒ Delayer

Returns a new instance of Delayer.

Parameters:

  • delay (Integer)

    delay in ms, that is, the minimum age a message must reach before we process it



# File 'lib/karafka/pro/processing/filters/delayer.rb', line 21

def initialize(delay)
  super()

  @delay = delay
end

Instance Method Details

#action ⇒ Symbol

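Returns the action to take after filtering: :skip when no messages were filtered out, :seek when the delay for the first filtered message has already elapsed, and :pause otherwise.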

Returns:

  • (Symbol)

    action to take after filtering (:skip, :seek or :pause)



# File 'lib/karafka/pro/processing/filters/delayer.rb', line 61

def action
  return :skip unless applied?

  timeout <= 0 ? :seek : :pause
end
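The decision made by #action can be illustrated with a minimal standalone sketch. It does not use Karafka; post_filter_action is a hypothetical name, and its two arguments stand in for the values that applied? and #timeout would return.

```ruby
# Hypothetical standalone version of the decision made in #action.
# applied    - whether any message was filtered out (stand-in for applied?)
# timeout_ms - remaining delay in ms (stand-in for #timeout)
def post_filter_action(applied, timeout_ms)
  # Nothing was filtered, so there is nothing to wait for
  return :skip unless applied

  # The delay already elapsed: go back to the filtered message right away.
  # Otherwise wait for the remaining time first.
  timeout_ms <= 0 ? :seek : :pause
end

post_filter_action(false, 0)    # => :skip
post_filter_action(true, 0)     # => :seek
post_filter_action(true, 1_500) # => :pause
```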

#apply!(messages) ⇒ Object

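Removes messages that are too young, that is, newer than the configured delay. Once the first too young message is found, it and all messages after it are removed, and the first removed message is stored as the cursor.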

Parameters:

  • messages

    messages to filter; too young messages are removed from this collection in place

# File 'lib/karafka/pro/processing/filters/delayer.rb', line 30

def apply!(messages)
  @applied = false
  @cursor = nil

  # Time on message is in seconds with ms precision, so we need to convert the delay that
  # is in ms to this format
  border = ::Time.now.utc - @delay / 1_000.to_f

  messages.delete_if do |message|
    too_young = message.timestamp > border

    if too_young
      @applied = true

      @cursor ||= message
    end

    @applied
  end
end
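The filtering rule can be tried out without Karafka. The following is a minimal sketch, assuming messages only need a timestamp; DemoMessage and filter_too_young are hypothetical names introduced for this illustration, and the current time is passed in explicitly so the result is deterministic.

```ruby
# Standalone sketch of the rule used by #apply!; it does not require Karafka.
# DemoMessage is a hypothetical stand-in for a Karafka message.
DemoMessage = Struct.new(:offset, :timestamp)

# Removes the first too young message and every message after it.
# Returns the first removed message (the "cursor") or nil if nothing was removed.
def filter_too_young(messages, delay_ms, now: Time.now.utc)
  # Messages with a timestamp later than the border are too young
  border = now - delay_ms / 1_000.0
  applied = false
  cursor = nil

  messages.delete_if do |message|
    if message.timestamp > border
      applied = true
      cursor ||= message
    end

    # After the first match this stays true, so later messages are removed as well
    applied
  end

  cursor
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
batch = [
  DemoMessage.new(0, now - 10), # 10 s old, old enough for a 5 s delay
  DemoMessage.new(1, now - 2),  # 2 s old, too young
  DemoMessage.new(2, now - 8)   # old enough, but follows a too young message
]

cursor = filter_too_young(batch, 5_000, now: now)
batch.map(&:offset) # => [0]
cursor.offset       # => 1
```

Removing everything after the first too young message keeps the batch contiguous, so processing can later resume from the cursor without skipping offsets.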

#timeout ⇒ Integer

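Returns the remaining delay in ms until the first filtered message (the cursor) is old enough to process, or 0 when there is no cursor or the delay has already elapsed.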

Returns:

  • (Integer)

    remaining delay in ms (0 when there is nothing to wait for)



# File 'lib/karafka/pro/processing/filters/delayer.rb', line 52

def timeout
  return 0 unless @cursor

  timeout = (@delay / 1_000.to_f) - (::Time.now.utc - @cursor.timestamp)

  timeout <= 0 ? 0 : timeout * 1_000
end
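The arithmetic in #timeout can be checked with a small worked example. This is a sketch only; remaining_delay_ms is a hypothetical helper that mirrors the calculation, with the delay, the cursor timestamp and the current time passed in explicitly.

```ruby
# Hypothetical helper mirroring the arithmetic of #timeout; not part of Karafka.
def remaining_delay_ms(delay_ms, cursor_timestamp, now: Time.now.utc)
  # Delay in seconds minus the current age of the message in seconds
  remaining = (delay_ms / 1_000.0) - (now - cursor_timestamp)

  # Convert back to ms, never returning a negative value
  remaining <= 0 ? 0 : remaining * 1_000
end

now = Time.utc(2024, 1, 1, 12, 0, 0)

# 5 s delay, message is 2 s old: 3 s left to wait
remaining_delay_ms(5_000, now - 2, now: now) # => 3000.0

# 5 s delay, message is 7 s old: nothing left to wait
remaining_delay_ms(5_000, now - 7, now: now) # => 0
```

In this sketch a positive result is a Float because the subtraction works on fractional seconds.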