Class: Fairy::PSegZip

Inherits:
PSingleExportFilter show all
Defined in:
lib/fairy/node/p-seg-zip.rb

Constant Summary

Constants included from PSingleExportable

Fairy::PSingleExportable::END_OF_STREAM, Fairy::PSingleExportable::ST_EXPORT_FINISH, Fairy::PSingleExportable::ST_WAIT_EXPORT_FINISH

Constants inherited from PIOFilter

Fairy::PIOFilter::ST_WAIT_IMPORT

Constants inherited from PFilter

Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT

Instance Attribute Summary

Attributes included from PSingleExportable

#export

Attributes inherited from PFilter

#IGNORE_EXCEPTION, #id, #log_id, #ntask

Instance Method Summary collapse

Methods included from PSingleExportable

#start, #start_export, #terminate, #wait_export_finish

Methods inherited from PIOFilter

#input=

Methods inherited from PFilter

#abort_running, #basic_start, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_export, #start_watch_status, #status=, #terminate, #terminate_proc

Constructor Details

#initialize(id, ntask, bjob, opts, block_source) ⇒ PSegZip

DeepConnect.def_single_method_spec(self, "REF new(REF, VAL, VAL, VAL)")



15
16
17
18
19
20
21
22
23
24
# File 'lib/fairy/node/p-seg-zip.rb', line 15

# Builds the filter and prepares the state used to hand over the zipped
# imports between threads.
#
# All five arguments are forwarded untouched to the parent initializer;
# only block_source is additionally retained here.
def initialize(id, ntask, bjob, opts, block_source)
  super

  # Lock and condition variable guarding @zip_imports; zip_imports waits on
  # them until zip_inputs= has published a value.
  @zip_imports_cv = ConditionVariable.new
  @zip_imports_mutex = Mutex.new
  @zip_imports = nil

  # Source of the user block; basic_each wraps it in a BBlock when it runs.
  # Disabled alternatives, kept for reference:
  #   @map_proc = eval("proc{#{@block_source}}", TOPLEVEL_BINDING)
  #   @map_proc = @context.create_proc(@block_source)
  @block_source = block_source
end

Instance Method Details

#basic_each(&block) ⇒ Object



58
59
60
61
62
63
64
65
66
67
# File 'lib/fairy/node/p-seg-zip.rb', line 58

# Iterates over @input, pairing every element with one value popped from
# each zip import, and passes the mapped result to the given block.
#
# The user block is rebuilt from @block_source on every call. Results that
# match Import::CTLTOKEN_NULLVALUE are dropped instead of being yielded.
def basic_each(&block)
  @map_proc = BBlock.new(@block_source, @context, self)

  @input.each do |element|
    # One pop per import, in import order, for each input element.
    zipped = zip_imports.collect { |source| source.pop }
    mapped = @map_proc.yield(element, *zipped)
    next if Import::CTLTOKEN_NULLVALUE === mapped

    block.call(mapped)
  end
end

#zip_imports ⇒ Object



26
27
28
29
30
31
32
33
# File 'lib/fairy/node/p-seg-zip.rb', line 26

# Returns the zip imports, blocking the caller until zip_inputs= has
# assigned them.
def zip_imports
  @zip_imports_mutex.synchronize do
    # Re-check after every wake-up: only a non-nil @zip_imports ends the wait.
    @zip_imports_cv.wait(@zip_imports_mutex) until @zip_imports
  end

  @zip_imports
end

#zip_inputs=(zinputs) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/fairy/node/p-seg-zip.rb', line 37

# Creates one Import per entry of zinputs, publishes the resulting list as
# @zip_imports and wakes every thread blocked in zip_imports.
#
# Each Import is constructed with the :prequeuing_policy option, copies the
# `no` and `key` of its zinput, and gets a callback that logs pops verbosely.
def zip_inputs=(zinputs)
  queuing_policy = @opts[:prequeuing_policy]

  # Provisional.
  @zip_imports_mutex.synchronize do
    @zip_imports = zinputs.collect do |source|
      zipped = Import.new(queuing_policy)
      zipped.no = source.no
      zipped.add_key(source.key)
      zipped.set_log_callback { |n, key|
        Log::verbose(self, "IMPORT POP key=#{key}: #{n}")
      }
      zipped
    end

    # Publish while still holding the lock so waiters see the new list.
    @zip_imports_cv.broadcast
  end

  @zip_imports
end