Module: Karafka::Pro::Routing::Features::Expiring::Topic

Defined in:
lib/karafka/pro/routing/features/expiring/topic.rb

Overview

Topic expiring API extensions

Instance Method Summary

Instance Method Details

#expire_in(*args) ⇒ Object

Alias for #expiring, provided for a more readable API.

Parameters:

  • args (Array)

    Anything `#expiring` accepts



# File 'lib/karafka/pro/routing/features/expiring/topic.rb', line 39

def expire_in(*args)
  expiring(*args)
end

#expiring(ttl = nil) ⇒ Object

Parameters:

  • ttl (Integer, nil) (defaults to: nil)

    maximum time in milliseconds that a message is considered alive; when nil, expiring stays inactive



# File 'lib/karafka/pro/routing/features/expiring/topic.rb', line 22

def expiring(ttl = nil)
  # These settings are used for validation
  @expiring ||= begin
    config = Config.new(active: !ttl.nil?, ttl: ttl)

    if config.active?
      factory = ->(*) { Pro::Processing::Filters::Expirer.new(ttl) }
      filter(factory)
    end

    config
  end
end
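
Because of the `@expiring ||=` memoization, only the first call to `#expiring` builds the config and registers the filter; later calls return the stored config whatever argument they receive. The sketch below reproduces that pattern in isolation. `SketchTopic`, `SketchConfig`, and the `filters` array are hypothetical stand-ins, not Karafka classes, and the factory returns a plain array instead of a real `Expirer`.

```ruby
# Hypothetical stand-in for the feature's Config object (not Karafka's class).
SketchConfig = Struct.new(:active, :ttl, keyword_init: true) do
  def active?
    active
  end
end

# Hypothetical stand-in for a routing topic; it only mimics the memoization.
class SketchTopic
  attr_reader :filters

  def initialize
    @filters = []
  end

  # Stand-in for filter registration: just collects the factories.
  def filter(factory)
    @filters << factory
  end

  def expiring(ttl = nil)
    @expiring ||= begin
      config = SketchConfig.new(active: !ttl.nil?, ttl: ttl)
      # Placeholder factory; the real code builds an Expirer filter here.
      filter(->(*) { [:expirer, ttl] }) if config.active?
      config
    end
  end

  # Alias kept for readability, as in the documented module.
  def expire_in(*args)
    expiring(*args)
  end

  def expiring?
    expiring.active?
  end
end

topic = SketchTopic.new
topic.expire_in(15_000) # first call: builds the config, registers one filter
topic.expiring(1_000)   # memoized: the new argument is ignored
```

In this sketch, calling `expiring?` before any TTL is set would memoize an inactive config, so the TTL has to be set first for it to take effect.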

#expiring? ⇒ Boolean

Returns whether expiring is active for this topic.

Returns:

  • (Boolean)

    whether expiring is active for this topic



# File 'lib/karafka/pro/routing/features/expiring/topic.rb', line 44

def expiring?
  expiring.active?
end

#to_h ⇒ Hash

Returns topic with all its native configuration options plus expiring.

Returns:

  • (Hash)

    topic with all its native configuration options plus expiring



# File 'lib/karafka/pro/routing/features/expiring/topic.rb', line 49

def to_h
  super.merge(
    expiring: expiring.to_h
  ).freeze
end
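
The `super.merge(...).freeze` pattern can be tried out in isolation. The sketch below is a minimal illustration, assuming the feature module is mixed in so that `super` reaches the base topic's `to_h`; `prepend` is used here to get that lookup order. `SketchBaseTopic` and `SketchExpiring` are hypothetical names, and a plain hash stands in for the expiring config.

```ruby
# Hypothetical base class standing in for the core topic (not Karafka's class).
class SketchBaseTopic
  def to_h
    { name: 'events' }
  end
end

# Hypothetical feature module mirroring the documented to_h override.
module SketchExpiring
  # A plain hash stands in for the expiring config and its to_h output.
  def expiring
    @expiring ||= { active: false, ttl: nil }
  end

  # Extends the base hash with the feature's settings and freezes the result.
  def to_h
    super.merge(expiring: expiring).freeze
  end
end

# Prepending places the module ahead of the class, so `super` hits the base to_h.
SketchBaseTopic.prepend(SketchExpiring)

hash = SketchBaseTopic.new.to_h
```

Note that `freeze` is shallow in Ruby: the outer hash cannot be modified, but nested values are frozen only if they were frozen themselves.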