Class: Fairy::ChunkedPoolQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/share/port.rb

Direct Known Subclasses

ChunkedSizedPoolQueue

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond) ⇒ ChunkedPoolQueue

Designed for multiple pushing threads and a single popping thread.



1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
# File 'lib/fairy/share/port.rb', line 1010

# Builds an empty chunked pool queue.
#
# Per the original note on this constructor, the queue is meant to be fed by
# several pushing threads and drained by one popping thread.
#
# @param policy stored as-is in @policy
# @param queues_mon [Monitor] monitor under which the list of published
#   chunks (@queues) is read and modified
# @param queues_cv [MonitorMixin::ConditionVariable] condition variable that
#   is broadcast whenever a chunk is published
def initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond)
  @policy = policy

  # Tunables taken from the global configuration object.
  @queue_threshold = CONF.POOLQUEUE_POOL_THRESHOLD
  @queue_max = CONF.POSTQUEUE_MAX_TRANSFER_SIZE

  # Producer side: the chunk currently being filled, and the mutex that
  # serializes the pushing threads.
  @push_queue_mutex = Mutex.new
  @push_queue = []

  # Hand-off area: published chunks waiting for the consumer.
  @queues_mon, @queues_cv = queues_mon, queues_cv
  @queues = []

  # Consumer side: the chunk currently being drained. It has no mutex of its
  # own (a pop-side mutex was left disabled in the original code).
  @pop_queue = nil
end

Instance Attribute Details

#fib_cv ⇒ Object

Returns the value of attribute fib_cv.



1027
1028
1029
# File 'lib/fairy/share/port.rb', line 1027

# Reader for the +fib_cv+ attribute.
#
# NOTE(review): the constructor shown for this class never assigns @fib_cv,
# so this returns nil until something else sets it - confirm where it is
# written.
#
# @return [Object, nil] the current value of @fib_cv
def fib_cv
  @fib_cv
end

Instance Method Details

#pop ⇒ Object



1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
# File 'lib/fairy/share/port.rb', line 1058

# Removes and returns the next element, blocking until one is available.
#
# Elements are served from the chunk currently being drained (@pop_queue).
# When there is no such chunk, or it is empty, the next published chunk is
# taken from @queues under @queues_mon, waiting on @queues_cv until a
# producer publishes one.
#
# @pop_queue itself is read and modified outside any lock, so this is only
# suitable for the single popping thread the class is designed around.
#
# @return [Object] the element at the head of the current chunk
def pop
  until @pop_queue && !@pop_queue.empty?
    @queues_mon.synchronize do
      # The block both tests for and claims the next chunk; it is falsy
      # (nil) only while @queues is empty.
      @queues_cv.wait_until { @pop_queue = @queues.shift }
    end
  end

  head = @pop_queue.shift
  # Drop a fully drained chunk so the next call fetches a new one.
  if @pop_queue.empty?
    @pop_queue = nil
  end
  head
end

#pop_all ⇒ Object



1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
# File 'lib/fairy/share/port.rb', line 1070

# Removes and returns a whole chunk at once, blocking until one is available.
#
# If a chunk is already being drained (@pop_queue), whatever remains of it is
# returned. Otherwise the next published chunk is taken from @queues under
# @queues_mon, waiting on @queues_cv until a producer publishes one.
#
# @return [Array] the remaining elements of the current chunk
def pop_all
  until @pop_queue
    @queues_mon.synchronize do
      # Falsy (nil) only while @queues is empty, which keeps the wait going.
      @queues_cv.wait_until { @pop_queue = @queues.shift }
    end
  end

  # Hand the chunk to the caller and forget it locally.
  chunk = @pop_queue
  @pop_queue = nil
  chunk
end

#push(e) ⇒ Object



1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
# File 'lib/fairy/share/port.rb', line 1029

# Appends one element to the chunk being filled, publishing the chunk to the
# consumer when appropriate.
#
# The chunk is published (moved to @queues, replaced by a new empty chunk,
# and announced via @queues_cv.broadcast) when either:
# * it has reached @queue_threshold elements, or
# * the element is :END_OF_STREAM or Import::SET_NO_IMPORT.
#
# Pushing threads are serialized by @push_queue_mutex; @queues is only
# touched while holding @queues_mon.
#
# @param e [Object] element to enqueue
# @return [Object, nil] nil when nothing was published, otherwise the result
#   of the broadcast call
def push(e)
  @push_queue_mutex.synchronize do
    @push_queue << e

    publish = @push_queue.size >= @queue_threshold ||
              e == :END_OF_STREAM ||
              e == Import::SET_NO_IMPORT
    next unless publish

    @queues_mon.synchronize do
      @queues << @push_queue
      @push_queue = []
      @queues_cv.broadcast
    end
  end
end

#push_all(buf) ⇒ Object



1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
# File 'lib/fairy/share/port.rb', line 1044

# Appends a batch of elements to the chunk being filled, publishing the chunk
# to the consumer when appropriate.
#
# The chunk is published (moved to @queues, replaced by a new empty chunk,
# and announced via @queues_cv.broadcast) when either:
# * it now holds more than @queue_threshold elements, or
# * its last element is :END_OF_STREAM or Import::SET_NO_IMPORT.
#
# The Import::SET_NO_IMPORT case mirrors #push, which publishes immediately on
# that token. Without it, a batch ending in the token would stay in the
# private buffer, invisible to a consumer blocked in #pop / #pop_all.
#
# NOTE(review): #push publishes at size >= @queue_threshold while this method
# uses a strict >. That difference is preserved here - confirm whether it is
# intentional.
#
# @param buf [Array] elements to enqueue, in order
# @return [Object, nil] nil when nothing was published, otherwise the result
#   of the broadcast call
def push_all(buf)
  @push_queue_mutex.synchronize do
    @push_queue.concat buf
    tail = @push_queue.last
    if @push_queue.size > @queue_threshold ||
       tail == :END_OF_STREAM ||
       tail == Import::SET_NO_IMPORT
      @queues_mon.synchronize do
        @queues.push @push_queue
        @push_queue = []
        @queues_cv.broadcast
      end
    end
  end
end