Class: Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/channel.rb

Direct Known Subclasses

MainChannel

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*params, &block) ⇒ Channel

Returns a new instance of Channel.



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/channel.rb', line 4

# Sets up the channel's synchronization state and starts its worker thread.
#
# The last positional argument, when present and truthy, becomes the output
# channel; otherwise the channel writes to itself. When a block is given it
# is run on the worker thread with this channel as its argument; otherwise
# the instance must respond to +run+, which is run on the worker thread.
#
# Raises RuntimeError ("Must implement run") when no block is given and the
# instance does not respond to +run+.
def initialize(*params, &block)
  @mutex = Mutex.new
  @write_mutex = Mutex.new
  @subchannels = {}
  @value = []
  @sender = nil
  @reading_thread = nil
  @writing_thread = nil
  @channel_out = params.last || self

  @thread =
    if block_given?
      new_thread { yield self }
    else
      raise "Must implement run" unless respond_to?(:run)
      new_thread { run }
    end
end

Instance Attribute Details

#channel_out ⇒ Object (readonly)

Returns the value of attribute channel_out.



2
3
4
# File 'lib/channel.rb', line 2

# Reader for @channel_out, the object that write_out forwards values to.
def channel_out; @channel_out; end

#sender ⇒ Object (readonly)

Returns the value of attribute sender.



2
3
4
# File 'lib/channel.rb', line 2

# Reader for @sender, which read sets to the sender of the value it returned.
def sender; @sender; end

#thread ⇒ Object (readonly)

Returns the value of attribute thread.



2
3
4
# File 'lib/channel.rb', line 2

# Reader for @thread, the worker thread created by new_thread.
def thread; @thread; end

Instance Method Details

#add_subchannel(channel, &block) ⇒ Object



72
73
74
# File 'lib/channel.rb', line 72

# Registers +block+ as the handler for values whose sender is +channel+.
#
# Any handler previously stored for the same channel is replaced. Returns the
# stored block (nil when no block was given).
def add_subchannel(channel, &block)
  @subchannels.store(channel, block)
end

#kill ⇒ Object



68
69
70
# File 'lib/channel.rb', line 68

# Terminates the channel's worker thread and returns that thread.
def kill
  worker = @thread
  worker.kill
end

#new_thread(&block) ⇒ Object



21
22
23
24
25
26
27
28
29
# File 'lib/channel.rb', line 21

# Starts a thread that runs the given block, stores it in @thread and
# returns it.
#
# The block is yielded itself as its single argument. A StandardError raised
# by the block is caught inside the thread and reported on standard output
# instead of propagating.
def new_thread(&block)
  worker = Thread.new do
    begin
      yield block
    rescue => error
      puts "error running thread #{error}"
    end
  end
  @thread = worker
end

#process ⇒ Object



81
82
83
84
85
86
87
88
89
# File 'lib/channel.rb', line 81

# Reads the next value and runs the handler registered for its sender.
#
# The handler stored in @subchannels for the current sender is evaluated via
# instance_exec with the value and the sender as arguments. A StandardError
# raised while running the handler is printed and not re-raised; errors from
# read itself are not rescued here.
def process
  payload = read
  handler = @subchannels[sender]
  begin
    instance_exec(payload, sender, &handler)
  rescue => error
    puts "got an error #{error.inspect}"
  end
end

#read ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/channel.rb', line 52

# Takes the oldest queued entry, records its sender in @sender and returns
# its value.
#
# If a writer is parked in @writing_thread it is woken and the entry it
# queued is consumed. Otherwise the caller registers itself as
# @reading_thread and sleeps on @mutex until it is woken.
def read
  @mutex.synchronize do
    parked_writer = @writing_thread
    if parked_writer
      parked_writer.wakeup
      @writing_thread = nil
    else
      @reading_thread = Thread.current
      @mutex.sleep
      @reading_thread = nil
    end
    entry = @value.shift
    @sender = entry[:sender]
    entry[:value]
  end
end

#remove_subchannel(channel) ⇒ Object



76
77
78
79
# File 'lib/channel.rb', line 76

# Kills +channel+ and unregisters its handler.
#
# The entry is deleted rather than set to nil so the killed channel is no
# longer retained as a key of @subchannels. Looking the channel up afterwards
# still yields nil, as before.
#
# Always returns nil.
def remove_subchannel(channel)
  channel.kill
  @subchannels.delete(channel)
  nil
end

#write(value, sender = nil) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/channel.rb', line 31

# Queues +value+ (tagged with +sender+) for the channel's reader.
#
# Writers are serialized by @write_mutex. If a reader is already parked in
# @reading_thread it is woken and the call returns immediately. Otherwise the
# caller registers itself as @writing_thread and sleeps on @mutex until it is
# woken (read wakes a parked writer).
#
# Returns nil.
def write(value, sender = nil)
  @write_mutex.synchronize {
    @mutex.synchronize {
      @value.push(:value => value, :sender => sender)
      if @reading_thread.nil?
        @writing_thread = Thread.current
        @mutex.sleep
        # @write_mutex is deliberately not unlocked here: the enclosing
        # synchronize releases it on exit, and unlocking it twice raises
        # ThreadError.
        @writing_thread = nil
      else
        @reading_thread.wakeup
        @reading_thread = nil
      end
    }
  }
end

#write_out(value) ⇒ Object



48
49
50
# File 'lib/channel.rb', line 48

# Forwards +value+ to the output channel, naming this channel as the sender.
#
# Returns whatever the output channel's write returns.
def write_out(value)
  downstream = @channel_out
  downstream.write(value, self)
end