Class: Firehose::Rack::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/firehose/rack/consumer.rb,
lib/firehose/rack/consumer/web_socket.rb,
lib/firehose/rack/consumer/http_long_poll.rb

Overview

Handles a subscription request over HTTP or WebSockets depeding on its abilities and binds that to the Firehose::Server::Subscription class, which is bound to a channel that gets published to.

Defined Under Namespace

Classes: HttpLongPoll, WebSocket

Constant Summary collapse

MULTIPLEX_CHANNEL =
"channels@firehose"

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize {|_self| ... } ⇒ Consumer

Let the client configure the consumer on initialization.

Yields:

  • (_self)

Yield Parameters:



53
54
55
# File 'lib/firehose/rack/consumer.rb', line 53

def initialize
  yield self if block_given?
end

Class Method Details

.multiplex_subscriptions(request) ⇒ Object



19
20
21
22
23
24
25
# File 'lib/firehose/rack/consumer.rb', line 19

def self.multiplex_subscriptions(request)
  if request.get?
    query_string_subscriptions(request.env)
  elsif request.post?
    post_subscriptions(request)
  end
end

.multiplexing_request?(env) ⇒ Boolean

Returns:

  • (Boolean)


15
16
17
# File 'lib/firehose/rack/consumer.rb', line 15

def self.multiplexing_request?(env)
  env["PATH_INFO"].include? MULTIPLEX_CHANNEL
end

.post_subscriptions(request) ⇒ Object



41
42
43
44
45
46
47
48
49
50
# File 'lib/firehose/rack/consumer.rb', line 41

def self.post_subscriptions(request)
  body = request.body.read
  subs = JSON.parse(body).map do |chan, last_sequence|
    last_sequence = 0 if last_sequence < 0
    {
      channel: chan,
      message_sequence: last_sequence
    }
  end
end

.query_string_subscriptions(env) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/firehose/rack/consumer.rb', line 27

def self.query_string_subscriptions(env)
  query_params = ::Rack::Utils.parse_query(env["QUERY_STRING"])

  query_params["subscribe"].to_s.split(",").map do |sub|
    chan, last_sequence = sub.split("!")
    last_sequence = last_sequence.to_i
    last_sequence = 0 if last_sequence < 0
    {
      channel: chan,
      message_sequence: last_sequence
    }
  end
end

Instance Method Details

#call(env) ⇒ Object



57
58
59
# File 'lib/firehose/rack/consumer.rb', line 57

def call(env)
  websocket_request?(env) ? websocket.call(env) : http_long_poll.call(env)
end

#http_long_pollObject

Memoized instance of http long poll handler that can be configured from the rack app.



67
68
69
# File 'lib/firehose/rack/consumer.rb', line 67

def http_long_poll
  @http_long_poll ||= HttpLongPoll.new
end

#websocketObject

Memoized instance of web socket that can be configured from the rack app.



62
63
64
# File 'lib/firehose/rack/consumer.rb', line 62

def websocket
  @web_socket ||= WebSocket.new
end