Class: Firehose::Rack::Consumer::HttpLongPoll

Inherits:
Object
  • Object
show all
Includes:
Helpers
Defined in:
lib/firehose/rack/consumer/http_long_poll.rb

Constant Summary collapse

TIMEOUT =

How long should we wait before closing out the consuming client's web connection for long polling? Most browsers time out after a connection has been idle for 30s.

20

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Helpers

#response

Constructor Details

#initialize(timeout = TIMEOUT) {|_self| ... } ⇒ HttpLongPoll

Returns a new instance of HttpLongPoll.

Yields:

  • (_self)

Yield Parameters:



16
17
18
19
# File 'lib/firehose/rack/consumer/http_long_poll.rb', line 16

# Builds a long-poll consumer.
#
# @param timeout [Object] value stored as this consumer's timeout; defaults
#   to TIMEOUT when omitted.
# @yield [_self] when a block is given, the new instance is handed to it
#   after the timeout has been stored.
def initialize(timeout = TIMEOUT)
  @timeout = timeout
  # Nothing more to do unless the caller supplied a block.
  return unless block_given?

  yield(self)
end

Instance Attribute Details

#timeout ⇒ Object

Configures the timeout for the long-polling connection, i.e. how long a request is held open waiting for a message.



14
15
16
# File 'lib/firehose/rack/consumer/http_long_poll.rb', line 14

# Returns the timeout value held in @timeout (assigned by the constructor).
# #call passes this value as the :timeout option when waiting for the next
# message on a channel.
def timeout
  @timeout
end

Instance Method Details

#call(env) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/firehose/rack/consumer/http_long_poll.rb', line 21

# Rack entry point for long-polling consumers.
#
# A GET subscribes the client to the channel named by the request path. The
# work is scheduled with EM.next_tick and ASYNC_RESPONSE is returned right
# away; the real response is delivered through env['async.callback']:
#
# * 400 when last_message_sequence is negative
# * 200 with the wrapped message frame when the channel yields a message
# * 204 when waiting for the next message fails with :timeout
# * 500 for any other failure (which is also logged as an error)
#
# Any other HTTP method is answered synchronously with a 501.
#
# @param env [Hash] the Rack environment. A request object already stored
#   under 'parsed_request' is reused; otherwise one is built and cached there.
# @return [Object] ASYNC_RESPONSE for GET requests, otherwise the 501
#   response built by the response helper.
def call(env)
  req    = env['parsed_request'] ||= ::Rack::Request.new(env)
  path   = req.path
  method = req.request_method
  # Get the Last Message Sequence from the query string.
  # Ideally we'd use an HTTP header, but android devices don't let us
  # set any HTTP headers for CORS requests.
  #
  # Rack turns `last_message_sequence[]=1` into an Array and
  # `last_message_sequence[k]=1` into a Hash, neither of which responds to
  # #to_i. Going through #to_s first makes such values fall back to 0, the
  # same as a missing or non-numeric value, instead of raising NoMethodError.
  last_sequence = req.params['last_message_sequence'].to_s.to_i

  # Only GET is supported; everything else is rejected synchronously.
  unless method == 'GET'
    Firehose.logger.debug "HTTP #{method} not supported"
    return response(501, "#{method} not supported.")
  end

  # GET is how clients subscribe to the queue. When a message comes in, we flush out a response,
  # close down the request, and the client then reconnects.
  Firehose.logger.debug "HTTP GET with last_sequence #{last_sequence} for path #{path} with query #{env["QUERY_STRING"].inspect} and params #{req.params.inspect}"

  # Every asynchronous outcome is delivered the same way: build a response
  # with the request's response headers and hand it to the async callback.
  reply = lambda do |status, body|
    env['async.callback'].call response(status, body, response_headers(env))
  end

  EM.next_tick do
    if last_sequence < 0
      reply.call(400, "The last_message_sequence parameter may not be less than zero")
    else
      Server::Channel.new(path).next_message(last_sequence, :timeout => timeout).callback do |message, sequence|
        reply.call(200, wrap_frame(message, sequence))
      end.errback do |e|
        if e == :timeout
          # No message arrived in time; the client gets an empty 204.
          reply.call(204, '')
        else
          Firehose.logger.error "Unexpected error when trying to GET last_sequence #{last_sequence} for path #{path}: #{e.inspect}"
          reply.call(500, 'Unexpected error')
        end
      end
    end
  end

  # Tell the web server that this will be an async response.
  ASYNC_RESPONSE
end