Class: Fairy::PoolQueue
- Inherits:
-
Object
- Object
- Fairy::PoolQueue
- Defined in:
- lib/fairy/share/port.rb
Direct Known Subclasses
Instance Method Summary collapse
-
#initialize(policy, queue_mon = Monitor.new, queue_cv = queue_mon.new_cond) ⇒ PoolQueue
constructor
A new instance of PoolQueue.
- #pop ⇒ Object
- #pop_all ⇒ Object
- #push(e) ⇒ Object
- #push_all(buf) ⇒ Object
Constructor Details
#initialize(policy, queue_mon = Monitor.new, queue_cv = queue_mon.new_cond) ⇒ PoolQueue
Returns a new instance of PoolQueue.
896 897 898 899 900 901 902 903 904 |
# File 'lib/fairy/share/port.rb', line 896 def initialize(policy, queue_mon = Monitor.new, queue_cv = queue_mon.new_cond) @policy = policy @queue_threshold = CONF.POOLQUEUE_POOL_THRESHOLD @queue = [] @queue_mon = queue_mon @queue_cv = queue_cv end |
Instance Method Details
#pop ⇒ Object
926 927 928 929 930 931 |
# File 'lib/fairy/share/port.rb', line 926 def pop @queue_mon.synchronize do @queue_cv.wait_while{@queue.empty?} @queue.shift end end |
#pop_all ⇒ Object
933 934 935 936 937 938 939 940 941 |
# File 'lib/fairy/share/port.rb', line 933 def pop_all @queue_mon.synchronize do @queue_cv.wait_while{@queue.size < @queue_threshold && @queue.last != :END_OF_STREAM} # buf = @queue.dup # @queue.clear buf, @queue = @queue, [] buf end end |
#push(e) ⇒ Object
906 907 908 909 910 911 912 913 914 915 |
# File 'lib/fairy/share/port.rb', line 906 def push(e) @queue_mon.synchronize do @queue.push e if @queue.size >= @queue_threshold || e == :END_OF_STREAM || e == Import::SET_NO_IMPORT @queue_cv.broadcast end end end |
#push_all(buf) ⇒ Object
917 918 919 920 921 922 923 924 |
# File 'lib/fairy/share/port.rb', line 917 def push_all(buf) @queue_mon.synchronize do @queue.concat buf if @queue.size >= @queue_threshold || @queue.last == :END_OF_STREAM @queue_cv.broadcast end end end |