Class: LogStash::Inputs::DeadLetterQueue
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::DeadLetterQueue
- Defined in:
- lib/logstash/inputs/dead_letter_queue.rb
Overview
Logstash input to read events from Logstash’s dead letter queue
- source, sh
input {
  dead_letter_queue {
    path => "/var/logstash/data/dead_letter_queue"
    start_timestamp => "2017-04-04T23:40:37"
  }
}
Instance Method Summary collapse
Instance Method Details
#register ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/logstash/inputs/dead_letter_queue.rb', line 43

# Prepares the input for reading.
#
# Resolves the sincedb file location, builds the underlying Java
# DeadLetterQueueInputPlugin and registers it, then selects the event
# factory matching the running Logstash core version.
#
# @raise [ArgumentError] if +@sincedb_path+ is set and points to a directory
def register
  if @sincedb_path.nil?
    # No explicit sincedb configured: derive one under
    # <path.data>/plugins/inputs/dead_letter_queue/<pipeline_id>.
    datapath = File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "dead_letter_queue", @pipeline_id)
    # Ensure that the filepath exists before writing, since it's deeply nested.
    FileUtils::mkdir_p datapath
    # The file name is keyed on the MD5 of the configured DLQ path.
    @sincedb_path = File.join(datapath, ".sincedb_" + Digest::MD5.hexdigest(@path))
  elsif File.directory?(@sincedb_path)
    raise ArgumentError, "The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\""
  end

  # Convert the Ruby-side settings into the Java types handed to the
  # inner plugin's constructor.
  dlq_path = java.nio.file.Paths.get(File.join(@path, @pipeline_id))
  sincedb_path = @sincedb_path ? java.nio.file.Paths.get(@sincedb_path) : nil
  # The local name was missing here, which made this statement unparsable
  # and left the constructor call below without its final argument.
  start_timestamp = @start_timestamp ? org.logstash.Timestamp.new(@start_timestamp) : nil
  @inner_plugin = org.logstash.input.DeadLetterQueueInputPlugin.new(dlq_path, @commit_offsets, sincedb_path, start_timestamp)
  @inner_plugin.register

  if Gem::Requirement.new('< 7.0').satisfied_by?(Gem::Version.new(LOGSTASH_CORE_VERSION))
    @event_creator = Proc.new do |entry|
      clone = entry.event.clone
      # LS 6 LogStash::Event.new accept Map not Event
      event = LogStash::Event.new(clone.getData())
      event.set("[@metadata]", clone.getMetadata())
      event
    end
  else
    @event_creator = -> (entry) { LogStash::Event.new(entry.event.clone) }
  end
end
#run(logstash_queue) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/logstash/inputs/dead_letter_queue.rb', line 73

# Hands the inner plugin a callback that turns each dead letter queue entry
# into an event and appends it to +logstash_queue+.
#
# For every entry the callback builds an event via +@event_creator+, copies
# the entry's plugin type, plugin id, reason and entry time into
# +[@metadata][dead_letter_queue]+, applies +decorate+, and pushes the event.
#
# @param logstash_queue [#<<] destination that receives the produced events
def run(logstash_queue)
  on_entry = lambda do |dlq_entry|
    evt = @event_creator.call(dlq_entry)
    evt.set("[@metadata][dead_letter_queue][plugin_type]", dlq_entry.plugin_type)
    evt.set("[@metadata][dead_letter_queue][plugin_id]", dlq_entry.plugin_id)
    evt.set("[@metadata][dead_letter_queue][reason]", dlq_entry.reason)
    evt.set("[@metadata][dead_letter_queue][entry_time]", dlq_entry.entry_time)
    decorate(evt)
    logstash_queue << evt
  end

  @inner_plugin.run(on_entry)
end
#stop ⇒ Object
86 87 88 |
# File 'lib/logstash/inputs/dead_letter_queue.rb', line 86

# Stops the input by closing the inner plugin created in #register.
def stop
  @inner_plugin.close
end