Class: Arf::Wire::BaseConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/arf/wire/base_connection.rb

Direct Known Subclasses

Client, Server::Peer

Instance Method Summary collapse

Constructor Details

#initializeBaseConnection

Returns a new instance of BaseConnection.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/arf/wire/base_connection.rb', line 6

def initialize
  @_write_lock = Monitor.new
  @_write_buffer = Queue.new
  @_write_head = nil
  @compression = :none
  @last_stream_id = 0
  @reader = FrameReader.new
  @configured = false
  @pong_signal = WaitSignal.new

  @streams = {}
  @streams_monitor = Monitor.new

  @configuration_ready_lock = Mutex.new
  @configuration_ready_signal = Thread::ConditionVariable.new
  @registered = false
end

Instance Method Details

#arf_close_after_writing?Boolean

Returns:

  • (Boolean)


98
99
100
# File 'lib/arf/wire/base_connection.rb', line 98

def arf_close_after_writing?
  @arf_close_after_writing || false
end

#cancel_streams(code) ⇒ Object



75
76
77
78
79
80
81
82
83
# File 'lib/arf/wire/base_connection.rb', line 75

def cancel_streams(code)
  @streams_monitor.synchronize do
    @streams.each_value do |v|
      v.reset(code)
    rescue ClosedStreamError, StreamResetError
      next
    end
  end
end

#closeObject



104
105
106
# File 'lib/arf/wire/base_connection.rb', line 104

def close
  close_connection
end

#close_connectionObject



102
# File 'lib/arf/wire/base_connection.rb', line 102

def close_connection = Reactor.detach_client(self)

#close_connection_after_writingObject



108
109
110
# File 'lib/arf/wire/base_connection.rb', line 108

def close_connection_after_writing
  @arf_close_after_writing = true
end

#dispatch(frame) ⇒ Object



139
140
141
# File 'lib/arf/wire/base_connection.rb', line 139

def dispatch(frame)
  send_data(frame.to_frame.bytes(@compression))
end

#dispatch_frame(type, terminate: false) ⇒ Object



134
135
136
137
# File 'lib/arf/wire/base_connection.rb', line 134

def dispatch_frame(type, terminate: false, &)
  dispatch(type.new(&))
  close_connection_after_writing if terminate
end

#fetch_stream(id) ⇒ Object



96
# File 'lib/arf/wire/base_connection.rb', line 96

def fetch_stream(id) = @streams_monitor.synchronize { @streams[id] }

#flush_write_buffer(io) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/arf/wire/base_connection.rb', line 153

def flush_write_buffer(io)
  @_write_lock.synchronize do
    loop do
      if @_write_head.nil?
        return true if @_write_buffer.empty?

        @_write_head = @_write_buffer.pop
      end

      written = io.write_nonblock(@_write_head, exception: false)
      case written
      when :wait_writable
        return false
      when @_write_head.bytesize
        @_write_head = nil
      else
        @_write_head = @_write_head.byteslice(written, @_write_head.bytesize)
        return false
      end
    end
  end
end

#go_away!(code, extra_data: nil, terminate: false) ⇒ Object



145
146
147
148
149
150
151
# File 'lib/arf/wire/base_connection.rb', line 145

def go_away!(code, extra_data: nil, terminate: false)
  dispatch_frame(GoAwayFrame, terminate:) do |g|
    g.last_stream_id = @last_stream_id
    g.error_code = code
    g.additional_data = extra_data if extra_data
  end
end

#handle_frame(raw_frame) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/arf/wire/base_connection.rb', line 53

def handle_frame(raw_frame)
  frame = raw_frame.specialize(@compression)
  case frame
  when ConfigurationFrame
    handle_configuration(frame)
  when PingFrame
    handle_ping(frame)
  when GoAwayFrame
    handle_go_away(frame)
  when MakeStreamFrame
    handle_make_stream(frame)
  when ResetStreamFrame
    handle_reset_stream(frame)
  when DataFrame
    handle_data(frame)
  else
    protocol_error!
  end
rescue UnknownFrameKindError, InvalidFrameLengthError
  protocol_error!
end

#handle_ping(fr) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/arf/wire/base_connection.rb', line 120

def handle_ping(fr)
  return protocol_error! unless @configured

  if fr.ack?
    @pong_signal.broadcast
    return
  end

  dispatch_frame(PingFrame) do |p|
    p.ack!
    p.payload = fr.payload
  end
end

#pingObject



85
86
87
88
89
90
91
92
# File 'lib/arf/wire/base_connection.rb', line 85

def ping
  wait_configuration
  @streams_monitor.synchronize do
    dispatch_frame(PingFrame) do |p|
      p.payload = SecureRandom.bytes(8)
    end
  end
end

#protocol_error!Object



143
# File 'lib/arf/wire/base_connection.rb', line 143

def protocol_error! = go_away!(ERROR_CODE_PROTOCOL_ERROR, terminate: true)

#recv(data) ⇒ Object



44
45
46
47
48
49
50
51
# File 'lib/arf/wire/base_connection.rb', line 44

def recv(data)
  data.each_byte do |b|
    v = @reader.feed(b)
    next unless v

    handle_frame(v)
  end
end

#registered!Object



26
27
28
29
30
31
# File 'lib/arf/wire/base_connection.rb', line 26

def registered!
  @registered = true
  @_write_lock.synchronize do
    Arf::Reactor.client_writes_pending(self) unless @_write_buffer.empty?
  end
end

#registered?Boolean

Returns:

  • (Boolean)


24
# File 'lib/arf/wire/base_connection.rb', line 24

def registered? = @registered

#send_data(data) ⇒ Object



112
113
114
115
116
117
118
# File 'lib/arf/wire/base_connection.rb', line 112

def send_data(data)
  @_write_lock.synchronize do
    @_write_buffer << data
  end
  Reactor.client_writes_pending(self) if registered?
  data.bytesize
end

#wait_configurationObject



33
34
35
36
37
38
39
40
41
42
# File 'lib/arf/wire/base_connection.rb', line 33

def wait_configuration
  return if @configured

  loop do
    @configuration_ready_lock.synchronize do
      @configuration_ready_signal.wait(@configuration_ready_lock)
      return if @configured
    end
  end
end

#wait_pongObject



94
# File 'lib/arf/wire/base_connection.rb', line 94

def wait_pong = @pong_signal.wait