Class: Karafka::Pro::Processing::Filters::Delayer

Inherits:
Base
  • Object
Defined in:
lib/karafka/pro/processing/filters/delayer.rb

Overview

A filter that allows us to delay processing by pausing until the time is right.

Instance Attribute Summary

Attributes inherited from Base

#cursor

Instance Method Summary

Methods inherited from Base

#applied?, #mark_as_consumed?, #marking_cursor, #marking_method

Constructor Details

#initialize(delay) ⇒ Delayer

Returns a new instance of Delayer.

Parameters:

  • delay (Integer)

    delay in ms, i.e. the minimum age a message must reach before we process it



# File 'lib/karafka/pro/processing/filters/delayer.rb', line 30

def initialize(delay)
  super()

  @delay = delay
end

Instance Method Details

#action ⇒ Symbol

Returns action to take on post-filtering.

Returns:

  • (Symbol)

    action to take on post-filtering



# File 'lib/karafka/pro/processing/filters/delayer.rb', line 70

def action
  return :skip unless applied?

  timeout <= 0 ? :seek : :pause
end
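
The three possible outcomes of #action can be sketched outside of Karafka as follows. This is a minimal illustration, not the library's code: `sketch_action` is a hypothetical helper, and its `applied` and `timeout_ms` arguments stand in for the values of `applied?` and `timeout` on a real filter instance.

```ruby
# Hypothetical stand-alone restatement of the decision made in #action.
# applied    - whether the filter removed anything from the batch
# timeout_ms - remaining delay in ms, as #timeout would report it
def sketch_action(applied, timeout_ms)
  # Nothing was filtered out, so there is nothing to wait for
  return :skip unless applied

  # Something was filtered out: resume right away if the delay has already
  # elapsed, otherwise wait for the remaining time
  timeout_ms <= 0 ? :seek : :pause
end

nothing_removed = sketch_action(false, 0)
already_due     = sketch_action(true, 0)
still_waiting   = sketch_action(true, 3_000)
```

In this sketch `nothing_removed` is `:skip`, `already_due` is `:seek` and `still_waiting` is `:pause`.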

#apply!(messages) ⇒ Object

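Removes messages that are too young, i.e. younger than the configured delay. Once the first too-young message is found, it and every message after it in the batch are removed.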

Parameters:



# File 'lib/karafka/pro/processing/filters/delayer.rb', line 39

def apply!(messages)
  @applied = false
  @cursor = nil

  # Time on message is in seconds with ms precision, so we need to convert the delay that
  # is in ms to this format
  border = Time.now.utc - (@delay / 1_000.0)

  messages.delete_if do |message|
    too_young = message.timestamp > border

    if too_young
      @applied = true

      @cursor ||= message
    end

    @applied
  end
end
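
The effect of the `delete_if` block is easier to see on a small batch. The following is a sketch under stated assumptions: `SketchMessage` and `sketch_apply` are hypothetical stand-ins (not part of Karafka), the clock is passed in so the result is deterministic, and `reject` is used instead of `delete_if` so the input batch is left untouched.

```ruby
# Hypothetical stand-in for a Karafka message; only the fields the filter
# reads are modeled here.
SketchMessage = Struct.new(:offset, :timestamp)

# Hypothetical restatement of the filtering step with an injected clock.
def sketch_apply(messages, delay_ms, now)
  applied = false
  cursor = nil
  # Message timestamps are Time objects, so the ms delay is converted to seconds
  border = now - (delay_ms / 1_000.0)

  kept = messages.reject do |message|
    if message.timestamp > border
      applied = true
      # The first too-young message becomes the cursor
      cursor ||= message
    end

    # Once one message is too young, everything after it is dropped as well
    applied
  end

  { kept: kept, applied: applied, cursor: cursor }
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
batch = [
  SketchMessage.new(0, now - 10), # 10 s old
  SketchMessage.new(1, now - 6),  # 6 s old
  SketchMessage.new(2, now - 2),  # 2 s old, too young for a 5 s delay
  SketchMessage.new(3, now - 8)   # old enough, but positioned after the cursor
]

result = sketch_apply(batch, 5_000, now)
kept_offsets = result[:kept].map(&:offset)
cursor_offset = result[:cursor].offset
```

With a 5 000 ms delay, only offsets 0 and 1 are kept and the cursor points at offset 2. Offset 3 is removed even though it is old enough, because it comes after the first too-young message.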

#timeout ⇒ Integer

Returns timeout delay in ms.

Returns:

  • (Integer)

    timeout delay in ms
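
The arithmetic behind this value can be worked through with fixed timestamps. This is a sketch only: `sketch_timeout` is a hypothetical helper that takes the cursor timestamp and the current time as arguments instead of reading `@cursor` and the system clock.

```ruby
# Hypothetical restatement of the #timeout arithmetic with an injected clock.
def sketch_timeout(delay_ms, cursor_timestamp, now)
  # No cursor means nothing was filtered out, so there is nothing to wait for
  return 0 unless cursor_timestamp

  # Remaining time in seconds: configured delay minus the current age of the cursor
  remaining = (delay_ms / 1_000.0) - (now - cursor_timestamp)

  # Never report a negative wait; convert back to ms otherwise
  remaining <= 0 ? 0 : remaining * 1_000
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
young = sketch_timeout(5_000, now - 2, now) # cursor is 2 s old
aged  = sketch_timeout(5_000, now - 7, now) # cursor is already older than the delay
none  = sketch_timeout(5_000, nil, now)     # no cursor at all
```

Here `young` is 3 000 ms (5 s delay minus 2 s of age), while `aged` and `none` are both 0. Note that in this sketch the non-zero result is a Float, because subtracting two Time objects yields a Float number of seconds.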



# File 'lib/karafka/pro/processing/filters/delayer.rb', line 61

def timeout
  return 0 unless @cursor

  timeout = (@delay / 1_000.0) - (::Time.now.utc - @cursor.timestamp)

  timeout <= 0 ? 0 : timeout * 1_000
end