Class: Fairy::SizedPoolQueue
- Defined in:
- lib/fairy/share/port.rb
Instance Method Summary
-
#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queues_mon.new_cond) ⇒ SizedPoolQueue
constructor
A new instance of SizedPoolQueue.
- #pop ⇒ Object
- #pop_all ⇒ Object
- #push(e) ⇒ Object
- #push_all(buf) ⇒ Object
Constructor Details
#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queues_mon.new_cond) ⇒ SizedPoolQueue
Returns a new instance of SizedPoolQueue.
973 974 975 976 977 978 979 980 |
# File 'lib/fairy/share/port.rb', line 973
#
# Sets up a pool queue whose producers are throttled by a maximum size.
#
# All arguments are forwarded unchanged to the superclass constructor by the
# bare `super`. The superclass is presumably what assigns @queue_mon and
# @queue_cv, which are read below -- confirm against the parent class.
#
# @param policy [#[]] options; `policy[:size]` supplies the size bound and
#   falls back to `CONF.ONMEMORY_SIZEDQUEUE_SIZE` when it is nil or false
# @param queue_mon [XThread::Monitor] monitor guarding the queue
# @param queue_cv condition variable handed to the superclass; by default a
#   new one created from `queue_mon`
def initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond)
  # The default for queue_cv previously read `queues_mon.new_cond`, a name
  # that is not a parameter of this method; it now refers to queue_mon.
  super

  @max_size = policy[:size] || CONF.ONMEMORY_SIZEDQUEUE_SIZE

  # Consumers are signalled on the inherited condition variable, while
  # producers get a separate one created from the same monitor.
  @pop_cv = @queue_cv
  @push_cv = @queue_mon.new_cond
end
Instance Method Details
#pop ⇒ Object
1004 1005 1006 1007 1008 |
# File 'lib/fairy/share/port.rb', line 1004
#
# Takes one element through the superclass implementation, then broadcasts on
# @push_cv, the condition variable that #push and #push_all wait on.
#
# NOTE(review): the broadcast happens outside any `synchronize` block visible
# in this method -- confirm the condition variable permits that.
#
# @return [Object] whatever the superclass #pop returned
def pop
  popped = super
  @push_cv.broadcast
  popped
end
#pop_all ⇒ Object
1010 1011 1012 1013 1014 |
# File 'lib/fairy/share/port.rb', line 1010
#
# Drains through the superclass implementation, then broadcasts on @push_cv,
# the condition variable that #push and #push_all wait on.
#
# NOTE(review): the broadcast happens outside any `synchronize` block visible
# in this method -- confirm the condition variable permits that.
#
# @return [Object] whatever the superclass #pop_all returned
def pop_all
  drained = super
  @push_cv.broadcast
  drained
end
#push(e) ⇒ Object
982 983 984 985 986 987 988 989 990 991 992 |
# File 'lib/fairy/share/port.rb', line 982
#
# Appends a single element to the queue while holding @queue_mon.
#
# The caller waits on @push_cv for as long as the queue holds more than
# @max_size elements. After appending, @pop_cv is broadcast when the queue has
# reached @queue_threshold, or when the element is one of the markers
# :END_OF_STREAM / Import::SET_NO_IMPORT.
#
# NOTE(review): the wait condition is `>` rather than `>=`, so the queue can
# hold @max_size + 1 elements -- confirm this is intended.
#
# @param e [Object] element to enqueue
def push(e)
  @queue_mon.synchronize do
    @push_cv.wait_while { @queue.size > @max_size }
    @queue.push(e)

    wake_consumers = @queue.size >= @queue_threshold ||
                     e == :END_OF_STREAM ||
                     e == Import::SET_NO_IMPORT
    @pop_cv.broadcast if wake_consumers
  end
end
#push_all(buf) ⇒ Object
994 995 996 997 998 999 1000 1001 1002 |
# File 'lib/fairy/share/port.rb', line 994
#
# Appends every element of +buf+ to the queue while holding @queue_mon.
#
# The caller waits on @push_cv for as long as the queue holds more than
# @max_size elements; the whole buffer is then concatenated in one step, so
# the bound is only checked before the append. Afterwards @pop_cv is broadcast
# when the queue has reached @queue_threshold or its last element is
# :END_OF_STREAM.
#
# NOTE(review): unlike #push, Import::SET_NO_IMPORT is not checked here --
# confirm whether that marker can arrive through push_all.
#
# @param buf [Array] elements to enqueue, in order
def push_all(buf)
  @queue_mon.synchronize do
    @push_cv.wait_while { @queue.size > @max_size }
    @queue.concat(buf)

    wake_consumers = @queue.size >= @queue_threshold ||
                     @queue.last == :END_OF_STREAM
    @pop_cv.broadcast if wake_consumers
  end
end