Class: GoChanel::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/go_chanel/task.rb

Class Method Summary collapse

Class Method Details

.end_task(in_chan, concurrence_count, &proc) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/go_chanel/task.rb', line 45

def self.end_task(in_chan, concurrence_count, &proc)
  concurrence_chan = GoChanel::Chanel.new(concurrence_count)
  while true do
    args, ok = in_chan.pop
    break unless ok
    concurrence_chan.push(true)
    Thread.new(args) do |params|
      begin
        proc.call(*params)
      rescue => e
        GoChanel::ErrNotifier.notify(e)
      ensure
        concurrence_chan.pop
      end
    end
  end

  while concurrence_chan.size > 0 do
    sleep(0.1)
  end
end

.normal_task(in_chan, out_chan, concurrence_count = 1, &proc) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/go_chanel/task.rb', line 4

def self.normal_task(in_chan, out_chan, concurrence_count = 1, &proc)
  concurrence_chan = GoChanel::Chanel.new(concurrence_count)
  while true do
    args, ok = in_chan.pop
    break unless ok
    concurrence_chan.push(true)
    Thread.new(args) do |params|
      begin
        result = proc.call(*params)
        out_chan.push(result)
      rescue => e
        GoChanel::ErrNotifier.notify(e)
      ensure
        concurrence_chan.pop
      end
    end
  end

  while concurrence_chan.size > 0 do
    sleep(0.1)
  end
  out_chan.close
end

.start_task(out_chan, *args, &proc) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/go_chanel/task.rb', line 28

def self.start_task(out_chan, *args, &proc)
  concurrence_chan = GoChanel::Chanel.new
  Thread.new do
    begin
      proc.call(*args).each do |result|
        out_chan.push(result)
      end
    rescue => e
      GoChanel::ErrNotifier.notify(e)
    ensure
      concurrence_chan.push(true)
    end
  end
  concurrence_chan.pop
  out_chan.close
end