Class: LogStash::Codecs::IdentityMapCodec::PeriodicRunner

Inherits:
Object
Defined in:
lib/logstash/codecs/identity_map_codec.rb

Constructor Details

#initialize(listener, interval, method_symbol) ⇒ PeriodicRunner

Returns a new instance of PeriodicRunner.



# File 'lib/logstash/codecs/identity_map_codec.rb', line 42

def initialize(listener, interval, method_symbol)
  @listener, @interval = listener, interval
  @method_symbol = method_symbol
  @running = Concurrent::AtomicBoolean.new(true)
end
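
The constructor only stores its arguments, so what the listener must provide is implied by how the other methods use it: a `logger` that responds to `debug`, and a method named by `method_symbol`. A minimal sketch of such a listener, assuming a hypothetical `SketchListener` class and a hypothetical `auto_flush` method name (neither is part of the library):

```ruby
require "logger"
require "stringio"

# Hypothetical listener: the runner only needs #logger and the method
# named by method_symbol. Class and method names are illustrative.
class SketchListener
  attr_reader :logger, :flushes

  def initialize(io)
    @logger = Logger.new(io)
    @logger.level = Logger::DEBUG
    @flushes = 0
  end

  # Stand-in for the periodic work (clean up / auto flush).
  def auto_flush
    @flushes += 1
  end
end

log = StringIO.new
listener = SketchListener.new(log)
method_symbol = :auto_flush

listener.logger.debug("Start periodic runner") # the call #start makes on the logger
listener.send(method_symbol)                   # the call made after each interval
```

An object of this shape could be passed as the first argument, with the interval in seconds (it is handed to `sleep`) and the method name as a symbol.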

Instance Method Details

#running? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/logstash/codecs/identity_map_codec.rb', line 70

def running?
  @running.true?
end

#start ⇒ Object

The goal of the periodic runner is to have a single thread do the clean up / auto flush. This method is expected to be called from a single thread; otherwise, concurrent calls could each reach Thread.start and create more than one periodic runner thread.



# File 'lib/logstash/codecs/identity_map_codec.rb', line 51

def start
  return self unless running?
  return self if running? && !@thread.nil?

  @thread = Thread.start do
    class_name = @listener.class.name.split('::').last # IdentityMapCodec
    LogStash::Util.set_thread_name("#{class_name}##{@method_symbol}")
    @listener.logger.debug("Start periodic runner")

    while running? do
      sleep @interval
      break if !running?
      break if (listener = @listener).nil?
      listener.send(@method_symbol)
    end
  end
  self
end
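
The start-once behaviour can be tried without Logstash. The following is a minimal sketch using only the standard library; `SketchRunner` and `TickCounter` are hypothetical stand-ins, a plain boolean replaces `Concurrent::AtomicBoolean`, and logging and thread naming are left out:

```ruby
# Hypothetical stand-in for PeriodicRunner, standard library only.
class SketchRunner
  attr_reader :thread

  def initialize(listener, interval, method_symbol)
    @listener = listener
    @interval = interval
    @method_symbol = method_symbol
    @running = true # the real class uses Concurrent::AtomicBoolean
  end

  def running?
    @running
  end

  def start
    return self unless running?
    return self unless @thread.nil? # already started: keep the existing thread

    @thread = Thread.start do
      while running?
        sleep @interval
        break unless running?
        @listener.send(@method_symbol)
      end
    end
    self
  end

  # Simplified stop: flip the flag and wait for the current sleep to end.
  def stop
    @running = false
    @thread&.join
  end
end

# Hypothetical listener that counts how often it was called.
class TickCounter
  attr_reader :ticks

  def initialize
    @ticks = 0
  end

  def tick
    @ticks += 1
  end
end

counter = TickCounter.new
runner = SketchRunner.new(counter, 0.01, :tick)
first_thread = runner.start.thread
second_thread = runner.start.thread # second call from the same thread is a no-op
sleep 0.1
runner.stop
```

Both calls to `start` come from the same thread here, which is the usage the method expects. The sketch does not guard against two threads calling `start` at the same time.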

#stop ⇒ Object



# File 'lib/logstash/codecs/identity_map_codec.rb', line 74

def stop
  return unless running?

  @listener.logger.debug("Stop periodic runner")
  @running.make_false
  while @thread&.alive?
    begin
      @thread.wakeup
    rescue ThreadError
      # thread might drop dead since the alive? check
    else
      @thread.join(0.1) # raises $! if there was any
    end
  end
  @listener = nil
end
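
The wakeup-and-join loop lets `stop` interrupt a thread that is in the middle of `sleep @interval` instead of waiting for the interval to elapse. A minimal sketch of that pattern in isolation, using only the standard library; the 60-second sleep and the variable names are illustrative assumptions:

```ruby
running = true
worker = Thread.start do
  while running
    sleep 60 # long interval; without wakeup, shutdown would wait for it
  end
end

# Wait until the worker is actually asleep before stopping it.
sleep 0.05 until worker.status == "sleep"

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
running = false
while worker.alive?
  begin
    worker.wakeup # interrupts the sleep so the loop can re-check the flag
  rescue ThreadError
    # the thread died between the alive? check and the wakeup call
  else
    worker.join(0.1) # wait briefly, then loop again if it is still alive
  end
end
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
```

`Thread#wakeup` raises `ThreadError` when the target thread is already dead, which is why the loop rescues it rather than treating it as a failure.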