Class: Firehose::Rack::Consumer::WebSocket::Handler
- Inherits: Object
  - Object
  - Firehose::Rack::Consumer::WebSocket::Handler
- Defined in: lib/firehose/rack/consumer/web_socket.rb
Overview
Manages connection state for the WebSocket opened by the Consumer::WebSocket class. Handles message sequencing, connection events, failures, and subscription state.
Class Method Summary
- .wrap_frame(message, last_sequence) ⇒ Object
  Wrap a message in a sequence so that the client can record this and give us the sequence when it reconnects.
Instance Method Summary
- #close(event) ⇒ Object
  Log a message that the client has disconnected and reset the state for the class.
- #error(event) ⇒ Object
  Log errors if a socket fails.
- #initialize(ws) ⇒ Handler (constructor)
  A new instance of Handler.
- #message(event) ⇒ Object
  Manages messages sent from the connected client to the server.
- #open(event) ⇒ Object
  Log a message that the client has connected.
- #subscribe(last_sequence) ⇒ Object
  Subscribe the client to the channel on the server.
Constructor Details
#initialize(ws) ⇒ Handler
Returns a new instance of Handler.
# File 'lib/firehose/rack/consumer/web_socket.rb', line 24
def initialize(ws)
  @ws = ws
  @req = ::Rack::Request.new ws.env
  # Setup the event handlers from this class.
  @ws.onopen = method :open
  @ws.onclose = method :close
  @ws.onerror = method :error
  @ws.onmessage = method :message
end
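The constructor wires each socket event to an instance method by passing bound `Method` objects. The following is a minimal sketch of that idiom only; `FakeSocket` and `TinyHandler` are hypothetical stand-ins invented for illustration and are not part of Firehose or of any WebSocket library.

```ruby
# Hypothetical socket stand-in: it only stores the assigned callbacks.
FakeSocket = Struct.new(:onopen, :onclose)

# Hypothetical handler that records the events it receives.
class TinyHandler
  attr_reader :events

  def initialize(ws)
    @events = []
    # `method :open` returns a Method object bound to this instance,
    # so the socket can call it later without knowing about the handler.
    ws.onopen  = method :open
    ws.onclose = method :close
  end

  def open(event)
    @events << [:open, event]
  end

  def close(event)
    @events << [:close, event]
  end
end

socket  = FakeSocket.new
handler = TinyHandler.new(socket)

# Simulate the socket firing its callbacks.
socket.onopen.call(:connected)
socket.onclose.call(:gone)
```

Because the callbacks are bound methods, they keep access to the handler's instance variables, which is what lets the real handler track state such as the request and the subscription between events.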
Class Method Details
.wrap_frame(message, last_sequence) ⇒ Object
Wrap a message in a sequence so that the client can record this and give us the sequence when it reconnects.
# File 'lib/firehose/rack/consumer/web_socket.rb', line 88
def self.wrap_frame(message, last_sequence)
  JSON.generate :message => message, :last_sequence => last_sequence
end
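To make the frame format concrete, here is a small sketch that reproduces the body of `wrap_frame` as a standalone method and decodes the result the way a client might. The payload `"hello"`, the sequence `41`, and the variable names are illustrative assumptions; the `message_sequence` key is the one the `#message` handler reads.

```ruby
require 'json'

# Standalone copy of the wrap_frame body, so the sketch runs without Firehose.
def wrap_frame(message, last_sequence)
  JSON.generate :message => message, :last_sequence => last_sequence
end

frame = wrap_frame("hello", 41)

# A client would parse the frame and remember the sequence it carries...
decoded = JSON.parse(frame, :symbolize_names => true)

# ...and could send it back as `message_sequence` when it reconnects.
reconnect_payload = JSON.generate(:message_sequence => decoded[:last_sequence])
```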
Instance Method Details
#close(event) ⇒ Object
Log a message that the client has disconnected and reset the state for the class. Clean up the subscribers to the channels.
# File 'lib/firehose/rack/consumer/web_socket.rb', line 72
def close(event)
  if @deferrable
    @deferrable.fail :disconnect
    @channel.unsubscribe(@deferrable) if @channel
  end
  Firehose.logger.debug "WS connection `#{@req.path}` closing. Code: #{event.code.inspect}; Reason #{event.reason.inspect}"
end
#error(event) ⇒ Object
Log errors if a socket fails. `close` will fire after this to clean up any remaining connections.
# File 'lib/firehose/rack/consumer/web_socket.rb', line 82
def error(event)
  Firehose.logger.error "WS connection `#{@req.path}` error. Message: `#{event.message.inspect}`; Data: `#{event.data.inspect}`"
end
#message(event) ⇒ Object
Manages messages sent from the connected client to the server. This is mostly used to handle heartbeats that are designed to prevent the WebSocket connection from timing out from inactivity.
# File 'lib/firehose/rack/consumer/web_socket.rb', line 53
def message(event)
  msg = JSON.parse(event.data, :symbolize_names => true) rescue {}
  seq = msg[:message_sequence]
  if msg[:ping] == 'PING'
    Firehose.logger.debug "WS ping received, sending pong"
    @ws.send JSON.generate :pong => 'PONG'
  elsif !@subscribed && seq.kind_of?(Integer)
    Firehose.logger.debug "Subscribing at message_sequence #{seq}"
    subscribe seq
  end
end
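The branching in `#message` can be read as a small decision table: a ping gets a pong, a first integer `message_sequence` triggers a subscription, and anything else is ignored. The sketch below restates that logic as a pure function so it can be exercised without a socket. `classify_frame` and its return values are hypothetical names chosen for illustration, and the `Hash` guard is an addition of this sketch, not something the listed method does.

```ruby
require 'json'

# Hypothetical helper: returns the action the handler would take for a frame.
def classify_frame(data, subscribed)
  msg = JSON.parse(data, :symbolize_names => true) rescue {}
  # Added guard (not in the original): valid JSON that is not an object
  # is treated as an empty message.
  msg = {} unless msg.is_a?(Hash)
  seq = msg[:message_sequence]

  if msg[:ping] == 'PING'
    [:pong, JSON.generate(:pong => 'PONG')]
  elsif !subscribed && seq.kind_of?(Integer)
    [:subscribe, seq]
  else
    [:ignore, nil]
  end
end

ping_result      = classify_frame('{"ping":"PING"}', false)
first_subscribe  = classify_frame('{"message_sequence":7}', false)
repeat_subscribe = classify_frame('{"message_sequence":7}', true)
malformed        = classify_frame('not json', false)
```

Note that a client which has already subscribed cannot move its position by sending another `message_sequence`; only the first integer sequence on a connection has an effect.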
#open(event) ⇒ Object
Log a message that the client has connected.
# File 'lib/firehose/rack/consumer/web_socket.rb', line 66
def open(event)
  Firehose.logger.debug "WebSocket subscribed to `#{@req.path}`. Waiting for message_sequence..."
end
#subscribe(last_sequence) ⇒ Object
Subscribe the client to the channel on the server. Asks for the last sequence for clients that reconnect.
# File 'lib/firehose/rack/consumer/web_socket.rb', line 36
def subscribe(last_sequence)
  @subscribed = true
  @channel = Server::Channel.new @req.path
  @deferrable = @channel.next_message last_sequence
  @deferrable.callback do |message, sequence|
    Firehose.logger.debug "WS sent `#{message}` to `#{@req.path}` with sequence `#{sequence}`"
    @ws.send self.class.wrap_frame(message, last_sequence)
    subscribe sequence
  end
  @deferrable.errback do |e|
    EM.next_tick { raise e.inspect } unless e == :disconnect
  end
end
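`#subscribe` re-subscribes itself from inside the success callback, so each delivered message schedules the wait for the next one. The sketch below imitates only that control flow, synchronously and in memory. `FakeChannel` and `FakeSubscriber` are hypothetical classes made up for this example; the real handler works asynchronously through a deferrable returned by the channel, which this sketch does not model.

```ruby
# Hypothetical in-memory channel: message N of the list has sequence N.
class FakeChannel
  def initialize(messages)
    @messages = messages
  end

  # Yields the message that follows last_sequence, if one exists.
  def next_message(last_sequence)
    sequence = last_sequence + 1
    message  = @messages[sequence - 1]
    yield message, sequence if message
  end
end

# Hypothetical subscriber that mirrors the re-subscribe-on-delivery pattern.
class FakeSubscriber
  attr_reader :delivered

  def initialize(channel)
    @channel   = channel
    @delivered = []
  end

  def subscribe(last_sequence)
    @channel.next_message(last_sequence) do |message, sequence|
      @delivered << [message, sequence]
      # Ask for whatever comes after the message just delivered.
      subscribe sequence
    end
  end
end

subscriber = FakeSubscriber.new(FakeChannel.new(%w[a b c]))
# A reconnecting client that already saw sequence 1 resumes from there.
subscriber.subscribe 1
```

In this sketch the chain stops when the channel has nothing newer to deliver; in the real handler the pending deferrable simply stays open until a message arrives or `#close` fails it with `:disconnect`.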