Class: Firehose::Rack::Consumer::WebSocket::DefaultHandler

Inherits:
Handler
  • Object
Defined in:
lib/firehose/rack/consumer/web_socket.rb

Overview

Manages connection state for the WebSocket opened by the Consumer::WebSocket class. Handles message sequence, connection lifecycle, failures, and subscription state.

Instance Method Summary

Methods inherited from Handler

#error, #initialize, #parse_message, #send_message

Constructor Details

This class inherits a constructor from Firehose::Rack::Consumer::WebSocket::Handler

Instance Method Details

#close(event) ⇒ Object

Log a message that the client has disconnected. Fail any pending deferrable with :disconnect and unsubscribe it from the channel.



# File 'lib/firehose/rack/consumer/web_socket.rb', line 79

def close(event)
  if @deferrable
    @deferrable.fail :disconnect
    @channel.unsubscribe(@deferrable) if @channel
  end
  Firehose.logger.debug "WS connection `#{@req.path}` closing. Code: #{event.code.inspect}; Reason #{event.reason.inspect}"
end
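The :disconnect reason passed to fail above pairs with the errback in #subscribe, which ignores that one value and treats anything else as an error. A minimal sketch of that pairing follows. TinyDeferrable is a hypothetical stand-in written for this example, not EventMachine's deferrable, and :timeout is an invented failure reason.

```ruby
# Hypothetical, minimal stand-in for an EventMachine-style deferrable.
# It supports only errbacks, which is all this sketch needs.
class TinyDeferrable
  def initialize
    @errbacks = []
  end

  # Register a block to run when the deferrable fails.
  def errback(&block)
    @errbacks << block
  end

  # Run every registered errback with the failure reason.
  def fail(reason)
    @errbacks.each { |cb| cb.call(reason) }
  end
end

errors = []

# Same filter as the errback in #subscribe: :disconnect is expected, so skip it.
record_unless_disconnect = lambda do |e|
  errors << e unless e == :disconnect
end

closed = TinyDeferrable.new
closed.errback(&record_unless_disconnect)
closed.fail :disconnect   # what #close does: not recorded as an error

broken = TinyDeferrable.new
broken.errback(&record_unless_disconnect)
broken.fail :timeout      # hypothetical failure reason: recorded
```

After both calls, errors holds only :timeout, so a normal client disconnect never reaches the error log.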

#message(event) ⇒ Object

Manages messages sent from the connected client to the server. Mostly used to handle heartbeats, which keep the WebSocket connection from timing out from inactivity. Also triggers the initial subscription the first time the client sends an Integer message_sequence.



# File 'lib/firehose/rack/consumer/web_socket.rb', line 60

def message(event)
  msg = parse_message(event)
  seq = msg[:message_sequence]
  if msg[:ping] == 'PING'
    Firehose.logger.debug "WS ping received, sending pong"
    send_message pong: "PONG"
  elsif !@subscribed && seq.kind_of?(Integer)
    Firehose.logger.debug "Subscribing at message_sequence #{seq}"
    subscribe seq
  end
end
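The dispatch rules above can be exercised without a live socket. The following is a rough sketch only: SketchHandler is a hypothetical class that copies the branching in #message, and it assumes frames are JSON objects parsed with symbolized keys. The real parse_message and send_message are inherited from Handler and may behave differently.

```ruby
require 'json'

# Hypothetical stand-in for DefaultHandler. It mirrors only the branching in
# #message and records what it would send or subscribe to.
class SketchHandler
  attr_reader :sent, :subscribed_at

  def initialize
    @sent = []
    @subscribed = false
    @subscribed_at = nil
  end

  # Assumption: frames are JSON objects; the real parse_message may differ.
  def parse_message(raw)
    JSON.parse(raw, symbolize_names: true)
  end

  # Records the outgoing frame instead of writing to a socket.
  def send_message(payload)
    @sent << JSON.generate(payload)
  end

  # Records the starting sequence instead of opening a channel.
  def subscribe(seq)
    @subscribed = true
    @subscribed_at = seq
  end

  # Same branching as DefaultHandler#message, minus logging.
  def message(raw)
    msg = parse_message(raw)
    seq = msg[:message_sequence]
    if msg[:ping] == 'PING'
      send_message pong: 'PONG'
    elsif !@subscribed && seq.kind_of?(Integer)
      subscribe seq
    end
  end
end

handler = SketchHandler.new
handler.message('{"ping":"PING"}')          # heartbeat: answered with a pong
handler.message('{"message_sequence":0}')   # first sequence: subscribes at 0
handler.message('{"message_sequence":7}')   # ignored: already subscribed
```

Only the first Integer message_sequence has any effect. Later ones are dropped because @subscribed is already true.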

#open(event) ⇒ Object

Log a message that the client has connected.



# File 'lib/firehose/rack/consumer/web_socket.rb', line 73

def open(event)
  Firehose.logger.debug "WebSocket subscribed to `#{@req.path}`. Waiting for message_sequence..."
end

#subscribe(last_sequence) ⇒ Object

Subscribe the client to the channel on the server. Takes the last sequence the client received so that reconnecting clients can resume from that point.



# File 'lib/firehose/rack/consumer/web_socket.rb', line 89

def subscribe(last_sequence)
  @subscribed = true
  @channel    = Server::Channel.new @req.path
  @deferrable = @channel.next_messages last_sequence
  @deferrable.callback do |messages|
    messages.each do |message|
      Firehose.logger.debug "WS sent `#{message.payload}` to `#{@req.path}` with sequence `#{message.sequence}`"
      send_message message: message.payload, last_sequence: message.sequence
    end
    subscribe messages.last.sequence
  end
  @deferrable.errback do |e|
    unless e == :disconnect
      Firehose.logger.error "WS Error: #{e}"
      EM.next_tick { raise e.inspect }
    end
  end
end
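The method above re-subscribes itself after each batch, always passing the sequence of the last message delivered. The sketch below illustrates that cursor-advancing pattern with synchronous polling in place of EventMachine deferrables, so it is a simplification rather than the library's mechanism. SketchMessage, SketchChannel, and SketchSubscriber are hypothetical names invented for this example.

```ruby
# Hypothetical message record and in-memory channel, for illustration only.
SketchMessage = Struct.new(:payload, :sequence)

class SketchChannel
  def initialize
    @messages = []
  end

  # Appends a message with the next sequence number (starting at 1).
  def publish(payload)
    @messages << SketchMessage.new(payload, @messages.size + 1)
  end

  # Returns every message newer than last_sequence (empty when caught up).
  def next_messages(last_sequence)
    @messages.select { |m| m.sequence > last_sequence }
  end
end

class SketchSubscriber
  attr_reader :delivered, :last_sequence

  def initialize(channel, last_sequence)
    @channel = channel
    @last_sequence = last_sequence
    @delivered = []
  end

  # One round: deliver the pending batch, then advance the cursor to the
  # last delivered sequence, as the callback in #subscribe does.
  def poll
    batch = @channel.next_messages(@last_sequence)
    return if batch.empty?
    batch.each { |m| @delivered << m.payload }
    @last_sequence = batch.last.sequence
  end
end

channel = SketchChannel.new
3.times { |i| channel.publish("msg-#{i + 1}") }

# A reconnecting client that had already received sequence 1.
client = SketchSubscriber.new(channel, 1)
client.poll                 # delivers msg-2 and msg-3
channel.publish("msg-4")
client.poll                 # delivers msg-4
```

Because the cursor only moves to the last delivered sequence, a client that reconnects with that value does not receive msg-1 through msg-4 again.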