Class: LogStash::Inputs::File

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/file.rb

Defined Under Namespace

Classes: FlushableListener, ListenerTail

Instance Method Summary collapse

Instance Method Details

#begin_tailing ⇒ Object



281
282
283
284
285
286
287
288
289
# File 'lib/logstash/inputs/file.rb', line 281

# Closes any previous tail, builds a fresh observing tail from @tail_config
# and registers every configured path with it.
#
# @return [Array] the configured @path collection (result of #each)
def begin_tailing
  # A pipeline restart may call this again on the same input, so release
  # whatever the previous run still holds before creating a new tail.
  stop

  # Observer/listener flavour of the FileWatch API.
  @tail = FileWatch::Tail.new_observing(@tail_config)
  @tail.logger = @logger

  @path.each do |watched_path|
    @tail.tail(watched_path)
  end
end

#listener_for(path) ⇒ Object



276
277
278
279
# File 'lib/logstash/inputs/file.rb', line 276

# Builds the listener object for a single watched file.
#
# @param path [String] file path; it doubles as the listener's identity
# @return [ListenerTail] a new listener bound to +path+ and to this input
def listener_for(path)
  ListenerTail.new(path, self)
end

#log_line_received(path, line) ⇒ Object



304
305
306
307
# File 'lib/logstash/inputs/file.rb', line 304

# Emits a debug-level record for a line read from a file.
#
# @param path [String] file the line came from
# @param line [String] text of the line
# @return [Object, nil] nil when debug logging is off, otherwise whatever
#   the logger's #debug call returns
def log_line_received(path, line)
  # The debug? check lets the call be skipped entirely when debug is off.
  @logger.debug("Received line", :path => path, :text => line) if @logger.debug?
end

#post_process_this(event) ⇒ Object




298
299
300
301
302
# File 'lib/logstash/inputs/file.rb', line 298

# Finalises an event and hands it to the output queue.
#
# @param event [Object] event responding to #include? and #set
# @return [Object] the result of pushing the event onto @queue
def post_process_this(event)
  # Only fill in the host when the event does not already carry one.
  event.set("host", @host) unless event.include?("host")

  decorate(event)
  @queue << event
end

#register ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/logstash/inputs/file.rb', line 167

# Validates the configuration and prepares everything needed for tailing:
# the host name, the option hash passed to the tail (@tail_config), the
# sincedb location and the wrapped codec.
#
# When no sincedb_path is configured, one is generated under SINCEDB_DIR
# (preferred) or HOME, named after an MD5 digest of the configured paths.
#
# @raise [ArgumentError] if any configured path is relative, or if the
#   sincedb path points to a directory
# @raise [RuntimeError] if no sincedb_path is configured and neither
#   SINCEDB_DIR nor HOME is set in the environment
def register
  require "addressable/uri"
  require "filewatch/tail"
  require "digest/md5"
  @logger.info("Registering file input", :path => @path)
  @host = Socket.gethostname.force_encoding(Encoding::UTF_8)

  @tail_config = {
    :exclude => @exclude,
    :stat_interval => @stat_interval,
    :discover_interval => @discover_interval,
    :sincedb_write_interval => @sincedb_write_interval,
    :delimiter => @delimiter,
    :ignore_older => @ignore_older,
    :close_older => @close_older,
    :max_open_files => @max_open_files
  }

  @path.each do |path|
    if Pathname.new(path).relative?
      raise ArgumentError, "File paths must be absolute, relative path specified: #{path}"
    end
  end

  if @sincedb_path.nil?
    # Pick SINCEDB_DIR if available, otherwise use HOME.
    sincedb_dir = ENV["SINCEDB_DIR"] || ENV["HOME"]

    if sincedb_dir.nil?
      message = "No SINCEDB_DIR or HOME environment variable set, I don't know where " \
                "to keep track of the files I'm watching. Either set " \
                "HOME or SINCEDB_DIR in your environment, or set sincedb_path " \
                "in your Logstash config for the file input with " \
                "path '#{@path.inspect}'"
      @logger.error(message)
      # Raise with the explanation attached; a bare `raise` here would only
      # produce a RuntimeError saying "unhandled exception".
      raise message
    end

    # Join by ',' to make it easy for folks to know their own sincedb
    # generated path (vs, say, inspecting the @path array)
    @sincedb_path = File.join(sincedb_dir, ".sincedb_" + Digest::MD5.hexdigest(@path.join(",")))

    # Migrate any old .sincedb to the new file (this is for version <=1.1.1 compatibility)
    old_sincedb = File.join(sincedb_dir, ".sincedb")
    # File.exist? is used because File.exists? was removed in Ruby 3.2.
    if File.exist?(old_sincedb)
      @logger.info("Renaming old ~/.sincedb to new one", :old => old_sincedb,
                   :new => @sincedb_path)
      File.rename(old_sincedb, @sincedb_path)
    end

    @logger.info("No sincedb_path set, generating one based on the file path",
                 :sincedb_path => @sincedb_path, :path => @path)
  end

  if File.directory?(@sincedb_path)
    raise ArgumentError, "The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\""
  end

  @tail_config[:sincedb_path] = @sincedb_path

  # :start_new_files_at is only set when "beginning" was requested;
  # otherwise the key is left out of the config.
  if @start_position == "beginning"
    @tail_config[:start_new_files_at] = :beginning
  end

  @codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
end

#run(queue) ⇒ Object



291
292
293
294
295
296
# File 'lib/logstash/inputs/file.rb', line 291

# Entry point of the input: starts tailing the configured paths and
# subscribes this input to the tail.
#
# @param queue [Object] stored in @queue; post_process_this pushes events
#   onto it with `<<`
def run(queue)
  # Closes any previous tail and creates a new one for every path.
  begin_tailing
  @queue = queue
  # Hand this input to the tail as its subscriber.
  # NOTE(review): presumably this call blocks until the tail is quit —
  # confirm against FileWatch.
  @tail.subscribe(self)
  # exit_flush is defined outside this view; presumably it flushes any
  # buffered data once subscribe returns — TODO confirm.
  exit_flush
end

#stop ⇒ Object



309
310
311
312
313
314
315
316
317
# File 'lib/logstash/inputs/file.rb', line 309

# Shuts down the current tail, if there is one.
#
# @return [Object, nil] nil when nothing is being tailed, otherwise the
#   result of the tail's #quit
def stop
  # Nothing to release when tailing never started.
  return unless @tail

  @codec.close
  # In filewatch >= 0.6.7, quit closes and forgets all files, but writes
  # their last read positions to the sincedb beforehand.
  @tail.quit
end