Class: Fluent::Plugin::DirectoryInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_directory.rb

Instance Method Summary collapse

Instance Method Details

#start ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/in_directory.rb', line 43

# Starts the plugin and registers the periodic directory scan.
#
# On every timer tick (interval @run_interval) up to @batch_size regular
# files directly under @path are read, emitted as a single event stream
# under `tag`, and then deleted. Each record stores the file content under
# @content_key and the file name under @filename_key.
#
# Files are deleted only after the stream has been emitted successfully, so
# a failed emit leaves them in place for the next tick instead of losing
# them. Errors are logged and never raised out of the timer callback.
#
# NOTE(review): assumes `log` (the plugin logger) is available on this
# class, as it is for Fluentd plugins -- confirm.
def start
  super

  # See: https://docs.fluentd.org/plugin-helper-overview/api-plugin-helper-timer
  timer_execute(:directory_timer, @run_interval) do
    begin
      # Use a stream to submit multiple events at the same time
      stream = MultiEventStream.new

      # Use the current time as the event time
      time = Fluent::Engine.now

      # Only regular files are candidates: the glob also matches
      # sub-directories, and File.read on a directory raises EISDIR.
      # Filter before `take` so directories do not use up batch slots.
      candidates = Dir
                   .glob("#{@path}/*")
                   .select { |filename| File.file?(filename) }
                   .take(@batch_size)

      # Files whose content actually made it into the stream
      emitted_files = []

      candidates.each do |filename|
        begin
          content = File.read(filename)
        rescue SystemCallError, IOError => e
          # One unreadable (or concurrently removed) file must not abort
          # the whole batch; leave it in place and move on.
          log.warn("failed to read #{filename}, skipping", error: e)
          next
        end

        # Add the record to the stream
        stream.add(
          time,
          {
            @content_key => content,
            @filename_key => filename
          }
        )
        emitted_files << filename
      end

      # Send the events
      router.emit_stream(tag, stream)

      # Remove the files only now that their content has been handed over
      emitted_files.each { |filename| File.delete(filename) }
    rescue StandardError => e
      # The previous `yield(nil, nil)` here targeted the block passed to
      # #start; with no such block it raised LocalJumpError and hid the
      # real error. Log it and let the next tick retry.
      log.error('failed to process directory batch', error: e)
    end
  end
end