Class: Fairy::PoolQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/share/port.rb

Direct Known Subclasses

SizedPoolQueue

Instance Method Summary collapse

Constructor Details

#initialize(policy, queue_mon = Monitor.new, queue_cv = queue_mon.new_cond) ⇒ PoolQueue

Returns a new instance of PoolQueue.



896
897
898
899
900
901
902
903
904
# File 'lib/fairy/share/port.rb', line 896

def initialize(policy, queue_mon = Monitor.new, queue_cv = queue_mon.new_cond)
  @policy = policy

  @queue_threshold = CONF.POOLQUEUE_POOL_THRESHOLD

  @queue = []
  @queue_mon = queue_mon
  @queue_cv = queue_cv
end

Instance Method Details

#popObject



926
927
928
929
930
931
# File 'lib/fairy/share/port.rb', line 926

def pop
  @queue_mon.synchronize do
	@queue_cv.wait_while{@queue.empty?}
	@queue.shift
  end
end

#pop_allObject



933
934
935
936
937
938
939
940
941
# File 'lib/fairy/share/port.rb', line 933

def pop_all
  @queue_mon.synchronize do
	@queue_cv.wait_while{@queue.size < @queue_threshold && @queue.last != :END_OF_STREAM}
#	buf = @queue.dup
#	@queue.clear
	buf, @queue = @queue, []
	buf
  end
end

#push(e) ⇒ Object



906
907
908
909
910
911
912
913
914
915
# File 'lib/fairy/share/port.rb', line 906

def push(e)
  @queue_mon.synchronize do
	@queue.push e
	if @queue.size >= @queue_threshold || 
 e == :END_OF_STREAM || 
 e == Import::SET_NO_IMPORT
	  @queue_cv.broadcast
	end
  end
end

#push_all(buf) ⇒ Object



917
918
919
920
921
922
923
924
# File 'lib/fairy/share/port.rb', line 917

def push_all(buf)
  @queue_mon.synchronize do
	@queue.concat buf
	if @queue.size >= @queue_threshold || @queue.last == :END_OF_STREAM
	  @queue_cv.broadcast
	end
  end
end