Class: Fairy::POutputFile

Inherits:
PIOFilter show all
Defined in:
lib/fairy/node/p-output-file.rb

Constant Summary collapse

ST_OUTPUT_FINISH =
:ST_OUTPUT_FINISH
IPADDR_REGEXP =
/::ffff:([0-9]+\.){3}[0-9]+|[0-9a-f]+:([0-9a-f]*:)[0-9a-f]*/

Constants inherited from PIOFilter

Fairy::PIOFilter::ST_WAIT_IMPORT

Constants inherited from PFilter

Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT

Instance Attribute Summary

Attributes inherited from PFilter

#IGNORE_EXCEPTION, #id, #log_id, #ntask

Instance Method Summary collapse

Methods inherited from PFilter

#abort_running, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_export, #start_watch_status, #status=, #terminate, #terminate_proc

Constructor Details

#initialize(id, ntask, bjob, opt, vf) ⇒ POutputFile

Returns a new instance of POutputFile.



16
17
18
19
20
21
# File 'lib/fairy/node/p-output-file.rb', line 16

def initialize(id, ntask, bjob, opt, vf)
  super
  @vfile = vf

#      @imports = XThread::Queue.new
end

Instance Method Details

#basic_start(&block) ⇒ Object

def add_input(input)

unless input

@imports.push nil return self

end
policy = @opts[:prequeuing_policy]
import = Import.new(policy)
import.add_key(input.key)
input.output = import
import.set_log_callback do |n|

Log::verbose(self, “IMPORT POP: #n”)

  end
  @imports.push import
  self
end


44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/fairy/node/p-output-file.rb', line 44

def basic_start(&block)
  Log::debug(self, "START")
  output_uri = gen_real_file_name
  @vfile.set_real_file(no, output_uri)

  Log::debug(self, "write real file: #{output_uri}")
  begin
	output_file = URI.parse(output_uri).path
  rescue
	Log::debug_exception(self)
	raise
  end

  unless File.exist?(File.dirname(output_file))
	create_dir(File.dirname(output_file))
  end

  File.open(output_file, "w") do |io|
	Log::debug(self, "start write real file: #{output_uri}")
	@input.each do |l|
	  io.puts l
	end
	Log::debug(self, "finish write real file: #{output_uri}")
  end
  self.status = ST_OUTPUT_FINISH
end

#create_dir(path) ⇒ Object



71
72
73
74
75
76
77
78
79
80
# File 'lib/fairy/node/p-output-file.rb', line 71

def create_dir(path)
  unless File.exist?(File.dirname(path))
	create_dir(File.dirname(path))
  end
  begin
	Dir.mkdir(path)
  rescue Errno::EEXIST
	# 無視
  end
end

#gen_real_file_nameObject



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/fairy/node/p-output-file.rb', line 84

def gen_real_file_name
  host= processor.addr
  root = CONF.VF_ROOT
  prefix = CONF.VF_PREFIX
  base_name = @vfile.base_name
  no = @input.no
  

  if IPADDR_REGEXP =~ host
	begin
	  host = Resolv.getname(host)
	rescue
	  # ホスト名が分からない場合 は そのまま ipv6 アドレスにする
	  host = "[#{host}]"
	end
  end
  
  format("file://#{host}#{root}/#{prefix}/#{base_name}-%03d", no)
end

#input=(input) ⇒ Object



23
24
25
26
# File 'lib/fairy/node/p-output-file.rb', line 23

def input=(input)
  super
  start
end