Class: Firehose::Rack::Consumer::WebSocket::Handler

Inherits:
Object
  • Object
show all
Defined in:
lib/firehose/rack/consumer/web_socket.rb

Overview

Manages connection state for the web socket that’s connected by the Consumer::WebSocket class. Deals with message sequence, connection, failures, and subscription state.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ws) ⇒ Handler

Returns a new instance of Handler.



24
25
26
27
28
29
30
31
32
# File 'lib/firehose/rack/consumer/web_socket.rb', line 24

def initialize(ws)
  @ws = ws
  @req = ::Rack::Request.new ws.env
  # Setup the event handlers from this class.
  @ws.onopen    = method :open
  @ws.onclose   = method :close
  @ws.onerror   = method :error
  @ws.onmessage = method :message
end

Class Method Details

.wrap_frame(message, last_sequence) ⇒ Object

Wrap a message in a sequence so that the client can record this and give us the sequence when it reconnects.



88
89
90
# File 'lib/firehose/rack/consumer/web_socket.rb', line 88

def self.wrap_frame(message, last_sequence)
  JSON.generate :message => message, :last_sequence => last_sequence
end

Instance Method Details

#close(event) ⇒ Object

Log a message that hte client has disconnected and reset the state for the class. Clean up the subscribers to the channels.



72
73
74
75
76
77
78
# File 'lib/firehose/rack/consumer/web_socket.rb', line 72

def close(event)
  if @deferrable
    @deferrable.fail :disconnect
    @channel.unsubscribe(@deferrable) if @channel
  end
  Firehose.logger.debug "WS connection `#{@req.path}` closing. Code: #{event.code.inspect}; Reason #{event.reason.inspect}"
end

#error(event) ⇒ Object

Log errors if a socket fails. ‘close` will fire after this to clean up any remaining connectons.



82
83
84
# File 'lib/firehose/rack/consumer/web_socket.rb', line 82

def error(event)
  Firehose.logger.error "WS connection `#{@req.path}` error. Message: `#{event.message.inspect}`; Data: `#{event.data.inspect}`"
end

#message(event) ⇒ Object

Manages messages sent from the connect client to the server. This is mostly used to handle heart-beats that are designed to prevent the WebSocket connection from timing out from inactivity.



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/firehose/rack/consumer/web_socket.rb', line 53

def message(event)
  msg = JSON.parse(event.data, :symbolize_names => true) rescue {}
  seq = msg[:message_sequence]
  if msg[:ping] == 'PING'
    Firehose.logger.debug "WS ping received, sending pong"
    @ws.send JSON.generate :pong => 'PONG'
  elsif !@subscribed && seq.kind_of?(Integer)
    Firehose.logger.debug "Subscribing at message_sequence #{seq}"
    subscribe seq
  end
end

#open(event) ⇒ Object

Log a message that the client has connected.



66
67
68
# File 'lib/firehose/rack/consumer/web_socket.rb', line 66

def open(event)
  Firehose.logger.debug "WebSocket subscribed to `#{@req.path}`. Waiting for message_sequence..."
end

#subscribe(last_sequence) ⇒ Object

Subscribe the client to the channel on the server. Asks for the last sequence for clients that reconnect.



36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/firehose/rack/consumer/web_socket.rb', line 36

def subscribe(last_sequence)
  @subscribed = true
  @channel    = Server::Channel.new @req.path
  @deferrable = @channel.next_message last_sequence
  @deferrable.callback do |message, sequence|
    Firehose.logger.debug "WS sent `#{message}` to `#{@req.path}` with sequence `#{sequence}`"
    @ws.send self.class.wrap_frame(message, last_sequence)
    subscribe sequence
  end
  @deferrable.errback do |e|
    EM.next_tick { raise e.inspect } unless e == :disconnect
  end
end