Class: Arf::Wire::Server::Peer

Inherits:
BaseConnection show all
Defined in:
lib/arf/wire/server/peer.rb

Instance Method Summary collapse

Methods inherited from BaseConnection

#arf_close_after_writing?, #cancel_streams, #close_connection, #close_connection_after_writing, #dispatch, #dispatch_frame, #fetch_stream, #flush_write_buffer, #go_away!, #handle_frame, #handle_ping, #ping, #protocol_error!, #recv, #registered!, #registered?, #send_data, #wait_configuration, #wait_pong

Constructor Details

#initialize(server) ⇒ Peer

Returns a new instance of Peer.



7
8
9
10
11
12
# File 'lib/arf/wire/server/peer.rb', line 7

def initialize(server)
  super()
  @server = server
  @id = @server.register_peer(self)
  @log = Arf.logger.with_fields(subsystem: "Peer", id: @id)
end

Instance Method Details

#closeObject



14
15
16
17
# File 'lib/arf/wire/server/peer.rb', line 14

def close
  super
  @server.unregister_peer(@id)
end

#handle_configuration(fr) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/arf/wire/server/peer.rb', line 19

def handle_configuration(fr)
  return protocol_error! if @configured

  resp = ConfigurationFrame.new.tap(&:ack!)

  if fr.compression_brotli?
    @compression = :brotli
    resp.compression_brotli!
  elsif fr.compression_gzip?
    @compression = :gzip
    resp.compression_gzip!
  end

  resp.max_concurrent_streams = 0 # TODO

  @configured = true
  @log.debug("Configuration OK")
  dispatch(resp)
end

#handle_data(fr) ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/arf/wire/server/peer.rb', line 58

def handle_data(fr)
  return protocol_error! unless @configured

  str = fetch_stream(fr.stream_id)
  return reset_stream(fr.stream_id, ERROR_CODE_PROTOCOL_ERROR) unless str

  str.handle_data(fr)
end

#handle_go_away(fr) ⇒ Object



39
40
41
42
43
44
45
# File 'lib/arf/wire/server/peer.rb', line 39

def handle_go_away(fr)
  return protocol_error! unless @configured

  # client intends to disconnect
  cancel_streams(fr.error_code)
  close_connection_after_writing
end

#handle_make_stream(fr) ⇒ Object



47
48
49
50
51
52
53
54
55
56
# File 'lib/arf/wire/server/peer.rb', line 47

def handle_make_stream(fr)
  return protocol_error! unless @configured
  return protocol_error! unless fetch_stream(fr.stream_id).nil?

  @streams_monitor.synchronize do
    @streams[fr.stream_id] = Stream.new(fr.stream_id, self)
  end
  @log.debug("Async dispatch handle_stream")
  Reactor.post { @server.handle_stream(@streams[fr.stream_id]) }
end

#handle_reset_stream(fr) ⇒ Object



67
68
69
70
71
72
73
74
# File 'lib/arf/wire/server/peer.rb', line 67

def handle_reset_stream(fr)
  return protocol_error! unless @configured

  str = fetch_stream(fr.stream_id)
  return protocol_error! unless str

  str.handle_reset_stream(fr)
end

#reset_stream(id, code) ⇒ Object



76
77
78
79
80
81
# File 'lib/arf/wire/server/peer.rb', line 76

def reset_stream(id, code)
  dispatch_frame(ResetStreamFrame) do |r|
    r.stream_id = id
    r.error_code = code
  end
end