Class: Fairy::PFilter

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/node/p-filter.rb

Direct Known Subclasses

PIOFilter, PThere

Defined Under Namespace

Classes: Context

Constant Summary collapse

END_OF_STREAM =
:END_OF_STREAM
ST_INIT =
:ST_INIT
ST_ACTIVATE =
:ST_ACTIVATE
ST_FINISH =
:ST_FINISH

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id, ntask, bjob, opts = {}, *rests) ⇒ PFilter

Returns a new instance of PFilter.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fairy/node/p-filter.rb', line 27

def initialize(id, ntask, bjob, opts={}, *rests)
  @id = id
  @log_id = format("%s[%s]", self.class.name.sub(/Fairy::/, ''), @id)

  Log::info self, "CREATE NJOB: #{self.class}"
  @ntask = ntask
  @bjob = bjob
  @opts = opts

  @IGNORE_EXCEPTION = CONF.IGNORE_EXCEPTION_ON_FILTER
  @IGNORE_EXCEPTION = @opts[:ignore_exception] if @opts.key?(:ignore_exception)

  @main_thread = nil

  @context = Context.new(self)
#       @begin_block = nil
#       if @opts[:BEGIN]
# 	@begin_block = BBlock.new(@opts[:BEGIN], @context, self)
#       end
#       @end_block = nil
#       if @opts[:END]
# 	@end_block = BBlock.new(@opts[:END], @context, self)
#       end

  @begin_block_source = nil
  if @opts[:BEGIN]
	@begin_block_source = @opts[:BEGIN]
	@begin_block_exec_p = false
  end
  @end_block_source = nil
  if @opts[:END]
	@end_block_source = @opts[:END]
  end

  @no = nil
  @no_mutex = Mutex.new
  @no_cv = XThread::ConditionVariable.new

  @key = nil
  @key_mutex = Mutex.new
  @key_cv = XThread::ConditionVariable.new

  @status = ST_INIT
  @status_mon = processor.njob_mon.new_mon
  @status_cv = @status_mon.new_cv
  notice_status(@status)

  @terminate_mon = processor.njob_mon.new_mon

  @in_each = nil
  @in_each_mutex = Mutex.new

#      start_watch_status
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



82
83
84
# File 'lib/fairy/node/p-filter.rb', line 82

def id
  @id
end

#IGNORE_EXCEPTIONObject (readonly)

Returns the value of attribute IGNORE_EXCEPTION.



85
86
87
# File 'lib/fairy/node/p-filter.rb', line 85

def IGNORE_EXCEPTION
  @IGNORE_EXCEPTION
end

#log_idObject (readonly)

Returns the value of attribute log_id.



83
84
85
# File 'lib/fairy/node/p-filter.rb', line 83

def log_id
  @log_id
end

#ntaskObject (readonly)

Returns the value of attribute ntask.



87
88
89
# File 'lib/fairy/node/p-filter.rb', line 87

def ntask
  @ntask
end

Instance Method Details

#abort_runningObject



270
271
272
# File 'lib/fairy/node/p-filter.rb', line 270

def abort_running
  @main_thread.exit if @main_thread
end

#basic_start(&block) ⇒ Object



175
176
177
# File 'lib/fairy/node/p-filter.rb', line 175

def basic_start(&block)
  block.call
end

#break_runningObject



261
262
263
264
265
266
267
268
# File 'lib/fairy/node/p-filter.rb', line 261

def break_running
  if @in_each
	@main_thread.raise @context.class::GlobalBreakFromOther
#      @main_thread.exit if @main_thread
	self.status = ST_FINISH
  end
  # 他のスレッドはとめていない
end

#each(&block) ⇒ Object



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/fairy/node/p-filter.rb', line 179

def each(&block)
  begin
	@in_each = true

	if @begin_block_source
	  bsource = BSource.new(@begin_block_source, @context, self)
	  bsource.evaluate
	  @begin_block_exec_p = true
	end
	begin
	  basic_each do |e|
 case e
 when Import::CTLTOKEN_NULLVALUE
   next
 else
   begin
		block.call e
   rescue
		if @IGNORE_EXCEPTION
Log::warn(self, "IGNORE_EXCEPTON!!")
Log::warn(self, "Entity: #{e.inspect}")
Log::error_exception(self)
next
		else
raise
		end
   end
 end
	  end
	ensure
	  if @end_block_source
 bsource = BSource.new(@end_block_source, @context, self)
 bsource.evaluate
	  end
	end
  rescue @context.class::GlobalBreakFromOther
	Log::debug(self, "CAUGHT GlobalBreak From Other")
	global_break_from_other
	
  rescue LocalJumpError, @context.class::GlobalBreak
	Log::debug(self, "CAUGHT GlobalBreak")
	global_break

  rescue Exception
	Log::error_exception(self)
	handle_exception($!)
	raise
  ensure
	@in_each = false
  end
  nil
end

#global_breakObject



250
251
252
253
254
255
# File 'lib/fairy/node/p-filter.rb', line 250

def global_break
  Thread.start{@bjob.break_running(self)}
#      Thread.current.exit
  self.status = ST_FINISH
  # 他のスレッドはとめていない
end

#global_break_from_otherObject



257
258
259
# File 'lib/fairy/node/p-filter.rb', line 257

def global_break_from_other
  self.status = ST_FINISH
end

#handle_exception(exp) ⇒ Object

binding = eval(“def fairy_binding; binding; end; fairy_binding”, TOPLEVEL_BINDING, __FILE__, __LINE__ - 3)

end


321
322
323
# File 'lib/fairy/node/p-filter.rb', line 321

def handle_exception(exp)
  @bjob.handle_exception(exp)
end

#keyObject



111
112
113
114
115
116
117
118
119
# File 'lib/fairy/node/p-filter.rb', line 111

def key
#       @key_mutex.synchronize do
# 	while !@key
# 	  @key_cv.wait(@key_mutex)
# 	end
# 	@key
#       end
  @key
end

#key=(key) ⇒ Object



121
122
123
124
125
126
127
128
# File 'lib/fairy/node/p-filter.rb', line 121

def key=(key)
#       @key_mutex.synchronize do
# 	@key = key
# 	@key_cv.broadcast
# 	@key
#       end
  @key=key
end

#nextObject



232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/fairy/node/p-filter.rb', line 232

def next
  if @begin_block_source && @begin_block_exec_p
	bsource = BSource.new(@begin_block_source, @context, self)
	bsource.evaluate
	@begin_block_exec_p = true
  end
  begin
	ret = basic_next
  ensure
	if ret == :END_OF_STREAM
	  if @end_block_source
 bsource = BSource.new(@end_block_source, @context, self)
 bsource.evaluate
	  end
	end
  end
end

#noObject



92
93
94
95
96
97
98
99
100
101
# File 'lib/fairy/node/p-filter.rb', line 92

def no
#Log::debug(self, "XXXXXXXXXXXXXXXXXXXXXXXX")
#Log::debug_backtrace
  @no_mutex.synchronize do
	while !@no
	  @no_cv.wait(@no_mutex)
	end
	@no
  end
end

#no=(no) ⇒ Object



103
104
105
106
107
108
109
# File 'lib/fairy/node/p-filter.rb', line 103

def no=(no)
  @no_mutex.synchronize do
	@no = no
	@no_cv.broadcast
	@no
  end
end

#notice_status(st) ⇒ Object



300
301
302
303
304
305
# File 'lib/fairy/node/p-filter.rb', line 300

def notice_status(st)
  processor.njob_mon.entry do
	@bjob.update_status(self, st)
	@ntask.update_status(self, st)
  end
end

#processorObject



88
89
90
# File 'lib/fairy/node/p-filter.rb', line 88

def processor
  @ntask.processor
end

#start(&block) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/fairy/node/p-filter.rb', line 134

def start(&block)
  Log::info self, "START PROCESSING: #{self.class}"

  start_watch_status

  @main_thread = Thread.start{
	begin
	  self.status = ST_ACTIVATE
	  if @begin_block_source
 bsource = BSource.new(@begin_block_source, @context, self)
 bsource.evaluate
	  end
	  begin
 basic_start &block
	  ensure
 if @end_block_source
   bsource = BSource.new(@end_block_source, @context, self)
   bsource.evaluate
 end

 @main_thread = nil
 processor.njob_mon.entry terminate_proc
 Log::info self, "FINISH PROCESSING: #{self.class}"
	  end
	rescue Exception
	  Log::error_exception(self)
	  handle_exception($!)
	  raise
	end
  }
  nil
end

#start_exportObject



130
131
132
# File 'lib/fairy/node/p-filter.rb', line 130

def start_export
  ERR::Raise ERR::INTERNAL::ShouldDefineSubclass
end

#start_watch_statusObject



282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/fairy/node/p-filter.rb', line 282

def start_watch_status
  # 初期状態通知
  notice_status(@status)

  processor.njob_mon.entry do
	@status_mon.synchronize do
	  old_status = nil
	  loop do
 @status_cv.wait_while{old_status == @status}
 old_status = @status
 notice_status(@status)
 break if @status == ST_FINISH
	  end
	end
  end
  nil
end

#status=(val) ⇒ Object



274
275
276
277
278
279
280
# File 'lib/fairy/node/p-filter.rb', line 274

def status=(val)
#Log::debugf(self, "STATUS_CHANGE: %s", val)
  @status_mon.synchronize do
	@status = val
	@status_cv.broadcast
  end
end

#terminateObject



171
172
173
# File 'lib/fairy/node/p-filter.rb', line 171

def terminate
  self.status = ST_FINISH
end

#terminate_procObject



167
168
169
# File 'lib/fairy/node/p-filter.rb', line 167

def terminate_proc
  proc{|*dummy| terminate}
end