Class: GoChanel::Task
- Inherits:
-
Object
- Object
- GoChanel::Task
- Defined in:
- lib/go_chanel/task.rb
Class Method Summary collapse
- .end_task(in_chan, concurrence_count, &proc) ⇒ Object
- .normal_task(in_chan, out_chan, concurrence_count = 1, &proc) ⇒ Object
- .start_task(out_chan, *args, &proc) ⇒ Object
Class Method Details
.end_task(in_chan, concurrence_count, &proc) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/go_chanel/task.rb', line 45 def self.end_task(in_chan, concurrence_count, &proc) concurrence_chan = GoChanel::Chanel.new(concurrence_count) while true do args, ok = in_chan.pop break unless ok concurrence_chan.push(true) Thread.new(args) do |params| begin proc.call(*params) rescue => e GoChanel::ErrNotifier.notify(e) ensure concurrence_chan.pop end end end while concurrence_chan.size > 0 do sleep(0.1) end end |
.normal_task(in_chan, out_chan, concurrence_count = 1, &proc) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/go_chanel/task.rb', line 4 def self.normal_task(in_chan, out_chan, concurrence_count = 1, &proc) concurrence_chan = GoChanel::Chanel.new(concurrence_count) while true do args, ok = in_chan.pop break unless ok concurrence_chan.push(true) Thread.new(args) do |params| begin result = proc.call(*params) out_chan.push(result) rescue => e GoChanel::ErrNotifier.notify(e) ensure concurrence_chan.pop end end end while concurrence_chan.size > 0 do sleep(0.1) end out_chan.close end |
.start_task(out_chan, *args, &proc) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/go_chanel/task.rb', line 28 def self.start_task(out_chan, *args, &proc) concurrence_chan = GoChanel::Chanel.new Thread.new do begin proc.call(*args).each do |result| out_chan.push(result) end rescue => e GoChanel::ErrNotifier.notify(e) ensure concurrence_chan.push(true) end end concurrence_chan.pop out_chan.close end |