Class: Channel
- Inherits:
-
Object
- Object
- Channel
- Defined in:
- lib/channel.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#channel_out ⇒ Object
readonly
Returns the value of attribute channel_out.
-
#sender ⇒ Object
readonly
Returns the value of attribute sender.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
Instance Method Summary collapse
- #add_subchannel(channel, &block) ⇒ Object
-
#initialize(*params, &block) ⇒ Channel
constructor
A new instance of Channel.
- #kill ⇒ Object
- #new_thread(&block) ⇒ Object
- #process ⇒ Object
- #read ⇒ Object
- #remove_subchannel(channel) ⇒ Object
- #write(value, sender = nil) ⇒ Object
- #write_out(value) ⇒ Object
Constructor Details
#initialize(*params, &block) ⇒ Channel
Returns a new instance of Channel.
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/channel.rb', line 4

# Sets up the channel's internal state and starts its worker thread.
#
# The worker either runs the given block (yielding this channel to it) or,
# when no block is given, calls the receiver's own +run+ method.
#
# @param params [Array] optional arguments; the last one, when truthy, is
#   stored as the outgoing channel. Otherwise the channel itself is used.
# @yieldparam channel [Channel] this channel, yielded on the worker thread
# @raise [RuntimeError] when no block is given and the receiver has no +run+
def initialize(*params, &block)
  @writing_thread = nil
  @reading_thread = nil
  @channel_out = params.last || self
  @mutex = Mutex.new
  @sender = nil
  @value = []
  @subchannels = {}
  @write_mutex = Mutex.new

  # The worker is started last so that every instance variable above is
  # assigned before the new thread can use the channel.
  if block_given?
    @thread = new_thread { yield self }
  else
    # +run+ is invoked with an implicit receiver below, so a private +run+
    # is perfectly callable; include private methods in the check.
    raise "Must implement run" unless respond_to?(:run, true)
    @thread = new_thread { run }
  end
end
Instance Attribute Details
#channel_out ⇒ Object (readonly)
Returns the value of attribute channel_out.
2 3 4 |
# File 'lib/channel.rb', line 2

# Reader for the outgoing channel.
#
# @return [Object] the current value of the +@channel_out+ instance variable
def channel_out
  @channel_out
end
#sender ⇒ Object (readonly)
Returns the value of attribute sender.
2 3 4 |
# File 'lib/channel.rb', line 2

# Reader for the sender recorded on the channel.
#
# @return [Object] the current value of the +@sender+ instance variable
def sender
  @sender
end
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
2 3 4 |
# File 'lib/channel.rb', line 2

# Reader for the channel's worker thread.
#
# @return [Object] the current value of the +@thread+ instance variable
def thread
  @thread
end
Instance Method Details
#add_subchannel(channel, &block) ⇒ Object
72 73 74 |
# File 'lib/channel.rb', line 72

# Registers a handler block for a subchannel, replacing any handler that was
# previously stored under the same key.
#
# @param channel [Object] key under which the handler is stored
# @param block [Proc] handler to associate with +channel+
# @return [Proc, nil] the stored handler (nil when no block was given)
def add_subchannel(channel, &block)
  @subchannels.store(channel, block)
end
#kill ⇒ Object
68 69 70 |
# File 'lib/channel.rb', line 68

# Terminates the thread held in +@thread+.
#
# @return [Thread] the thread that was asked to terminate
def kill
  @thread.kill
end
#new_thread(&block) ⇒ Object
21 22 23 24 25 26 27 28 29 |
# File 'lib/channel.rb', line 21

# Starts a thread that runs the given block and stores it in +@thread+.
#
# The block receives its own Proc object as its single argument. Any
# StandardError raised by the block is caught and reported on standard
# output instead of propagating out of the thread.
#
# @yieldparam block [Proc] the block itself
# @return [Thread] the newly started thread
def new_thread(&block)
  worker = Thread.new do
    begin
      yield block
    rescue => error
      puts "error running thread #{error}"
    end
  end
  @thread = worker
end
#process ⇒ Object
81 82 83 84 85 86 87 88 89 |
# File 'lib/channel.rb', line 81

# Reads one value from the channel and hands it to the handler registered
# in +@subchannels+ for the value's sender.
#
# The handler is evaluated with +instance_exec+, receiving the value and the
# sender as arguments. A StandardError raised while invoking the handler is
# printed to standard output rather than re-raised; errors raised by the
# read itself are not rescued here.
#
# @return [Object, nil] the handler's result, or nil when an error was printed
def process
  message = read
  handler = @subchannels[sender]
  begin
    instance_exec(message, sender, &handler)
  rescue => error
    puts "got an error #{error.inspect}"
  end
end
#read ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/channel.rb', line 52

# Takes the next queued entry off the channel and returns its value.
#
# While holding +@mutex+: if a writer thread is recorded as waiting, it is
# woken and the record cleared; otherwise the calling thread registers
# itself as the waiting reader and sleeps on the mutex until it is woken.
# The oldest queued entry is then removed, its sender is remembered in
# +@sender+, and its value is returned.
#
# @return [Object] the +:value+ of the entry that was dequeued
def read
  @mutex.synchronize do
    parked_writer = @writing_thread
    if parked_writer
      parked_writer.wakeup
      @writing_thread = nil
    else
      @reading_thread = Thread.current
      @mutex.sleep
      @reading_thread = nil
    end

    entry = @value.shift
    @sender = entry[:sender]
    entry[:value]
  end
end
#remove_subchannel(channel) ⇒ Object
76 77 78 79 |
# File 'lib/channel.rb', line 76

# Stops a subchannel and forgets the handler registered for it.
#
# The entry is deleted from +@subchannels+ rather than overwritten with nil,
# so removed subchannels do not linger as keys in the hash. Looking up a
# removed channel still yields nil, exactly as before.
#
# @param channel [#kill] the subchannel to stop and unregister
# @return [nil]
def remove_subchannel(channel)
  channel.kill
  @subchannels.delete(channel)
  nil
end
#write(value, sender = nil) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/channel.rb', line 31

# Queues a value on the channel and hands it over to a reader.
#
# Writers are serialized by +@write_mutex+. While holding +@mutex+, the entry
# is appended to the queue; if a reader is already waiting it is woken,
# otherwise the calling thread registers itself as the waiting writer and
# sleeps on +@mutex+ until it is woken.
#
# +@write_mutex+ is released only by the enclosing +synchronize+ block.
# Unlocking it by hand inside the block makes +synchronize+'s own unlock on
# exit raise ThreadError (the mutex is then either unlocked or owned by
# another writer).
#
# @param value [Object] the payload to enqueue
# @param sender [Object, nil] recorded alongside the payload
# @return [nil]
def write(value, sender = nil)
  @write_mutex.synchronize do
    @mutex.synchronize do
      @value.push(:value => value, :sender => sender)
      if @reading_thread.nil?
        # No reader yet: park until a reader wakes this thread. Sleeping on
        # @mutex releases it so the reader can enter.
        @writing_thread = Thread.current
        @mutex.sleep
        @writing_thread = nil
      else
        @reading_thread.wakeup
        @reading_thread = nil
      end
    end
  end
end
#write_out(value) ⇒ Object
48 49 50 |
# File 'lib/channel.rb', line 48

# Forwards a value to the outgoing channel, identifying this object as the
# sender.
#
# @param value [Object] the payload to forward
# @return [Object] whatever the outgoing channel's +write+ returns
def write_out(value)
  destination = @channel_out
  destination.write(value, self)
end