Class: Fairy::SizedPoolQueue

Inherits:
PoolQueue show all
Defined in:
lib/fairy/share/port.rb

Instance Method Summary collapse

Constructor Details

#initialize(policy, queue_mon = Monitor.new, queue_cv = queues_mon.new_cond) ⇒ SizedPoolQueue

Returns a new instance of SizedPoolQueue.



963
964
965
966
967
968
969
970
# File 'lib/fairy/share/port.rb', line 963

# Sets up a pool queue with an upper bound on buffered elements.
#
# @param policy [Hash] queue policy; policy[:size] is used as the bound and
#   CONF.ONMEMORY_SIZEDQUEUE_SIZE is the fallback when it is nil/false
# @param queue_mon [Monitor] monitor guarding the queue
# @param queue_cv condition variable forwarded to the superclass; defaults
#   to a fresh condition created from +queue_mon+
def initialize(policy, queue_mon = Monitor.new, queue_cv = queue_mon.new_cond)
  # Bare super forwards policy, queue_mon and queue_cv unchanged.
  super
  @max_size = policy[:size] || CONF.ONMEMORY_SIZEDQUEUE_SIZE

  # @queue_cv / @queue_mon are read back here, so they are presumably
  # assigned by the superclass constructor -- TODO confirm.
  # @pop_cv is the condition #push/#push_all broadcast on; @push_cv is the
  # one #pop/#pop_all broadcast on and #push/#push_all wait on.
  @pop_cv = @queue_cv
  @push_cv = @queue_mon.new_cond
end

Instance Method Details

#pop ⇒ Object



994
995
996
997
998
# File 'lib/fairy/share/port.rb', line 994

# Takes one element via the superclass #pop, then broadcasts on @push_cv
# (the condition #push and #push_all wait on).
#
# The body runs inside @queue_mon.synchronize because
# MonitorMixin::ConditionVariable#broadcast raises ThreadError when the
# calling thread does not own the monitor.
# NOTE(review): relies on @queue_mon being reentrant (true for the default
# Monitor) in case the superclass #pop synchronizes on it too -- confirm for
# custom monitors.
#
# @return [Object] whatever the superclass #pop returned
def pop
  @queue_mon.synchronize do
    e = super
    @push_cv.broadcast
    e
  end
end

#pop_all ⇒ Object



1000
1001
1002
1003
1004
# File 'lib/fairy/share/port.rb', line 1000

# Takes the buffered elements via the superclass #pop_all, then broadcasts
# on @push_cv (the condition #push and #push_all wait on).
#
# The body runs inside @queue_mon.synchronize because
# MonitorMixin::ConditionVariable#broadcast raises ThreadError when the
# calling thread does not own the monitor.
# NOTE(review): relies on @queue_mon being reentrant (true for the default
# Monitor) in case the superclass #pop_all synchronizes on it too -- confirm
# for custom monitors.
#
# @return [Object] whatever the superclass #pop_all returned
def pop_all
  @queue_mon.synchronize do
    buf = super
    @push_cv.broadcast
    buf
  end
end

#push(e) ⇒ Object



972
973
974
975
976
977
978
979
980
981
982
# File 'lib/fairy/share/port.rb', line 972

# Appends one element to the queue under @queue_mon.
#
# Waits on @push_cv while the queue holds more than @max_size elements.
# After appending, broadcasts on @pop_cv when the queue has reached
# @queue_threshold, or when the element is :END_OF_STREAM or
# Import::SET_NO_IMPORT.
# NOTE(review): the wait condition is a strict ">", so the queue can grow to
# @max_size + 1 elements -- confirm this is intended.
#
# @param e [Object] element to enqueue
def push(e)
  @queue_mon.synchronize do
    @push_cv.wait_while { @queue.size > @max_size }
    @queue.push e

    notify = @queue.size >= @queue_threshold ||
             e == :END_OF_STREAM ||
             e == Import::SET_NO_IMPORT
    @pop_cv.broadcast if notify
  end
end

#push_all(buf) ⇒ Object



984
985
986
987
988
989
990
991
992
# File 'lib/fairy/share/port.rb', line 984

# Appends every element of +buf+ to the queue under @queue_mon.
#
# Waits on @push_cv while the queue holds more than @max_size elements.
# After concatenating, broadcasts on @pop_cv when the queue has reached
# @queue_threshold or its last element is :END_OF_STREAM.
# NOTE(review): unlike #push, Import::SET_NO_IMPORT does not trigger a
# broadcast here -- confirm whether that difference is intended.
#
# @param buf [Array] elements to enqueue, in order
def push_all(buf)
  @queue_mon.synchronize do
    @push_cv.wait_while { @queue.size > @max_size }
    @queue.concat buf

    threshold_reached = @queue.size >= @queue_threshold
    @pop_cv.broadcast if threshold_reached || @queue.last == :END_OF_STREAM
  end
end