Class: Fairy::FileMarshaledQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/share/port-marshaled-queue.rb

Instance Method Summary

Constructor Details

#initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond) ⇒ FileMarshaledQueue

Returns a new instance of FileMarshaledQueue.



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/fairy/share/port-marshaled-queue.rb', line 183

# Sets up an empty queue whose tunables come from +policy+, falling back to
# the CONF settings when the policy leaves them unset.
#
# @param policy [#[]] per-queue options; +:min_chunk_no+ and +:buffer_dir+
#   are read here
# @param queues_mon [Monitor] monitor used to guard the buffers queue
# @param queues_cv condition variable obtained from +queues_mon+
def initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond)
  @policy = policy

  # A value supplied by the policy wins over the configured default.
  @chunk_size = CONF.MARSHAL_QUEUE_CHUNK_SIZE
  @min_chunk_no = policy[:min_chunk_no] || CONF.MARSHAL_QUEUE_MIN_CHUNK_NO
  @buffer_dir = policy[:buffer_dir] || CONF.TMP_DIR

  # Elements pushed but not yet written out to a buffer.
  @push_queue = []
  @push_queue_mutex = Mutex.new

  # Written buffers (plus the :END_OF_STREAM marker) waiting to be popped.
  @buffers_queue = []
  @buffers_queue_mon = queues_mon
  @buffers_queue_cv = queues_cv

  # Elements restored from the most recently popped buffer, if any.
  @pop_queue = nil
end

Instance Method Details

#open_2ndmemory(&block) ⇒ Object



309
310
311
312
313
314
315
316
317
318
# File 'lib/fairy/share/port-marshaled-queue.rb', line 309

# Opens a fresh "port-buffer-" temp file under @buffer_dir, yields its IO to
# the caller's block, closes the file, and appends it to @buffers_queue.
#
# The file is closed even when the block raises; in that case the exception
# propagates and the buffer is not enqueued.
#
# @yieldparam io the IO object of the new buffer
# @return the buffer object that was enqueued
def open_2ndmemory(&block)
  tempfile = FastTempfile.open("port-buffer-", @buffer_dir)
  begin
    yield tempfile.io
  ensure
    tempfile.close
  end
  @buffers_queue << tempfile
  tempfile
end

#pop ⇒ Object



255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/fairy/share/port-marshaled-queue.rb', line 255

# Removes and returns the next element of the queue.
#
# When no restored elements are left, waits on the buffers-queue condition
# variable until a buffer (or the :END_OF_STREAM marker) can be shifted off
# @buffers_queue, then refills @pop_queue from it. A buffer that restores to
# an empty array is skipped and the next one is awaited.
#
# @return the next element, or :END_OF_STREAM once the marker is reached
def pop
  until @pop_queue && !@pop_queue.empty?
    @buffers_queue_mon.synchronize do
      chunk = nil
      @buffers_queue_cv.wait_until { chunk = @buffers_queue.shift }

      # The marker is queued as-is; everything else is a stored buffer.
      @pop_queue = chunk == :END_OF_STREAM ? [chunk] : restore_2ndmemory(chunk)
    end
  end

  head = @pop_queue.shift
  # Drop the drained array so the next call fetches a new buffer.
  @pop_queue = nil if @pop_queue.empty?
  head
end

#pop_all ⇒ Object



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/fairy/share/port-marshaled-queue.rb', line 273

# Returns every element currently restored in @pop_queue in one array,
# fetching the next buffer first when nothing has been restored yet.
#
# Waits on the buffers-queue condition variable until an entry can be
# shifted off @buffers_queue. @pop_queue is reset to nil afterwards.
#
# @return [Array] the restored elements, or [:END_OF_STREAM] when the
#   marker was the next entry
def pop_all
  until @pop_queue
    @buffers_queue_mon.synchronize do
      chunk = nil
      @buffers_queue_cv.wait_until { chunk = @buffers_queue.shift }
      @pop_queue = chunk == :END_OF_STREAM ? [chunk] : restore_2ndmemory(chunk)
    end
  end

  drained = @pop_queue
  @pop_queue = nil
  drained
end

#pop_raw ⇒ Object



289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# File 'lib/fairy/share/port-marshaled-queue.rb', line 289

# Returns the next stored buffer as its raw contents, without unmarshaling.
#
# Must not be mixed with a partially consumed #pop: if restored elements are
# still pending in @pop_queue, MarshalQueueNotEmpty is reported through
# ERR::Raise. Otherwise waits on the buffers-queue condition variable until
# an entry can be shifted off @buffers_queue.
#
# @return the raw contents of the next buffer, or :END_OF_STREAM
def pop_raw
  ERR::Raise(ERR::INTERNAL::MarshalQueueNotEmpty) if @pop_queue && !@pop_queue.empty?

  raw = nil
  until raw
    @buffers_queue_mon.synchronize do
      chunk = nil
      @buffers_queue_cv.wait_until { chunk = @buffers_queue.shift }
      raw = chunk == :END_OF_STREAM ? chunk : restore_raw_2ndmemory(chunk)
    end
  end
  raw
end

#push(e) ⇒ Object



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/fairy/share/port-marshaled-queue.rb', line 203

# Appends +e+ to the in-memory push queue and writes the queue out to a
# buffer when it is time to flush.
#
# A flush happens when the push queue has reached @min_chunk_no elements, or
# when +e+ is :END_OF_STREAM or Import::SET_NO_IMPORT. The :END_OF_STREAM
# marker itself is not stored in the buffer; it is placed directly on
# @buffers_queue after the stored chunk. Waiters on the buffers-queue
# condition variable are woken after each flush.
#
# @param e the element to enqueue
def push(e)
  @push_queue_mutex.synchronize do
    @push_queue << e

    flush_needed = @push_queue.size >= @min_chunk_no ||
                   e == :END_OF_STREAM ||
                   e == Import::SET_NO_IMPORT
    next unless flush_needed

    @buffers_queue_mon.synchronize do
      end_of_stream = e == :END_OF_STREAM
      # Keep the marker out of the marshaled chunk.
      @push_queue.pop if end_of_stream
      store_2ndmemory(@push_queue)
      @buffers_queue << e if end_of_stream

      @push_queue = []
      @buffers_queue_cv.broadcast
    end
  end
end

#push_all(buf) ⇒ Object



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/fairy/share/port-marshaled-queue.rb', line 221

# Appends every element of +buf+ to the in-memory push queue and writes the
# queue out to a buffer when it is time to flush.
#
# A flush happens when the push queue holds more than @min_chunk_no elements
# or when its last element is :END_OF_STREAM. The marker itself is not stored
# in the buffer; it is placed directly on @buffers_queue after the stored
# chunk. Waiters on the buffers-queue condition variable are woken after each
# flush.
#
# NOTE(review): #push flushes at `>= @min_chunk_no` while this method uses
# `>`; the threshold is left as it was — confirm which one is intended.
#
# @param buf [Array] the elements to enqueue, in order
def push_all(buf)
  @push_queue_mutex.synchronize do
    @push_queue.concat buf

    # Derived from the queue itself: there is no single pushed element here
    # (the previous code referenced an undefined local `e` and raised
    # NameError on every flush).
    end_of_stream = @push_queue.last == :END_OF_STREAM

    if @push_queue.size > @min_chunk_no || end_of_stream
      @buffers_queue_mon.synchronize do
        # Keep the marker out of the marshaled chunk.
        @push_queue.pop if end_of_stream
        store_2ndmemory(@push_queue)
        @buffers_queue.push :END_OF_STREAM if end_of_stream

        @push_queue = []
        @buffers_queue_cv.broadcast
      end
    end
  end
end

#push_raw(raw) ⇒ Object



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/fairy/share/port-marshaled-queue.rb', line 238

# Enqueues an already-serialized chunk.
#
# Any elements still pending in the push queue are stored first so they are
# queued ahead of +raw+. Then +raw+ is either written to a new buffer, or —
# when it is :END_OF_STREAM — placed directly on @buffers_queue. Waiters on
# the buffers-queue condition variable are woken afterwards.
#
# @param raw the raw chunk to enqueue, or :END_OF_STREAM
def push_raw(raw)
  @push_queue_mutex.synchronize do
    @buffers_queue_mon.synchronize do
      pending = @push_queue
      unless pending.empty?
        store_2ndmemory(pending)
        @push_queue = []
      end

      raw == :END_OF_STREAM ? @buffers_queue.push(raw) : store_raw_2ndmemory(raw)
      @buffers_queue_cv.broadcast
    end
  end
end

#restore_2ndmemory(buf) ⇒ Object



332
333
334
335
336
337
# File 'lib/fairy/share/port-marshaled-queue.rb', line 332

# Reads back the array that was marshaled into +buf+ and releases the buffer.
#
# The buffer is released with +close!+ even when unmarshaling fails, so a
# corrupt or truncated buffer does not leak; the load error still propagates.
#
# @param buf a stored buffer responding to +open+ and +close!+
# @return the object loaded from the buffer (the array passed to
#   #store_2ndmemory)
def restore_2ndmemory(buf)
  io = buf.open
  begin
    Marshal.load(io)
  ensure
    buf.close!
  end
end

#restore_raw_2ndmemory(buf) ⇒ Object



339
340
341
342
343
344
# File 'lib/fairy/share/port-marshaled-queue.rb', line 339

# Reads back the entire contents of +buf+ without unmarshaling and releases
# the buffer.
#
# The buffer is released with +close!+ even when reading fails, so a failed
# read does not leak; the read error still propagates.
#
# @param buf a stored buffer responding to +open+ and +close!+
# @return whatever +read+ returns for the buffer's IO
def restore_raw_2ndmemory(buf)
  io = buf.open
  begin
    io.read
  ensure
    buf.close!
  end
end

#store_2ndmemory(ary) ⇒ Object



320
321
322
323
324
# File 'lib/fairy/share/port-marshaled-queue.rb', line 320

# Marshals +ary+ into a new buffer obtained from #open_2ndmemory.
#
# @param ary the object to serialize (callers pass the push queue array)
# @return whatever #open_2ndmemory returns
def store_2ndmemory(ary)
  open_2ndmemory { |out| Marshal.dump(ary, out) }
end

#store_raw_2ndmemory(raw) ⇒ Object



326
327
328
329
330
# File 'lib/fairy/share/port-marshaled-queue.rb', line 326

# Writes +raw+ unchanged into a new buffer obtained from #open_2ndmemory.
#
# @param raw the already-serialized data to write
# @return whatever #open_2ndmemory returns
def store_raw_2ndmemory(raw)
  open_2ndmemory { |out| out.write raw }
end