Class: Fairy::SortedQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/share/port.rb

Instance Method Summary collapse

Constructor Details

#initialize(policy, queue_mon = Monitor.new, queue_cv = queue_mon.new_cond) ⇒ SortedQueue

Returns a new instance of SortedQueue.



1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
# File 'lib/fairy/share/port.rb', line 1382

# Builds an empty sorted queue configured by +policy+.
#
# policy    - object indexed with [] for :pool_threshold, :threshold and
#             :sort_by; each entry falls back to the matching CONF setting
#             when it is nil/false.
# queue_mon - monitor stored for guarding queue state (a new Monitor by
#             default).
# queue_cv  - condition variable stored for signalling consumers (created
#             from queue_mon by default).
def initialize(policy, queue_mon = Monitor.new, queue_cv = queue_mon.new_cond)
  @policy = policy
  @queue_mon = queue_mon
  @queue_cv = queue_cv

  @pool_threshold = policy[:pool_threshold] || CONF.SORTEDQUEUE_POOL_THRESHOLD
  @threshold = policy[:threshold] || CONF.SORTEDQUEUE_THRESHOLD

  # Incoming elements are collected in @push_queue; @pop_queue and @buffers
  # start out unset.
  @push_queue = []
  @pop_queue = nil
  @buffers = nil

  @sort_by = policy[:sort_by] || CONF.SORTEDQUEUE_SORTBY

  # A String sort key is taken as the source of a block (parameters included)
  # and compiled into a proc.  NOTE(review): this evals configuration text;
  # confirm the policy/CONF values always come from a trusted source.
  @sort_by = eval("proc{#{@sort_by}}") if @sort_by.is_a?(String)
end

Instance Method Details

#init_2ndmemory ⇒ Object



1476
1477
1478
1479
1480
1481
1482
1483
1484
# File 'lib/fairy/share/port.rb', line 1476

# Prepares the state used for spilling to temporary files: loads the
# tempfile library, picks the directory for spill files (policy
# :buffer_dir, else CONF.TMP_DIR) and resets the buffer list and merge state.
def init_2ndmemory
  require "tempfile"

  @buffer_dir = @policy[:buffer_dir] || CONF.TMP_DIR

  # No spill files yet, and no merge in progress.
  @buffers = []
  @merge_io = nil
end

#open_2ndmemory(&block) ⇒ Object



1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
# File 'lib/fairy/share/port.rb', line 1486

# Creates a new temporary spill file in @buffer_dir, yields its underlying
# File to the caller's block for writing, then closes the file (it is not
# unlinked) and appends the Tempfile to @buffers.
#
# The block is handed the raw File instead of the Tempfile wrapper; the
# original code did so as a workaround for Ruby BUG#2390.
#
# Returns the closed Tempfile.  If the block raises, the file is still
# closed, the exception propagates and nothing is added to @buffers.
def open_2ndmemory(&block)
  init_2ndmemory unless @buffers

  buffer = Tempfile.open("port-buffer-", @buffer_dir)
  begin
    # Tempfile is a Delegator, so __getobj__ is the public way to reach the
    # wrapped File.  Reading the private @tmpfile ivar instead depends on
    # Tempfile internals and yields nil when that ivar is not set.
    yield buffer.__getobj__
  ensure
    buffer.close
  end
  @buffers.push buffer
  buffer
end

#pop ⇒ Object



1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
# File 'lib/fairy/share/port.rb', line 1432

# Returns the next element of the sorted output, blocking on the queue's
# condition variable while @pop_queue is still nil.
#
# When @buffers is nil the element is shifted off @pop_queue (nil once that
# array is empty); otherwise the element comes from pop_2ndmemory.
def pop
  @queue_mon.synchronize do
    # Sleep until some other thread has assigned @pop_queue.
    @queue_cv.wait_until { !@pop_queue.nil? }

    @buffers.nil? ? @pop_queue.shift : pop_2ndmemory
  end
end

#pop_2ndmemory ⇒ Object



1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
# File 'lib/fairy/share/port.rb', line 1514

# Returns the next element, in @sort_by order, merged from the spill files
# in @buffers, or :END_OF_STREAM once every file has been read to the end.
#
# On the first call each buffer is reopened and its first element loaded;
# @merge_io then holds [io, head_element] pairs sorted by key.  Every call
# takes the smallest head, reads that file's next element to replace it and
# re-sorts.  A file that reaches EOF is closed and removed via close!.
#
# EOF is detected only through EOFError, never by testing the loaded value,
# so a file whose first stored element is nil is merged like any other
# instead of being dropped (and left open) together with its remaining
# elements.
def pop_2ndmemory
  unless @merge_io
    @buffers.each { |tf| tf.open }

    heads = []
    @buffers.each do |io|
      begin
        heads.push [io, Marshal.load(io)]
      rescue EOFError
        # Empty spill file: nothing to merge from it.
        io.close!
      end
    end
    @merge_io = heads.sort_by { |_io, head| @sort_by.call(head) }
  end

  source, min = @merge_io.shift
  return :END_OF_STREAM unless source

  begin
    # Refill from the file the minimum came from, then restore the ordering.
    @merge_io.push [source, Marshal.load(source)]
    @merge_io = @merge_io.sort_by { |_io, head| @sort_by.call(head) }
  rescue EOFError
    source.close!
  end
  min
end

#pop_all ⇒ Object



1445
1446
1447
1448
1449
1450
1451
1452
# File 'lib/fairy/share/port.rb', line 1445

# Collects a batch of elements by calling pop repeatedly and returns them as
# an array.
#
# Collection stops when:
# * pop returns nil/false,
# * the batch has grown larger than @pool_threshold, or
# * :END_OF_STREAM has just been collected.
#
# The last rule matters when elements come from pop_2ndmemory: it keeps
# returning :END_OF_STREAM once the spill files are exhausted, so without it
# the batch would be padded with repeated markers until the size limit hit.
def pop_all
  buf = []
  while e = pop
    buf.push e
    break if e == :END_OF_STREAM || buf.size > @pool_threshold
  end
  buf
end

#push(e) ⇒ Object



1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
# File 'lib/fairy/share/port.rb', line 1406

# Adds +e+ to the queue while holding @queue_mon.
#
# Ordinary elements accumulate in @push_queue; whenever that array reaches
# @threshold entries it is handed to store_2ndmemory and replaced by a fresh
# array.
#
# :END_OF_STREAM finishes the input.  If spill buffers exist (@buffers set)
# the remaining elements are spilled as well and @pop_queue becomes an empty
# array; otherwise @pop_queue becomes the elements sorted by @sort_by with
# :END_OF_STREAM appended.  Waiters on @queue_cv are then woken.
def push(e)
  @queue_mon.synchronize do
    @push_queue << e

    if e == :END_OF_STREAM
      # The marker itself is not data: take it back off before finishing.
      @push_queue.pop

      if @buffers
        store_2ndmemory(@push_queue)
        @push_queue = []
        @pop_queue = []
      else
        begin
          sorted = @push_queue.sort_by { |item| @sort_by.call(item) }
          sorted.push :END_OF_STREAM
          @pop_queue = sorted
        rescue
          # A failing sort key is only logged; @pop_queue is left untouched.
          Log::debug_exception
        end
      end

      @queue_cv.broadcast
    end

    if @push_queue.size >= @threshold
      store_2ndmemory(@push_queue)
      @push_queue = []
    end
  end
end

#store_2ndmemory(ary) ⇒ Object



1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
# File 'lib/fairy/share/port.rb', line 1502

# Writes the elements of +ary+, ordered by @sort_by, into a new spill file
# obtained from open_2ndmemory; each element is serialized with Marshal.dump.
# The caller's array is not modified.  Start and end are reported through
# Log::debug.
def store_2ndmemory(ary)
  Log::debug(self, "start store: ")
  open_2ndmemory do |io|
    ordered = ary.sort_by { |item| @sort_by.call(item) }
    ordered.each { |item| Marshal.dump(item, io) }
  end
  Log::debug(self, "end store")
end