Class: Fairy::ChunkedPoolQueue
- Inherits:
-
Object
- Object
- Fairy::ChunkedPoolQueue
- Defined in:
- lib/fairy/share/port.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#fib_cv ⇒ Object
Returns the value of attribute fib_cv.
Instance Method Summary collapse
-
#initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond) ⇒ ChunkedPoolQueue
constructor
multi push threads single pop thread.
- #pop ⇒ Object
- #pop_all ⇒ Object
- #push(e) ⇒ Object
- #push_all(buf) ⇒ Object
Constructor Details
#initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond) ⇒ ChunkedPoolQueue
multi push threads single pop thread
1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 |
# File 'lib/fairy/share/port.rb', line 1010 def initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond) @policy = policy @queue_threshold = CONF.POOLQUEUE_POOL_THRESHOLD @queue_max = CONF.POSTQUEUE_MAX_TRANSFER_SIZE @push_queue = [] @push_queue_mutex = Mutex.new @queues = [] @queues_mon = queues_mon @queues_cv = queues_cv @pop_queue = nil # @pop_queue_mutex = Mutex.new end |
Instance Attribute Details
#fib_cv ⇒ Object
Returns the value of attribute fib_cv.
1027 1028 1029 |
# File 'lib/fairy/share/port.rb', line 1027 def fib_cv @fib_cv end |
Instance Method Details
#pop ⇒ Object
1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 |
# File 'lib/fairy/share/port.rb', line 1058 def pop # @pop_queue.synchronize do while !@pop_queue || @pop_queue.empty? @queues_mon.synchronize do @queues_cv.wait_until{@pop_queue = @queues.shift} end end e = @pop_queue.shift @pop_queue = nil if @pop_queue.empty? e end |
#pop_all ⇒ Object
1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 |
# File 'lib/fairy/share/port.rb', line 1070 def pop_all # @pop_queue.synchronize do while !@pop_queue @queues_mon.synchronize do @queues_cv.wait_until{@pop_queue = @queues.shift} end end buf, @pop_queue = @pop_queue, nil buf # end end |
#push(e) ⇒ Object
1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 |
# File 'lib/fairy/share/port.rb', line 1029 def push(e) @push_queue_mutex.synchronize do @push_queue.push e if @push_queue.size >= @queue_threshold || e == :END_OF_STREAM || e == Import::SET_NO_IMPORT @queues_mon.synchronize do @queues.push @push_queue @push_queue = [] @queues_cv.broadcast end end end end |
#push_all(buf) ⇒ Object
1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 |
# File 'lib/fairy/share/port.rb', line 1044 def push_all(buf) @push_queue_mutex.synchronize do @push_queue.concat buf if @push_queue.size > @queue_threshold || @push_queue.last == :END_OF_STREAM @queues_mon.synchronize do @queues.push @push_queue @push_queue = [] @queues_cv.broadcast end end end end |