Class: FileWatch::ReadMode::Processor

Inherits:
Object
  • Object
show all
Includes:
LogStash::Util::Loggable
Defined in:
lib/filewatch/read_mode/processor.rb

Overview

Must handle

:read_file
:read_zip_file

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(settings) ⇒ Processor

Returns a new instance of Processor.



17
18
19
20
# File 'lib/filewatch/read_mode/processor.rb', line 17

# Builds a processor around the given settings and starts with an empty
# list of deletable file paths.
#
# @param settings the settings object; kept as-is in @settings, later handed
#   to the handlers and queried for max_active / max_warn_msg
def initialize(settings)
  @settings = settings
  # filled by common_deleted_reaction with the paths of files that went away
  @deletable_filepaths = []
end

Instance Attribute Details

#deletable_filepaths ⇒ Object (readonly)

Returns the value of attribute deletable_filepaths.



15
16
17
# File 'lib/filewatch/read_mode/processor.rb', line 15

# Read-only accessor for the collected deletable paths.
#
# @return the object held in @deletable_filepaths (an Array created in the
#   constructor and appended to by common_deleted_reaction)
def deletable_filepaths
  @deletable_filepaths
end

#watch ⇒ Object (readonly)

Returns the value of attribute watch.



15
16
17
# File 'lib/filewatch/read_mode/processor.rb', line 15

# Read-only accessor for the associated watch object.
#
# @return the object last stored by add_watch, or nil when add_watch has
#   not been called yet (the constructor does not set @watch)
def watch
  @watch
end

Instance Method Details

#add_watch(watch) ⇒ Object



22
23
24
25
# File 'lib/filewatch/read_mode/processor.rb', line 22

# Associates a watch object with this processor. The processing methods
# consult it through #watch (quit?, lastwarn_max_files).
#
# @param watch the watch object to store
# @return self, so the call can be chained
def add_watch(watch)
  @watch = watch
  self
end

#common_deleted_reaction(watched_file, action) ⇒ Object



106
107
108
109
110
111
# File 'lib/filewatch/read_mode/processor.rb', line 106

# Shared reaction for a file that has gone away or can no longer be read:
# stop watching it, remember its path as deletable and log the event.
#
# @param watched_file the file object to unwatch; must respond to #unwatch and #path
# @param action label of the processing phase, used as the log message prefix
def common_deleted_reaction(watched_file, action)
  watched_file.unwatch
  gone_path = watched_file.path
  deletable_filepaths.push(gone_path)
  logger.debug("#{action} - stat failed: #{gone_path}, removing from collection")
end

#common_error_reaction(path, error, action) ⇒ Object



113
114
115
# File 'lib/filewatch/read_mode/processor.rb', line 113

# Shared reaction for an unexpected error while handling a file: logs the
# error message together with at most the first 8 backtrace entries.
#
# @param path the path of the file being handled, included in the log line
# @param error the exception object; must respond to #message and #backtrace
# @param action label of the processing phase, used as the log message prefix
def common_error_reaction(path, error, action)
  # Exception#backtrace is nil for an exception that was never raised;
  # Array() maps that to [] so the error reporter itself cannot blow up.
  trace = Array(error.backtrace).take(8)
  logger.error("#{action} - other error #{path}: (#{error.message}, #{trace.inspect})")
end

#initialize_handlers(sincedb_collection, observer) ⇒ Object



27
28
29
30
# File 'lib/filewatch/read_mode/processor.rb', line 27

# Creates the two handlers this processor delegates to. Both receive the
# same three arguments: the sincedb collection, the observer and the
# settings given to the constructor.
#
# @param sincedb_collection passed through to both handler constructors
# @param observer passed through to both handler constructors
def initialize_handlers(sincedb_collection, observer)
  handler_args = [sincedb_collection, observer, @settings]
  @read_file = Handlers::ReadFile.new(*handler_args)
  @read_zip_file = Handlers::ReadZipFile.new(*handler_args)
end

#process_active(watched_files) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/filewatch/read_mode/processor.rb', line 81

# Handles the watched_files that are in the active state: each one is
# re-stat'ed and then handed to the zip or plain file handler. Files whose
# restat fails are reported and skipped; the loop stops as soon as the
# watch reports quit?.
#
# @param watched_files collection of file objects; only those answering
#   true to #active? are processed
def process_active(watched_files)
  logger.debug("Active processing")
  active_files = watched_files.select(&:active?)
  active_files.each do |file|
    file_path = file.path
    restat_ok =
      begin
        file.restat
        true
      rescue Errno::ENOENT
        # the file has gone away since the last pass
        common_deleted_reaction(file, "Active")
        false
      rescue => err
        common_error_reaction(file_path, err, "Active")
        false
      end
    next unless restat_ok
    break if watch.quit?

    # the handlers take care of closing and unwatching
    file.compressed? ? read_zip_file(file) : read_file(file)
  end
end

#process_closed(watched_files) ⇒ Object



40
41
42
# File 'lib/filewatch/read_mode/processor.rb', line 40

# Intentional no-op: files in the closed state are not processed.
#
# @param watched_files unused
# @return [nil]
def process_closed(watched_files)
  # do not process watched_files in the closed state.
end

#process_ignored(watched_files) ⇒ Object



44
45
46
# File 'lib/filewatch/read_mode/processor.rb', line 44

# Intentional no-op: files in the ignored state are not processed.
#
# @param watched_files unused
# @return [nil]
def process_ignored(watched_files)
  # do not process watched_files in the ignored state.
end

#process_watched(watched_files) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/filewatch/read_mode/processor.rb', line 48

# Handles the watched_files that are in the watched state: a slice of them,
# sized to the free part of the max_active window, is re-stat'ed and moved
# to the active state. When the window is full, a warning is logged instead,
# rate-limited via watch.lastwarn_max_files and MAX_FILES_WARN_INTERVAL.
#
# @param watched_files collection of file objects in any state
def process_watched(watched_files)
  logger.debug("Watched processing")
  # how much of the max active window is still available
  window = @settings.max_active
  free_slots = window - watched_files.count(&:active?)
  if free_slots <= 0
    current_time = Time.now.to_i
    if (current_time - watch.lastwarn_max_files) > MAX_FILES_WARN_INTERVAL
      waiting = watched_files.size - @settings.max_active
      logger.warn(@settings.max_warn_msg + ", files yet to open: #{waiting}")
      watch.lastwarn_max_files = current_time
    end
  else
    # these files should never have been active before
    candidates = watched_files.select(&:watched?).take(free_slots)
    candidates.each do |file|
      file_path = file.path
      activated =
        begin
          file.restat
          file.activate
          true
        rescue Errno::ENOENT
          common_deleted_reaction(file, "Watched")
          false
        rescue => err
          common_error_reaction(file_path, err, "Watched")
          false
        end
      next unless activated
      break if watch.quit?
    end
  end
end

#read_file(watched_file) ⇒ Object



32
33
34
# File 'lib/filewatch/read_mode/processor.rb', line 32

# Delegates the file to the handler held in @read_file (created by
# initialize_handlers as a Handlers::ReadFile).
#
# @param watched_file the file object passed through to the handler
# @return whatever the handler's #handle returns
def read_file(watched_file)
  @read_file.handle(watched_file)
end

#read_zip_file(watched_file) ⇒ Object



36
37
38
# File 'lib/filewatch/read_mode/processor.rb', line 36

# Delegates the file to the handler held in @read_zip_file (created by
# initialize_handlers as a Handlers::ReadZipFile).
#
# @param watched_file the file object passed through to the handler
# @return whatever the handler's #handle returns
def read_zip_file(watched_file)
  @read_zip_file.handle(watched_file)
end