Class: Go::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/go/channel.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Channel

Returns a new instance of Channel.



6
7
8
9
10
# File 'lib/go/channel.rb', line 6

def initialize(options={})
  @options = options
  @msg_counter = 0
  @queue = Queue.new
end

Instance Attribute Details

#closedObject (readonly)

Returns the value of attribute closed.



3
4
5
# File 'lib/go/channel.rb', line 3

def closed
  @closed
end

#msg_counterObject (readonly)

Returns the value of attribute msg_counter.



3
4
5
# File 'lib/go/channel.rb', line 3

def msg_counter
  @msg_counter
end

#optionsObject

Returns the value of attribute options.



4
5
6
# File 'lib/go/channel.rb', line 4

def options
  @options
end

Instance Method Details

#<<(ob) ⇒ Object



12
13
14
# File 'lib/go/channel.rb', line 12

def <<(ob)
  @queue << ob
end

#closeObject

close the channel



43
44
45
46
# File 'lib/go/channel.rb', line 43

def close
  @closed = true
  self << Go::Exit
end

#each(&blk) ⇒ Object



33
34
35
36
37
38
39
40
# File 'lib/go/channel.rb', line 33

def each(&blk)
  while true do
    break if @closed
    x = shift
    break if x == Go::Exit
    yield x
  end
end

#shiftObject



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/go/channel.rb', line 16

def shift
  begin
    @msg_counter += 1
    if @options[:close_after] && @msg_counter >= @options[:close_after]
      close()
    end
    @queue.shift
  rescue Exception => ex
    Go.logger.debug "#{ex.class.name}: #{ex.message}"
    if (ex.message.include?("deadlock") || ex.message.include?("Deadlock")) # ruby 2.0 uses Deadlock, capitalized
      close()
      return Go::Exit
    end
    raise ex
  end
end