Class: Fairy::PGroupBy::PPostFilter

Inherits:
Fairy::PSingleExportFilter
Defined in:
lib/fairy/node/p-group-by.rb

Constant Summary

Constants included from Fairy::PSingleExportable

Fairy::PSingleExportable::END_OF_STREAM, Fairy::PSingleExportable::ST_EXPORT_FINISH, Fairy::PSingleExportable::ST_WAIT_EXPORT_FINISH

Constants inherited from Fairy::PIOFilter

Fairy::PIOFilter::ST_WAIT_IMPORT

Constants inherited from Fairy::PFilter

Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT

Instance Attribute Summary

Attributes included from Fairy::PSingleExportable

#export

Attributes inherited from Fairy::PFilter

#IGNORE_EXCEPTION, #id, #log_id, #ntask

Instance Method Summary

Methods included from Fairy::PSingleExportable

#start, #start_export, #terminate, #wait_export_finish

Methods inherited from Fairy::PIOFilter

#input=

Methods inherited from Fairy::PFilter

#abort_running, #basic_start, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_export, #start_watch_status, #status=, #terminate, #terminate_proc

Constructor Details

#initialize(id, ntask, bjob, opts, block_source) ⇒ PPostFilter

Returns a new instance of PPostFilter.



50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/fairy/node/p-group-by.rb', line 50

# Sets up the post filter of a group-by node.
#
# All five arguments are forwarded unchanged to the superclass initializer
# through the bare +super+; +block_source+ is additionally stored on this
# instance as @block_source.
#
# Option keys consulted here:
#   :buffering_policy  - looked up in @opts; when that lookup is nil/false,
#                        CONF.GROUP_BY_BUFFERING_POLICY is used instead.
#   :grouping_optimize - when the key is present in +opts+, its value
#                        replaces CONF.GROUP_BY_GROUPING_OPTIMIZE.
#
# When CONF.BUG234 is truthy, @hash_optimize is never assigned here.
def initialize(id, ntask, bjob, opts, block_source)
	super
	@block_source = block_source

	# NOTE(review): @opts is presumably populated by the superclass
	# initializer -- confirm; it is not assigned in this method.
	@buffering_policy = @opts[:buffering_policy] || CONF.GROUP_BY_BUFFERING_POLICY

	unless CONF.BUG234
	  # Read the configured default first, then let an explicit option win.
	  configured = CONF.GROUP_BY_GROUPING_OPTIMIZE
	  @hash_optimize =
	    opts.key?(:grouping_optimize) ? opts[:grouping_optimize] : configured
	end
end

Instance Method Details

#basic_each(&block) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/fairy/node/p-group-by.rb', line 97

# Feeds every element of @input through a key/value buffer and then hands
# each item produced by that buffer to +block+.
#
# Order of operations:
# 1. The class named by @buffering_policy[:buffering_class] is resolved with
#    eval and instantiated with (self, @buffering_policy).
# 2. @hash_proc is prepared: an eval'd proc built from @block_source.source
#    when @hash_optimize is truthy, otherwise a BBlock wrapping @block_source.
# 3. Each input element is pushed into the buffer.
# 4. +block+ is called once per item yielded by the buffer's #each.
# 5. The reference to the buffer is cleared.
#
# NOTE(review): both eval calls execute strings coming from the buffering
# policy and the block source -- confirm neither can carry untrusted input.
def basic_each(&block)
	@key_value_buffer =
	  eval(@buffering_policy[:buffering_class].to_s).new(self, @buffering_policy)

	@hash_proc =
	  if @hash_optimize
	    eval("proc{#{@block_source.source}}")
	  else
	    BBlock.new(@block_source, @context, self)
	  end

	@input.each do |element|
	  @key_value_buffer.push(element)
	  # Drop the block-local reference once the buffer has the element
	  # (presumably a GC hint -- kept as in the original).
	  element = nil
	end

	@key_value_buffer.each { |group| block.call(group) }

	@key_value_buffer = nil
end

#basic_each_0(&block) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/fairy/node/p-group-by.rb', line 82

# Variant of the grouping loop that does not use a key/value buffer: it
# groups @input with Enumerable-style #group_by and calls +block+ with a
# two-element [key, members] array for every group.
#
# @hash_proc is still prepared (eval'd proc when @hash_optimize is truthy,
# BBlock otherwise), but the grouping below keys on the element itself.
# NOTE(review): grouping by the element rather than through @hash_proc may
# be intentional for this "_0" variant -- confirm before relying on it.
def basic_each_0(&block)
	# Buffer creation is disabled in the original source:
	#	@key_value_buffer = 
	#	  eval("#{@buffering_policy[:buffering_class]}").new(@buffering_policy)

	@hash_proc =
	  if @hash_optimize
	    eval("proc{#{@block_source.source}}")
	  else
	    BBlock.new(@block_source, @context, self)
	  end

	@input.group_by { |element| element }.each do |key, members|
	  block.call([key, members])
	end
end

#hash_key(e) ⇒ Object



116
117
118
# File 'lib/fairy/node/p-group-by.rb', line 116

# Returns the grouping key for element +e+ by delegating to @hash_proc.
#
# @hash_proc is assigned inside basic_each / basic_each_0 (either a proc
# built with eval or a BBlock instance); if neither has run, @hash_proc is
# unset and this call would be made on nil.
def hash_key(e)
	# Invoked via #yield, which Proc provides; BBlock is presumably expected
	# to respond to #yield as well -- confirm against its definition.
	@hash_proc.yield(e)
end