Class: Arf::Wire::Server::Peer
Instance Method Summary
collapse
#arf_close_after_writing?, #cancel_streams, #close_connection, #close_connection_after_writing, #dispatch, #dispatch_frame, #fetch_stream, #flush_write_buffer, #go_away!, #handle_frame, #handle_ping, #ping, #protocol_error!, #recv, #registered!, #registered?, #send_data, #wait_configuration, #wait_pong
Constructor Details
#initialize(server) ⇒ Peer
Returns a new instance of Peer.
7
8
9
10
11
12
|
# File 'lib/arf/wire/server/peer.rb', line 7
# Creates a peer attached to +server+.
#
# Runs the superclass initializer, registers this peer with the server
# (keeping the value returned by +register_peer+ as the peer id) and builds
# a logger tagged with the "Peer" subsystem and that id.
#
# @param server [Object] the owning server; must respond to +register_peer+
def initialize(server)
  super()
  @server = server
  @id = server.register_peer(self)
  @log = Arf.logger.with_fields(subsystem: "Peer", id: @id)
end
|
Instance Method Details
#close ⇒ Object
14
15
16
17
|
# File 'lib/arf/wire/server/peer.rb', line 14
# Closes the peer through the superclass implementation and afterwards asks
# the server to unregister this peer's id.
#
# @return [Object] whatever +@server.unregister_peer+ returns
def close
  super()
  @server.unregister_peer(@id)
end
|
#handle_configuration(fr) ⇒ Object
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
# File 'lib/arf/wire/server/peer.rb', line 19
# Handles a configuration frame received from the remote side.
#
# A second configuration frame is answered with +protocol_error!+. Otherwise
# an acknowledging ConfigurationFrame is built, the requested compression is
# mirrored into it (brotli is checked before gzip), the peer is flagged as
# configured and the reply is dispatched.
#
# @param fr [Object] received frame; must respond to +compression_brotli?+
#   and +compression_gzip?+
# @return [Object] the result of +dispatch+, or of +protocol_error!+ when the
#   peer was already configured
def handle_configuration(fr)
  return protocol_error! if @configured

  reply = ConfigurationFrame.new
  reply.ack!

  if fr.compression_brotli?
    @compression = :brotli
    reply.compression_brotli!
  elsif fr.compression_gzip?
    @compression = :gzip
    reply.compression_gzip!
  end

  # NOTE(review): 0 presumably means "no limit" here - confirm against the
  # ConfigurationFrame definition.
  reply.max_concurrent_streams = 0
  @configured = true
  @log.debug("Configuration OK")
  dispatch(reply)
end
|
#handle_data(fr) ⇒ Object
58
59
60
61
62
63
64
65
|
# File 'lib/arf/wire/server/peer.rb', line 58
# Routes a data frame to the stream it belongs to.
#
# Responds with +protocol_error!+ when the peer has not been configured yet.
# When no stream is found for the frame's stream id, that stream id is reset
# with ERROR_CODE_PROTOCOL_ERROR instead.
#
# @param fr [Object] received frame; must respond to +stream_id+
# @return [Object] the result of whichever handler was invoked
def handle_data(fr)
  return protocol_error! unless @configured

  stream = fetch_stream(fr.stream_id)
  if stream
    stream.handle_data(fr)
  else
    reset_stream(fr.stream_id, ERROR_CODE_PROTOCOL_ERROR)
  end
end
|
#handle_go_away(fr) ⇒ Object
39
40
41
42
43
44
45
|
# File 'lib/arf/wire/server/peer.rb', line 39
# Handles a go-away frame from the remote side.
#
# Once the peer is configured, every stream is cancelled with the frame's
# error code and the connection is closed via
# +close_connection_after_writing+. Before configuration the frame is
# answered with +protocol_error!+.
#
# @param fr [Object] received frame; must respond to +error_code+
# @return [Object] the result of +close_connection_after_writing+ or of
#   +protocol_error!+
def handle_go_away(fr)
  if @configured
    cancel_streams(fr.error_code)
    close_connection_after_writing
  else
    protocol_error!
  end
end
|
#handle_make_stream(fr) ⇒ Object
47
48
49
50
51
52
53
54
55
56
|
# File 'lib/arf/wire/server/peer.rb', line 47
# Handles a request from the remote side to open a new stream.
#
# Responds with +protocol_error!+ when the peer is not configured yet or when
# a stream with the requested id already exists. Otherwise a Stream is
# created and stored in +@streams+ under +@streams_monitor+, and the server's
# +handle_stream+ is scheduled through +Reactor.post+.
#
# @param fr [Object] received frame; must respond to +stream_id+
# @return [Object] the result of +Reactor.post+ or of +protocol_error!+
def handle_make_stream(fr)
  return protocol_error! unless @configured

  id = fr.stream_id
  return protocol_error! unless fetch_stream(id).nil?

  stream = @streams_monitor.synchronize do
    @streams[id] = Stream.new(id, self)
  end

  @log.debug("Async dispatch handle_stream")
  # Hand the posted block the stream object itself. Looking it up again in
  # @streams when the block finally runs would happen outside the monitor
  # and could yield nil if the stream was removed in the meantime.
  Reactor.post { @server.handle_stream(stream) }
end
|
#handle_reset_stream(fr) ⇒ Object
67
68
69
70
71
72
73
74
|
# File 'lib/arf/wire/server/peer.rb', line 67
# Forwards a reset-stream frame to the stream it targets.
#
# Responds with +protocol_error!+ when the peer has not been configured or
# when no stream is found for the frame's stream id.
#
# @param fr [Object] received frame; must respond to +stream_id+
# @return [Object] the result of the stream's +handle_reset_stream+ or of
#   +protocol_error!+
def handle_reset_stream(fr)
  return protocol_error! unless @configured

  stream = fetch_stream(fr.stream_id)
  stream ? stream.handle_reset_stream(fr) : protocol_error!
end
|
#reset_stream(id, code) ⇒ Object
76
77
78
79
80
81
|
# File 'lib/arf/wire/server/peer.rb', line 76
# Sends a ResetStreamFrame for the given stream.
#
# The frame is produced through +dispatch_frame+, which yields it so the
# stream id and error code can be filled in.
#
# @param id [Object] id of the stream to reset
# @param code [Object] error code to place in the frame
# @return [Object] whatever +dispatch_frame+ returns
def reset_stream(id, code)
  dispatch_frame(ResetStreamFrame) do |frame|
    frame.stream_id = id
    frame.error_code = code
  end
end
|