Class: RFlow::Components::File::DirectoryWatcher

Inherits:
RFlow::Component
  • Object
show all
Defined in:
lib/rflow/components/file/directory_watcher.rb

Constant Summary collapse

# Default settings, overridden key by key by the hash given to configure!.
# Frozen so the shared defaults cannot be mutated by accident; configure!
# only ever reads it through Hash#merge, which returns a new hash.
DEFAULT_CONFIG = {
  'directory_path'  => '/tmp/import', # directory that is polled for files
  'file_name_glob'  => '*',           # glob, relative to directory_path, selecting files
  'poll_interval'   => 1,             # interval handed to the periodic timer
  'files_per_poll'  => 1,             # maximum number of files handled per poll
  'remove_files'    => true,          # delete each file once its messages are sent
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#config ⇒ Object

Returns the value of attribute config.



19
20
21
# File 'lib/rflow/components/file/directory_watcher.rb', line 19

# Reader for @config, which configure! sets to DEFAULT_CONFIG merged with
# the hash it was given.
def config
  @config
end

#directory_path ⇒ Object

Returns the value of attribute directory_path.



19
20
21
# File 'lib/rflow/components/file/directory_watcher.rb', line 19

# Reader for @directory_path, which configure! sets to the expanded
# (absolute) form of the 'directory_path' setting.
def directory_path
  @directory_path
end

#file_name_glob ⇒ Object

Returns the value of attribute file_name_glob.



19
20
21
# File 'lib/rflow/components/file/directory_watcher.rb', line 19

# Reader for @file_name_glob, the 'file_name_glob' setting as stored by
# configure!; run! joins it onto the directory path to build the glob.
def file_name_glob
  @file_name_glob
end

#poll_interval ⇒ Object

Returns the value of attribute poll_interval.



19
20
21
# File 'lib/rflow/components/file/directory_watcher.rb', line 19

# Reader for @poll_interval, the 'poll_interval' setting converted with
# to_i by configure!; run! passes it to the periodic timer.
def poll_interval
  @poll_interval
end

#remove_files ⇒ Object

Returns the value of attribute remove_files.



19
20
21
# File 'lib/rflow/components/file/directory_watcher.rb', line 19

# Reader for @remove_files, the 'remove_files' setting coerced to a boolean
# by configure!; when true, run! deletes each file after sending it.
def remove_files
  @remove_files
end

Instance Method Details

#configure!(config) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/rflow/components/file/directory_watcher.rb', line 21

# Merges +config+ over DEFAULT_CONFIG, stores the individual settings in
# instance variables and validates them.
#
# @param config [Hash] string-keyed settings overriding DEFAULT_CONFIG
# @raise [ArgumentError] if the configured directory does not exist or is
#   not readable, or if 'poll_interval' / 'files_per_poll' does not coerce
#   to a positive integer
def configure!(config)
  @config = DEFAULT_CONFIG.merge config
  @directory_path  = ::File.expand_path(@config['directory_path'])
  @file_name_glob  = @config['file_name_glob']
  @poll_interval   = @config['poll_interval'].to_i
  @files_per_poll  = @config['files_per_poll'].to_i
  @remove_files    = to_boolean(@config['remove_files'])

  unless ::File.directory?(@directory_path)
    raise ArgumentError, "Invalid directory '#{@directory_path}'"
  end

  unless ::File.readable?(@directory_path)
    raise ArgumentError, "Unable to read from directory '#{@directory_path}'"
  end

  # to_i maps nil and non-numeric strings to 0, so a typo would otherwise
  # silently produce a zero-interval timer or a watcher that never takes
  # a file. Reject anything that does not end up strictly positive.
  unless @poll_interval > 0
    raise ArgumentError, "Invalid poll_interval '#{@config['poll_interval']}', expected a positive integer"
  end

  unless @files_per_poll > 0
    raise ArgumentError, "Invalid files_per_poll '#{@config['files_per_poll']}', expected a positive integer"
  end
end

#run! ⇒ Object

TODO: optimize sending of messages based on what is connected



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/rflow/components/file/directory_watcher.rb', line 41

# Sets up a periodic timer that, every poll_interval, globs the watched
# directory, takes up to @files_per_poll matching files (earliest modified
# first) and, for each one, sends a Data::File message on file_port and a
# Data::Raw message on raw_port. When @remove_files is set the file is
# deleted after both messages have been sent.
#
# Returns the EventMachine::PeriodicTimer.
#
# TODO: optimize sending of messages based on what is connected
def run!
  EventMachine::PeriodicTimer.new(poll_interval) do
    pattern = ::File.join(@directory_path, @file_name_glob)
    RFlow.logger.debug { "#{name}: Polling for files in #{pattern}" }

    # Earliest modification time first.
    by_age = Dir.glob(pattern).sort_by { |candidate| ::File.mtime(candidate) }

    # With more than one worker in the shard, each worker only keeps the
    # paths whose checksum maps onto its own index, so the copies share
    # the load instead of all grabbing the same file.
    assigned = by_age.select do |candidate|
      shard.count == 1 || (candidate.sum % shard.count) + 1 == worker.index
    end

    assigned.first(@files_per_poll).each do |path|
      RFlow.logger.debug { "#{name}: Importing #{path}" }

      if !::File.readable?(path)
        RFlow.logger.warn "#{name}: Unable to read file #{path}, skipping it"
        next
      elsif @remove_files && !::File.writable?(path)
        # Checked up front so a file that cannot be deleted later is never sent.
        RFlow.logger.warn "#{name}: Unable to remove file #{path}, skipping it"
        next
      end

      ::File.open(path, 'r:BINARY') do |io|
        bytes = io.read

        RFlow.logger.debug { "#{name}: Read #{bytes.bytesize} bytes of #{io.size} in #{io.path}, md5 #{Digest::MD5.hexdigest(bytes)}" }

        # Full file record: path, size, content and timestamps.
        file_message = RFlow::Message.new('RFlow::Message::Data::File')
        file_message.data.path = ::File.expand_path(io.path)
        file_message.data.size = io.size
        file_message.data.content = bytes
        file_message.data.creation_timestamp = io.ctime
        file_message.data.modification_timestamp = io.mtime
        file_message.data.access_timestamp = io.atime
        file_port.send_message(file_message)

        # Content-only copy of the same bytes.
        raw_message = RFlow::Message.new('RFlow::Message::Data::Raw')
        raw_message.data.raw = bytes
        raw_port.send_message(raw_message)
      end

      next unless @remove_files

      RFlow.logger.debug { "#{name}: Removing #{path}" }
      ::File.delete(path)
    end
  end
end

#to_boolean(string) ⇒ Object



86
87
88
89
90
91
92
# File 'lib/rflow/components/file/directory_watcher.rb', line 86

# Coerces a configuration value to true or false.
#
# Accepts the booleans themselves, the strings '1' and '0', and the words
# "true" / "false" in any letter case. The words must make up the whole
# value: \A and \z anchor the entire string, so input such as "falsey" or
# a multi-line string containing "true" on one line is rejected instead
# of being silently coerced.
#
# @param string [String, true, false] the value to coerce
# @return [Boolean]
# @raise [ArgumentError] if the value is not one of the accepted forms
def to_boolean(string)
  case string
  when true, '1', /\Atrue\z/i then true
  when false, '0', /\Afalse\z/i then false
  else raise ArgumentError, "'#{string}' cannot be coerced to a boolean value"
  end
end