Class: Emit::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/emit/channel.rb

Instance Method Summary collapse

Constructor Details

#initialize(name = nil) ⇒ Channel

Returns a new instance of Channel.



5
6
7
8
9
10
11
12
# File 'lib/emit/channel.rb', line 5

def initialize(name=nil)
  @name        = name || SecureRandom.uuid
  @read_queue  = []
  @readers     = 0
  @write_queue = []
  @writers     = 0
  @state       = :alive
end

Instance Method Details

#leave_readerObject



83
84
85
86
87
88
89
90
# File 'lib/emit/channel.rb', line 83

def leave_reader
  return if retired?
  @readers -= 1
  if @readers.zero?
    @state = :retired
    @write_queue.each(&:retire)
  end
end

#leave_writerObject



92
93
94
95
96
97
98
99
# File 'lib/emit/channel.rb', line 92

def leave_writer
  return if retired?
  @writers -= 1
  if @writers.zero?
    @state = :retired
    @read_queue.each(&:retire)
  end
end

#poisonObject



56
57
58
59
60
61
# File 'lib/emit/channel.rb', line 56

def poison
  return if poisoned?
  @state = :poisoned
  @read_queue.each(&:poison)
  @write_queue.each(&:poison)
end

#poisoned?Boolean

Returns:

  • (Boolean)


75
76
77
# File 'lib/emit/channel.rb', line 75

def poisoned?
  @state == :poisoned
end

#post_read(request) ⇒ Object

private



103
104
105
106
107
# File 'lib/emit/channel.rb', line 103

def post_read(request)
  check_termination!
  @read_queue << request
  match
end

#post_write(request) ⇒ Object



109
110
111
112
113
# File 'lib/emit/channel.rb', line 109

def post_write(request)
  check_termination!
  @write_queue << request
  match
end

#readObject



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/emit/channel.rb', line 14

def read
  check_termination!

  process = Scheduler.current

  fast_read(process).tap do |msg|
    return msg unless msg.nil?
  end

  process.state = :active
  request = ChannelRequest.new(process)
  post_read(request)
  request.process.wait
  remove_read(request)

  return request.message if request.success?

  check_termination!
  abort "Should not get here..."
end

#readerObject Also known as: +@



63
64
65
66
# File 'lib/emit/channel.rb', line 63

def reader
  @readers += 1
  ChannelEndRead.new(self)
end

#remove_read(request) ⇒ Object



115
116
117
# File 'lib/emit/channel.rb', line 115

def remove_read(request)
  @read_queue.delete(request)
end

#remove_write(request) ⇒ Object



119
120
121
# File 'lib/emit/channel.rb', line 119

def remove_write(request)
  @write_queue.delete(request)
end

#retired?Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/emit/channel.rb', line 79

def retired?
  @state == :retired
end

#write(message) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/emit/channel.rb', line 35

def write(message)
  check_termination!

  process = Scheduler.current

  fast_write(process, message).tap do |written|
    return true unless written.nil?
  end

  process.state = :active
  request = ChannelRequest.new(process, message)
  post_write(request)
  request.process.wait
  remove_write(request)

  return true if request.success?

  check_termination!
  abort "Should not get here..."
end

#writerObject Also known as: -@



69
70
71
72
# File 'lib/emit/channel.rb', line 69

def writer
  @writers += 1
  ChannelEndWrite.new(self)
end