Class: Emit::Channel
- Inherits:
-
Object
- Object
- Emit::Channel
- Defined in:
- lib/emit/channel.rb
Instance Method Summary collapse
-
#initialize(name = nil) ⇒ Channel
constructor
A new instance of Channel.
- #leave_reader ⇒ Object
- #leave_writer ⇒ Object
- #poison ⇒ Object
- #poisoned? ⇒ Boolean
-
#post_read(request) ⇒ Object
private.
- #post_write(request) ⇒ Object
- #read ⇒ Object
- #reader ⇒ Object (also: #+@)
- #remove_read(request) ⇒ Object
- #remove_write(request) ⇒ Object
- #retired? ⇒ Boolean
- #write(message) ⇒ Object
- #writer ⇒ Object (also: #-@)
Constructor Details
#initialize(name = nil) ⇒ Channel
Returns a new instance of Channel.
5 6 7 8 9 10 11 12 |
# File 'lib/emit/channel.rb', line 5 def initialize(name=nil) @name = name || SecureRandom.uuid @read_queue = [] @readers = 0 @write_queue = [] @writers = 0 @state = :alive end |
Instance Method Details
#leave_reader ⇒ Object
83 84 85 86 87 88 89 90 |
# File 'lib/emit/channel.rb', line 83 def leave_reader return if retired? @readers -= 1 if @readers.zero? @state = :retired @write_queue.each(&:retire) end end |
#leave_writer ⇒ Object
92 93 94 95 96 97 98 99 |
# File 'lib/emit/channel.rb', line 92 def leave_writer return if retired? @writers -= 1 if @writers.zero? @state = :retired @read_queue.each(&:retire) end end |
#poison ⇒ Object
56 57 58 59 60 61 |
# File 'lib/emit/channel.rb', line 56 def poison return if poisoned? @state = :poisoned @read_queue.each(&:poison) @write_queue.each(&:poison) end |
#poisoned? ⇒ Boolean
75 76 77 |
# File 'lib/emit/channel.rb', line 75 def poisoned? @state == :poisoned end |
#post_read(request) ⇒ Object
private
103 104 105 106 107 |
# File 'lib/emit/channel.rb', line 103 def post_read(request) check_termination! @read_queue << request match end |
#post_write(request) ⇒ Object
109 110 111 112 113 |
# File 'lib/emit/channel.rb', line 109 def post_write(request) check_termination! @write_queue << request match end |
#read ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/emit/channel.rb', line 14 def read check_termination! process = Scheduler.current fast_read(process).tap do |msg| return msg unless msg.nil? end process.state = :active request = ChannelRequest.new(process) post_read(request) request.process.wait remove_read(request) return request. if request.success? check_termination! abort "Should not get here..." end |
#reader ⇒ Object Also known as: +@
63 64 65 66 |
# File 'lib/emit/channel.rb', line 63 def reader @readers += 1 ChannelEndRead.new(self) end |
#remove_read(request) ⇒ Object
115 116 117 |
# File 'lib/emit/channel.rb', line 115 def remove_read(request) @read_queue.delete(request) end |
#remove_write(request) ⇒ Object
119 120 121 |
# File 'lib/emit/channel.rb', line 119 def remove_write(request) @write_queue.delete(request) end |
#retired? ⇒ Boolean
79 80 81 |
# File 'lib/emit/channel.rb', line 79 def retired? @state == :retired end |
#write(message) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/emit/channel.rb', line 35 def write() check_termination! process = Scheduler.current fast_write(process, ).tap do |written| return true unless written.nil? end process.state = :active request = ChannelRequest.new(process, ) post_write(request) request.process.wait remove_write(request) return true if request.success? check_termination! abort "Should not get here..." end |
#writer ⇒ Object Also known as: -@
69 70 71 72 |
# File 'lib/emit/channel.rb', line 69 def writer @writers += 1 ChannelEndWrite.new(self) end |