Class: RFlow::Components::File::OutputRawToFiles

Inherits:
RFlow::Component
  • Object
show all
Defined in:
lib/rflow/components/file/output_raw_to_files.rb

Constant Summary collapse

# Fallback settings that configure! merges the caller-supplied config over.
# Frozen so the shared defaults cannot be modified in place; Hash#merge
# still returns a fresh, unfrozen Hash for each component.
DEFAULT_CONFIG = {
  'directory_path'   => '/tmp',
  'file_name_prefix' => 'output.',
  'file_name_suffix' => '.out',
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#config ⇒ Object

Returns the value of attribute config.



17
18
19
# File 'lib/rflow/components/file/output_raw_to_files.rb', line 17

# Reader for @config, which configure! sets to the supplied config
# merged over DEFAULT_CONFIG.
#
# @return [Hash, nil] the merged configuration, or nil if @config has not been assigned
def config
  @config
end

#directory_path ⇒ Object

Returns the value of attribute directory_path.



17
18
19
# File 'lib/rflow/components/file/output_raw_to_files.rb', line 17

# Reader for @directory_path, which configure! sets to the expanded
# (absolute) form of the configured 'directory_path'.
#
# @return [String, nil] the output directory, or nil if @directory_path has not been assigned
def directory_path
  @directory_path
end

#file_name_prefix ⇒ Object

Returns the value of attribute file_name_prefix.



17
18
19
# File 'lib/rflow/components/file/output_raw_to_files.rb', line 17

# Reader for @file_name_prefix, which configure! copies from the
# 'file_name_prefix' entry of the merged configuration.
#
# @return [Object, nil] the configured prefix, or nil if @file_name_prefix has not been assigned
def file_name_prefix
  @file_name_prefix
end

#file_name_suffix ⇒ Object

Returns the value of attribute file_name_suffix.



17
18
19
# File 'lib/rflow/components/file/output_raw_to_files.rb', line 17

# Reader for @file_name_suffix, which configure! copies from the
# 'file_name_suffix' entry of the merged configuration.
#
# @return [Object, nil] the configured suffix, or nil if @file_name_suffix has not been assigned
def file_name_suffix
  @file_name_suffix
end

Instance Method Details

#configure!(config) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/rflow/components/file/output_raw_to_files.rb', line 19

# Merges +config+ over DEFAULT_CONFIG, caches the resulting settings in
# instance variables, and validates the output directory.
#
# @param config [Hash] overrides keyed by 'directory_path',
#   'file_name_prefix' and 'file_name_suffix'; missing keys fall back to
#   DEFAULT_CONFIG
# @raise [ArgumentError] if the expanded directory path is not an existing
#   directory, or is not writable by this process
# @return [nil]
def configure!(config)
  @config = DEFAULT_CONFIG.merge config
  # Expand once so later checks and error messages use an absolute path.
  @directory_path   = ::File.expand_path(@config['directory_path'])
  @file_name_prefix = @config['file_name_prefix']
  @file_name_suffix = @config['file_name_suffix']

  unless ::File.directory?(@directory_path)
    raise ArgumentError, "Invalid directory '#{@directory_path}'"
  end

  # This component only writes files, so the failure reported here is a
  # write-permission problem (the message previously said "read from").
  unless ::File.writable?(@directory_path)
    raise ArgumentError, "Unable to write to directory '#{@directory_path}'"
  end

  # TODO: more error checking of input config
end

#process_message(input_port, input_port_key, connection, message) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/rflow/components/file/output_raw_to_files.rb', line 36

# Writes the raw payload of +message+ to a new file under #directory_path.
#
# Messages whose data_type_name is not 'RFlow::Message::Data::Raw' are
# ignored. The payload is first written to a dot-prefixed staging file that
# is opened with CREAT|EXCL, then renamed to its final name. If the staging
# file already exists (Errno::EEXIST), @output_file_entropy is incremented
# and the attempt is repeated with a freshly requested output_file_name.
# NOTE(review): output_file_name is defined outside this listing; presumably
# it incorporates @output_file_entropy so each retry yields a new name -
# confirm.
#
# @param input_port [Object] not referenced by this method
# @param input_port_key [Object] not referenced by this method
# @param connection [Object] not referenced by this method
# @param message [#data_type_name, #data] message whose data responds to #raw
# @return [String, nil] path of the file written, or nil when the message
#   is not a raw message
def process_message(input_port, input_port_key, connection, message)
  return unless message.data_type_name == 'RFlow::Message::Data::Raw'

  log_tag = "#{self.class.name}##{__method__}"
  open_flags = ::File::CREAT | ::File::EXCL | ::File::RDWR
  @output_file_entropy = 1

  begin
    file_name = output_file_name
    staging_path = ::File.join(directory_path, ".#{file_name}")
    destination_path = ::File.join(directory_path, file_name.to_s)

    RFlow.logger.debug { "#{log_tag}: Outputting raw message to #{destination_path} (via #{staging_path}) with #{message.data.raw.bytesize} bytes and md5 #{Digest::MD5.hexdigest message.data.raw}" }

    # EXCL makes the open fail when the staging file already exists.
    ::File.open(staging_path, open_flags, 0644, external_encoding: 'BINARY') do |io|
      io.flock(::File::LOCK_EX)
      io.write(message.data.raw)
    end
    ::File.rename(staging_path, destination_path)

    RFlow.logger.debug { "#{log_tag}: Successfully output raw message to #{destination_path}" }
  rescue Errno::EEXIST
    RFlow.logger.debug { "#{log_tag}: File #{staging_path} exists, increasing entropy" }
    @output_file_entropy += 1
    retry
  end

  destination_path
end