Class: LogStash::Inputs::DeadLetterQueue

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/dead_letter_queue.rb

Overview

Logstash input to read events from Logstash’s dead letter queue

source, sh

input {

dead_letter_queue {
  path => "/var/logstash/data/dead_letter_queue"
  start_timestamp => "2017-04-04T23:40:37"
}

}


Instance Method Summary collapse

Instance Method Details

#register ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/logstash/inputs/dead_letter_queue.rb', line 43

# Prepares the plugin: resolves the sincedb location, builds and registers the
# Java-side dead letter queue reader, and selects the event factory matching
# the running Logstash core version.
#
# When no sincedb_path is configured, a per-pipeline file is derived under
# <path.data>/plugins/inputs/dead_letter_queue/<pipeline_id>/ and named after
# the MD5 hex digest of the configured DLQ path.
#
# @raise [ArgumentError] if the configured sincedb_path is a directory
def register
  if @sincedb_path.nil?
    datapath = File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "dead_letter_queue", @pipeline_id)
    # Ensure that the filepath exists before writing, since it's deeply nested.
    FileUtils.mkdir_p(datapath)
    @sincedb_path = File.join(datapath, ".sincedb_#{Digest::MD5.hexdigest(@path)}")
  elsif File.directory?(@sincedb_path)
    raise ArgumentError, "The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\""
  end

  # The reader is handed the queue directory of this specific pipeline.
  dlq_path = java.nio.file.Paths.get(File.join(@path, @pipeline_id))
  sincedb_path = @sincedb_path ? java.nio.file.Paths.get(@sincedb_path) : nil
  start_timestamp = @start_timestamp ? org.logstash.Timestamp.new(@start_timestamp) : nil
  @inner_plugin = org.logstash.input.DeadLetterQueueInputPlugin.new(dlq_path, @commit_offsets, sincedb_path, start_timestamp)
  @inner_plugin.register

  @event_creator =
    if Gem::Requirement.new('< 7.0').satisfied_by?(Gem::Version.new(LOGSTASH_CORE_VERSION))
      lambda do |entry|
        clone = entry.event.clone
        # On Logstash 6, LogStash::Event.new accepts a Map rather than an Event,
        # so the data and the metadata are copied over separately.
        event = LogStash::Event.new(clone.getData())
        event.set("[@metadata]", clone.getMetadata())
        event
      end
    else
      ->(entry) { LogStash::Event.new(entry.event.clone) }
    end
end

#run(logstash_queue) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
# File 'lib/logstash/inputs/dead_letter_queue.rb', line 73

# Runs the inner plugin, handing it a callback that turns each dead letter
# queue entry into an event and pushes it onto the given queue.
#
# For every entry the callback builds an event via @event_creator, records
# the entry's plugin type, plugin id, reason and entry time under
# [@metadata][dead_letter_queue], decorates the event and enqueues it.
#
# @param logstash_queue [#<<] destination that receives the rebuilt events
# @return whatever the inner plugin's run returns
def run(logstash_queue)
  enqueue_entry = lambda do |entry|
    event = @event_creator.call(entry)

    dlq_metadata = {
      "[@metadata][dead_letter_queue][plugin_type]" => entry.plugin_type,
      "[@metadata][dead_letter_queue][plugin_id]" => entry.plugin_id,
      "[@metadata][dead_letter_queue][reason]" => entry.reason,
      "[@metadata][dead_letter_queue][entry_time]" => entry.entry_time
    }
    dlq_metadata.each { |field, value| event.set(field, value) }

    decorate(event)
    logstash_queue << event
  end

  @inner_plugin.run(enqueue_entry)
end

#stop ⇒ Object



86
87
88
# File 'lib/logstash/inputs/dead_letter_queue.rb', line 86

# Stops the input by closing the inner plugin created in #register.
#
# @return whatever the inner plugin's close returns
def stop
  plugin = @inner_plugin
  plugin.close
end