Class: Fairy::PSegSplit

Inherits:
PIOFilter show all
Defined in:
lib/fairy/node/p-seg-split.rb

Constant Summary

Constants inherited from PIOFilter

Fairy::PIOFilter::ST_WAIT_IMPORT

Constants inherited from PFilter

Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT

Instance Attribute Summary collapse

Attributes inherited from PFilter

#IGNORE_EXCEPTION, #id, #log_id, #ntask

Instance Method Summary collapse

Methods inherited from PIOFilter

#input=

Methods inherited from PFilter

#abort_running, #basic_start, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_watch_status, #status=, #terminate, #terminate_proc

Constructor Details

#initialize(id, ntask, bjob, opts, n) ⇒ PSegSplit

DeepConnect.def_single_method_spec(self, “REF new(REF, VAL, VAL)”)



14
15
16
17
18
19
20
# File 'lib/fairy/node/p-seg-split.rb', line 14

def initialize(id, ntask, bjob, opts, n)
  super
  @no_split = n

  policy = @opts[:postqueuing_policy]
  @exports = @no_split.times.collect{Export.new(policy)}
end

Instance Attribute Details

#exportsObject (readonly)

Returns the value of attribute exports.



22
23
24
# File 'lib/fairy/node/p-seg-split.rb', line 22

def exports
  @exports
end

Instance Method Details

#start_exportObject



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/fairy/node/p-seg-split.rb', line 25

def start_export
  Log::debug(self, "START_EXPORT")
#      return unless @status == ST_WAIT_IMPORT
  start do
	begin
	  @input.each_slice(@no_split) do |ll|
 if ll.size < @no_split
   ll.fill(0, @no_split){|idx| ll[idx] ||= END_OF_STREAM}
 end
 @exports.zip(ll) do |exp, l|
   exp.push l
 end
	  end
	ensure
	  @exports.each{|exp| exp.push END_OF_STREAM}
	end
  end
end