Class: Hopper
- Inherits: Object
- Inheritance chain: Object → Hopper
- Defined in: lib/hopper.rb, lib/hopper/version.rb
Constant Summary
- VERSION = "0.0.1"
Instance Method Summary
- #<<(item) ⇒ Object
- #initialize(capacity: 10, timeout: 2, always: false, exact: true, &block) ⇒ Hopper (constructor): A new instance of Hopper.
- #size ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(capacity: 10, timeout: 2, always: false, exact: true, &block) ⇒ Hopper
Returns a new instance of Hopper.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/hopper.rb', line 6

# Builds a hopper and immediately launches its worker via #start.
#
# @param capacity [Integer] buffered-item count at which #<< signals the worker
# @param timeout [Numeric] how long the worker waits for a signal before it
#   flushes anyway (passed straight to ConditionVariable#wait)
# @param always [Boolean] when true the block is invoked even for an empty buffer
# @param exact [Boolean] when true #<< blocks at capacity until the worker signals back
# @yield [items, timed_out] the buffered items and whether the wait timed out
def initialize(capacity: 10, timeout: 2, always: false, exact: true, &block)
  # Caller-supplied configuration.
  @capacity = capacity
  @timeout  = timeout
  @always   = always
  @exact    = exact
  @block    = block

  # Synchronization primitives shared by producers and the worker.
  @hopper_mutex = Mutex.new
  @signal_mutex = Mutex.new
  @condvar      = ConditionVariable.new

  # Runtime state; #start flips @running and fills in @thread.
  @hopper  = []
  @running = false
  @thread  = nil

  start
end
Instance Method Details
#<<(item) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/hopper.rb', line 62

# Appends +item+ to the buffer and, once the buffer has reached capacity,
# signals the worker. In exact mode the caller then blocks until the worker
# has taken the full batch.
#
# @param item [Object] the value to buffer
# @return [Hopper] self, so pushes can be chained (h << a << b)
# @raise [RuntimeError] if the hopper is not running
def <<(item)
  raise "Hopper is not running." unless @running

  @hopper_mutex.synchronize do
    @hopper << item
    if @hopper.size >= @capacity
      @condvar.signal
      if @exact
        # The worker installs a fresh array on every flush, so the batch has
        # been handed over only once @hopper no longer refers to this array.
        # Waiting in a loop keeps a stray wakeup (another producer's signal on
        # the shared condition variable, or a spurious wake) from returning
        # before the flush actually happened.
        full_batch = @hopper
        @condvar.wait(@hopper_mutex) while @hopper.equal?(full_batch)
      end
    end
  end

  self
end
#size ⇒ Object
56 57 58 59 60 |
# File 'lib/hopper.rb', line 56

# Reports how many items are currently buffered.
#
# @return [Integer] the buffer length, read while holding the buffer mutex
def size
  @hopper_mutex.synchronize { @hopper.size }
end
#start ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/hopper.rb', line 23

# Launches the background worker that drains the buffer.
#
# Each cycle the worker waits on the condition variable for up to @timeout,
# then hands the buffer to the stored block (if any) together with a
# timed-out flag, replaces the buffer with a fresh array and, in exact mode,
# wakes producers blocked in #<<.
#
# @return [Thread] the worker thread
def start
  # A second call while the worker is alive would spawn a duplicate worker and
  # orphan the first one; hand back the existing worker instead.
  return @thread if @running && @thread && @thread.alive?

  @running = true
  @thread = Thread.new do
    while @running
      # A falsy result from wait is treated as "timed out".
      # NOTE(review): whether wait returns a falsy value on timeout depends on
      # the Ruby version — confirm against the supported interpreters.
      timed_out = @signal_mutex.synchronize do
        !@condvar.wait(@signal_mutex, @timeout)
      end

      @hopper_mutex.synchronize do
        # Skip the callback for an empty buffer unless @always is set.
        @block.call(@hopper, timed_out) if @block && (@always || !@hopper.empty?)
        @hopper = []
        # Several producers may be parked in #<<; broadcast releases all of
        # them now that the batch is gone (signal would wake only one).
        @condvar.broadcast if @exact
      end
    end
  end
end
#stop ⇒ Object
51 52 53 54 |
# File 'lib/hopper.rb', line 51

# Clears the running flag and waits for the worker thread to exit.
#
# @return [Thread, nil] the worker thread, or nil if none was ever started
def stop
  @running = false
  worker = @thread
  # A thread cannot join itself (Thread#join raises ThreadError), so skip the
  # join when stop is invoked from the worker — e.g. from inside the flush
  # callback. The worker loop ends on its own once it sees the cleared flag.
  worker.join if worker && !worker.equal?(Thread.current)
  worker
end