Class: LogStash::Inputs::File

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/file.rb

Defined Under Namespace

Classes: ListenerTail

Instance Method Summary collapse

Instance Method Details

#begin_tailingObject



259
260
261
262
263
264
265
266
267
# File 'lib/logstash/inputs/file.rb', line 259

def begin_tailing
  # if the pipeline restarts this input,
  # make sure previous files are closed
  stop
  # use observer listener api
  @tail = FileWatch::Tail.new_observing(@tail_config)
  @tail.logger = @logger
  @path.each { |path| @tail.tail(path) }
end

#listener_for(path) ⇒ Object



254
255
256
257
# File 'lib/logstash/inputs/file.rb', line 254

def listener_for(path)
  # path is the identity
  ListenerTail.new(path, self)
end

#log_line_received(path, line) ⇒ Object



281
282
283
284
# File 'lib/logstash/inputs/file.rb', line 281

def log_line_received(path, line)
  return if !@logger.debug?
  @logger.debug("Received line", :path => path, :text => line)
end

#post_process_this(event) ⇒ Object

def run



275
276
277
278
279
# File 'lib/logstash/inputs/file.rb', line 275

def post_process_this(event)
  event["host"] = @host if !event.include?("host")
  decorate(event)
  @queue << event
end

#registerObject



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/logstash/inputs/file.rb', line 150

def register
  require "addressable/uri"
  require "filewatch/tail"
  require "digest/md5"
  @logger.info("Registering file input", :path => @path)
  @host = Socket.gethostname.force_encoding(Encoding::UTF_8)

  @tail_config = {
    :exclude => @exclude,
    :stat_interval => @stat_interval,
    :discover_interval => @discover_interval,
    :sincedb_write_interval => @sincedb_write_interval,
    :delimiter => @delimiter,
    :ignore_older => @ignore_older,
    :close_older => @close_older
  }

  @path.each do |path|
    if Pathname.new(path).relative?
      raise ArgumentError.new("File paths must be absolute, relative path specified: #{path}")
    end
  end

  if @sincedb_path.nil?
    if ENV["SINCEDB_DIR"].nil? && ENV["HOME"].nil?
      @logger.error("No SINCEDB_DIR or HOME environment variable set, I don't know where " \
                    "to keep track of the files I'm watching. Either set " \
                    "HOME or SINCEDB_DIR in your environment, or set sincedb_path in " \
                    "in your Logstash config for the file input with " \
                    "path '#{@path.inspect}'")
      raise # TODO(sissel): HOW DO I FAIL PROPERLY YO
    end

    #pick SINCEDB_DIR if available, otherwise use HOME
    sincedb_dir = ENV["SINCEDB_DIR"] || ENV["HOME"]

    # Join by ',' to make it easy for folks to know their own sincedb
    # generated path (vs, say, inspecting the @path array)
    @sincedb_path = File.join(sincedb_dir, ".sincedb_" + Digest::MD5.hexdigest(@path.join(",")))

    # Migrate any old .sincedb to the new file (this is for version <=1.1.1 compatibility)
    old_sincedb = File.join(sincedb_dir, ".sincedb")
    if File.exists?(old_sincedb)
      @logger.info("Renaming old ~/.sincedb to new one", :old => old_sincedb,
                   :new => @sincedb_path)
      File.rename(old_sincedb, @sincedb_path)
    end

    @logger.info("No sincedb_path set, generating one based on the file path",
                 :sincedb_path => @sincedb_path, :path => @path)
  end

  if File.directory?(@sincedb_path)
    raise ArgumentError.new("The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"")
  end

  @tail_config[:sincedb_path] = @sincedb_path

  if @start_position == "beginning"
    @tail_config[:start_new_files_at] = :beginning
  end

  @codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
end

#run(queue) ⇒ Object



269
270
271
272
273
# File 'lib/logstash/inputs/file.rb', line 269

def run(queue)
  begin_tailing
  @queue = queue
  @tail.subscribe(self)
end

#stopObject



286
287
288
289
290
291
292
293
294
# File 'lib/logstash/inputs/file.rb', line 286

def stop
  # in filewatch >= 0.6.7, quit will closes and forget all files
  # but it will write their last read positions to since_db
  # beforehand
  if @tail
    @codec.close
    @tail.quit
  end
end