Class: Fairy::PFilter

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/node/p-filter.rb

Direct Known Subclasses

PIOFilter, PThere

Defined Under Namespace

Classes: Context

Constant Summary collapse

END_OF_STREAM =
:END_OF_STREAM
ST_INIT =
:ST_INIT
ST_ACTIVATE =
:ST_ACTIVATE
ST_FINISH =
:ST_FINISH

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id, ntask, bjob, opts = {}, *rests) ⇒ PFilter

Returns a new instance of PFilter.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/fairy/node/p-filter.rb', line 26

# Builds a filter node and announces its initial status.
#
# id    - identifier of this filter; also embedded in the log label.
# ntask - owning task; supplies the processor and receives status updates.
# bjob  - owning job; receives status updates, break requests and exceptions.
# opts  - options hash. Keys read here:
#           :ignore_exception - overrides CONF.IGNORE_EXCEPTION_ON_FILTER
#           :BEGIN / :END     - sources evaluated before / after processing
# rests - accepted but not used by this constructor.
def initialize(id, ntask, bjob, opts = {}, *rests)
  @id = id
  short_class_name = self.class.name.sub(/Fairy::/, '')
  @log_id = format("%s[%s]", short_class_name, @id)

  Log::info(self, "CREATE NJOB: #{self.class}")
  @ntask = ntask
  @bjob = bjob
  @opts = opts

  # Global default first; an explicit option (even false/nil) wins.
  @IGNORE_EXCEPTION = CONF.IGNORE_EXCEPTION_ON_FILTER
  if @opts.key?(:ignore_exception)
    @IGNORE_EXCEPTION = @opts[:ignore_exception]
  end

  @main_thread = nil

  @context = Context.new(self)

  # BEGIN/END are stored as raw sources; they are wrapped in a BSource only
  # at the moment they are evaluated (see #each, #next and #start).
  @begin_block_source = @opts[:BEGIN] || nil
  @begin_block_exec_p = false if @begin_block_source
  @end_block_source = @opts[:END] || nil

  @no = nil
  @no_mutex = Mutex.new
  @no_cv = ConditionVariable.new

  @key = nil
  @key_mutex = Mutex.new
  @key_cv = ConditionVariable.new

  @status = ST_INIT
  @status_mon = processor.njob_mon
  @status_cv = @status_mon.new_cv
  notice_status(@status)

  @terminate_mon = processor.njob_mon

  @in_each = nil
  @in_each_mutex = Mutex.new
end

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



81
82
83
# File 'lib/fairy/node/p-filter.rb', line 81

# Identifier of this filter, exactly as passed to the constructor's +id+
# argument.
def id
  @id
end

#IGNORE_EXCEPTION ⇒ Object (readonly)

Returns the value of attribute IGNORE_EXCEPTION.



84
85
86
# File 'lib/fairy/node/p-filter.rb', line 84

# Flag consulted by #each when the per-element block raises: when truthy the
# error is logged and the element skipped, otherwise the error is re-raised.
# The constructor takes it from CONF.IGNORE_EXCEPTION_ON_FILTER unless
# opts[:ignore_exception] is given.
def IGNORE_EXCEPTION
  @IGNORE_EXCEPTION
end

#log_id ⇒ Object (readonly)

Returns the value of attribute log_id.



82
83
84
# File 'lib/fairy/node/p-filter.rb', line 82

# Label built by the constructor as "<class name without Fairy::>[<id>]".
# NOTE(review): presumably read by Log to tag messages - confirm in Log.
def log_id
  @log_id
end

#ntask ⇒ Object (readonly)

Returns the value of attribute ntask.



86
87
88
# File 'lib/fairy/node/p-filter.rb', line 86

# The task object handed to the constructor. It provides #processor and is
# notified of status changes by #notice_status.
def ntask
  @ntask
end

Instance Method Details

#abort_running ⇒ Object



269
270
271
# File 'lib/fairy/node/p-filter.rb', line 269

# Terminates the main thread created by #start, if one is currently
# registered. Returns the thread that was asked to exit, or nil when there
# is none.
def abort_running
  return unless @main_thread

  @main_thread.exit
end

#basic_start(&block) ⇒ Object



174
175
176
# File 'lib/fairy/node/p-filter.rb', line 174

# Body run by #start inside the main thread, between the BEGIN and END
# blocks. This implementation just invokes the block given to #start.
# NOTE(review): presumably an override point for subclasses - confirm.
def basic_start(&block)
  block.call
end

#break_running ⇒ Object



260
261
262
263
264
265
266
267
# File 'lib/fairy/node/p-filter.rb', line 260

# Interrupts an in-progress #each on behalf of another filter.
#
# Does nothing unless #each is currently running. Otherwise it raises
# GlobalBreakFromOther in the main thread (when one is registered) and marks
# this filter as finished. Other threads are not stopped.
#
# Returns ST_FINISH when a break was performed, nil otherwise.
def break_running
  return unless @in_each

  # #start resets @main_thread to nil when its thread finishes, and #each may
  # run without #start ever having been called, so read the ivar once and
  # guard against nil instead of calling #raise on it blindly.
  thread = @main_thread
  thread.raise @context.class::GlobalBreakFromOther if thread
  self.status = ST_FINISH
end

#each(&block) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/fairy/node/p-filter.rb', line 178

# Feeds every element produced by basic_each to the given block.
#
# * Evaluates the BEGIN source (if any) first and records that it ran.
# * Skips Import::CTLTOKEN_NULLVALUE control tokens.
# * When the block raises a StandardError: logs and skips the element if
#   @IGNORE_EXCEPTION is set, otherwise lets the error propagate.
# * Always evaluates the END source (if any) once iteration stops.
# * GlobalBreakFromOther / GlobalBreak / LocalJumpError end the iteration
#   quietly; any other exception is logged, reported through
#   #handle_exception and re-raised.
#
# @in_each is true for the duration of the call. Always returns nil.
def each(&block)
  begin
    @in_each = true

    if @begin_block_source
      BSource.new(@begin_block_source, @context, self).evaluate
      @begin_block_exec_p = true
    end

    begin
      basic_each do |entry|
        # Same test a `case entry when ...` would perform.
        next if Import::CTLTOKEN_NULLVALUE === entry

        begin
          block.call(entry)
        rescue
          raise unless @IGNORE_EXCEPTION

          Log::warn(self, "IGNORE_EXCEPTON!!")
          Log::warn(self, "Entity: #{entry.inspect}")
          Log::error_exception(self)
          next
        end
      end
    ensure
      BSource.new(@end_block_source, @context, self).evaluate if @end_block_source
    end
  rescue @context.class::GlobalBreakFromOther
    Log::debug(self, "CAUGHT GlobalBreak From Other")
    global_break_from_other
  rescue LocalJumpError, @context.class::GlobalBreak
    Log::debug(self, "CAUGHT GlobalBreak")
    global_break
  rescue Exception => err
    Log::error_exception(self)
    handle_exception(err)
    raise
  ensure
    @in_each = false
  end
  nil
end

#global_break ⇒ Object



249
250
251
252
253
254
# File 'lib/fairy/node/p-filter.rb', line 249

# Reacts to a break raised inside this filter's own #each: asks the owning
# job, on a separate thread, to break the running filters, then marks this
# filter as finished. Other threads are not stopped here.
#
# Returns ST_FINISH.
def global_break
  Thread.start(self) do |filter|
    @bjob.break_running(filter)
  end
  self.status = ST_FINISH
end

#global_break_from_other ⇒ Object



256
257
258
# File 'lib/fairy/node/p-filter.rb', line 256

# Called by #each after it catches GlobalBreakFromOther (a break requested
# through #break_running): marks this filter as finished.
def global_break_from_other
  self.status = ST_FINISH
end

#handle_exception(exp) ⇒ Object

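Forwards the exception to the owning job by calling `@bjob.handle_exception(exp)`.

Note: the description the generator originally emitted here was not documentation for this method but stray commented-out source that precedes it in the file:

    binding = eval("def fairy_binding; binding; end; fairy_binding", TOPLEVEL_BINDING, __FILE__, __LINE__ - 3)
    end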


320
321
322
# File 'lib/fairy/node/p-filter.rb', line 320

# Hands +exp+ to the owning job's handle_exception. #each and #start call
# this for unexpected exceptions just before re-raising them.
def handle_exception(exp)
  @bjob.handle_exception(exp)
end

#key ⇒ Object



110
111
112
113
114
115
116
117
118
# File 'lib/fairy/node/p-filter.rb', line 110

# Returns the current value of @key (nil until #key= is called). Unlike #no,
# this reader returns immediately and never waits.
def key
  # Disabled variant that blocked on @key_cv until a key had been assigned:
#       @key_mutex.synchronize do
# 	while !@key
# 	  @key_cv.wait(@key_mutex)
# 	end
# 	@key
#       end
  @key
end

#key=(key) ⇒ Object



120
121
122
123
124
125
126
127
# File 'lib/fairy/node/p-filter.rb', line 120

# Stores +key+ in @key. No lock is taken and no waiter is woken.
def key=(key)
  # Disabled variant that assigned under @key_mutex and broadcast on @key_cv:
#       @key_mutex.synchronize do
# 	@key = key
# 	@key_cv.broadcast
# 	@key
#       end
  @key=key
end

#next ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/fairy/node/p-filter.rb', line 231

# Pull-style counterpart of #each: returns the next element from basic_next.
#
# * Before the first pull, evaluates the BEGIN source (if any) exactly once.
#   The @begin_block_exec_p flag is shared with #each, so a BEGIN block that
#   #each already ran is not run again.
# * When basic_next returns :END_OF_STREAM, evaluates the END source (if any).
#
# Returns whatever basic_next returned.
def next
  # Run BEGIN only if it has NOT been executed yet. (The previous condition
  # tested the flag without negation, so BEGIN never ran on the first call.)
  if @begin_block_source && !@begin_block_exec_p
    BSource.new(@begin_block_source, @context, self).evaluate
    @begin_block_exec_p = true
  end

  ret = basic_next
  if ret == :END_OF_STREAM && @end_block_source
    BSource.new(@end_block_source, @context, self).evaluate
  end
  ret
end

#no ⇒ Object



91
92
93
94
95
96
97
98
99
100
# File 'lib/fairy/node/p-filter.rb', line 91

# Returns the number assigned through #no=. If none has been assigned yet
# (@no is nil or false), the caller blocks on the condition variable until
# #no= provides one.
def no
  @no_mutex.synchronize do
    @no_cv.wait(@no_mutex) until @no
    @no
  end
end

#no=(no) ⇒ Object



102
103
104
105
106
107
108
# File 'lib/fairy/node/p-filter.rb', line 102

# Assigns this filter's number and wakes every thread blocked in #no.
# The assignment and the broadcast both happen while holding @no_mutex.
def no=(no)
  @no_mutex.synchronize do
	@no = no
	@no_cv.broadcast
	@no
  end
end

#notice_status(st) ⇒ Object



299
300
301
302
303
304
# File 'lib/fairy/node/p-filter.rb', line 299

# Reports status +st+ of this filter to the owning job and then to the owning
# task, both inside a block handed to @status_mon.entry.
# NOTE(review): whether `entry` runs the block immediately or queues it is
# not visible here - confirm against the monitor implementation.
def notice_status(st)
  @status_mon.entry do
	@bjob.update_status(self, st)
	@ntask.update_status(self, st)
  end
end

#processor ⇒ Object



87
88
89
# File 'lib/fairy/node/p-filter.rb', line 87

# Returns the processor of the owning task. The constructor uses it to
# obtain the monitor (njob_mon) used for status and termination handling.
def processor
  @ntask.processor
end

#start(&block) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/fairy/node/p-filter.rb', line 133

# Starts processing on a new thread and returns immediately with nil.
#
# Before the thread is created the status watcher is started. The thread
# (kept in @main_thread while it runs) then:
#   1. switches the status to ST_ACTIVATE,
#   2. evaluates the BEGIN source, if any,
#   3. runs basic_start with the given block,
#   4. in all cases afterwards: evaluates the END source (if any), clears
#      @main_thread, passes terminate_proc to @terminate_mon.entry and logs
#      completion.
# Any exception is logged, reported through #handle_exception and re-raised
# inside the thread.
def start(&block)
  Log::info(self, "START PROCESSING: #{self.class}")

  start_watch_status

  @main_thread = Thread.start do
    begin
      self.status = ST_ACTIVATE
      BSource.new(@begin_block_source, @context, self).evaluate if @begin_block_source

      begin
        basic_start(&block)
      ensure
        BSource.new(@end_block_source, @context, self).evaluate if @end_block_source

        @main_thread = nil
        @terminate_mon.entry(terminate_proc)
        Log::info(self, "FINISH PROCESSING: #{self.class}")
      end
    rescue Exception => err
      Log::error_exception(self)
      handle_exception(err)
      raise
    end
  end
  nil
end

#start_export ⇒ Object



129
130
131
# File 'lib/fairy/node/p-filter.rb', line 129

# Placeholder: reports ERR::INTERNAL::ShouldDefineSubclass through ERR::Raise.
# NOTE(review): judging by the error name, subclasses are expected to
# override this method - confirm.
def start_export
  ERR::Raise ERR::INTERNAL::ShouldDefineSubclass
end

#start_watch_status ⇒ Object



281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/fairy/node/p-filter.rb', line 281

# Publishes the current status once, then registers a watcher block with the
# status monitor that re-publishes the status via #notice_status every time
# it changes, stopping after ST_FINISH has been published. Returns nil.
#
# NOTE(review): #start calls this before spawning its thread, so `entry`
# presumably runs the block asynchronously rather than inline - confirm.
def start_watch_status
  # Notify the initial status.
  notice_status(@status)

  @status_mon.entry do
	@status_mon.synchronize do
	  # Last status that was published; nil forces the first pass to publish.
	  old_status = nil
	  loop do
 # Sleep until #status= stores a value different from the one last seen.
 @status_cv.wait_while{old_status == @status}
 old_status = @status
 notice_status(@status)
 break if @status == ST_FINISH
	  end
	end
  end
  nil
end

#status=(val) ⇒ Object



273
274
275
276
277
278
279
# File 'lib/fairy/node/p-filter.rb', line 273

# Stores the new status while holding the status monitor and broadcasts on
# @status_cv, the condition variable the loop in #start_watch_status waits on.
def status=(val)
#Log::debugf(self, "STATUS_CHANGE: %s", val)
  @status_mon.synchronize do
	@status = val
	@status_cv.broadcast
  end
end

#terminate ⇒ Object



170
171
172
# File 'lib/fairy/node/p-filter.rb', line 170

# Marks this filter as finished by setting the status to ST_FINISH.
# Invoked through the proc returned by #terminate_proc.
def terminate
  self.status = ST_FINISH
end

#terminate_proc ⇒ Object



166
167
168
# File 'lib/fairy/node/p-filter.rb', line 166

# Builds the callback that #start hands to @terminate_mon.entry. The proc
# accepts and ignores any arguments and simply calls #terminate.
def terminate_proc
  Proc.new { |*_ignored| terminate }
end