Class: Firehose::Rack::Consumer::HttpLongPoll
- Inherits:
-
Object
- Object
- Firehose::Rack::Consumer::HttpLongPoll
- Includes:
- Helpers
- Defined in:
- lib/firehose/rack/consumer/http_long_poll.rb
Constant Summary collapse
- TIMEOUT =
How long should we wait before closing out the consuming clients web connection for long polling? Most browsers timeout after a connection has been idle for 30s.
20
Instance Attribute Summary collapse
-
#timeout ⇒ Object
Configures the timeout for the.
Instance Method Summary collapse
- #call(env) ⇒ Object
-
#initialize(timeout = TIMEOUT) {|_self| ... } ⇒ HttpLongPoll
constructor
A new instance of HttpLongPoll.
Methods included from Helpers
Constructor Details
#initialize(timeout = TIMEOUT) {|_self| ... } ⇒ HttpLongPoll
Returns a new instance of HttpLongPoll.
16 17 18 19 |
# File 'lib/firehose/rack/consumer/http_long_poll.rb', line 16 def initialize(timeout=TIMEOUT) @timeout = timeout yield self if block_given? end |
Instance Attribute Details
#timeout ⇒ Object
Configures the timeout for the
14 15 16 |
# File 'lib/firehose/rack/consumer/http_long_poll.rb', line 14 def timeout @timeout end |
Instance Method Details
#call(env) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/firehose/rack/consumer/http_long_poll.rb', line 21 def call(env) req = env['parsed_request'] ||= ::Rack::Request.new(env) path = req.path method = req.request_method # Get the Last Message Sequence from the query string. # Ideally we'd use an HTTP header, but android devices don't let us # set any HTTP headers for CORS requests. last_sequence = req.params['last_message_sequence'].to_i case method # GET is how clients subscribe to the queue. When a messages comes in, we flush out a response, # close down the requeust, and the client then reconnects. when 'GET' Firehose.logger.debug "HTTP GET with last_sequence #{last_sequence} for path #{path} with query #{env["QUERY_STRING"].inspect} and params #{req.params.inspect}" EM.next_tick do if last_sequence < 0 env['async.callback'].call response(400, "The last_message_sequence parameter may not be less than zero", response_headers(env)) else Server::Channel.new(path).(last_sequence, :timeout => timeout).callback do |, sequence| env['async.callback'].call response(200, wrap_frame(, sequence), response_headers(env)) end.errback do |e| if e == :timeout env['async.callback'].call response(204, '', response_headers(env)) else Firehose.logger.error "Unexpected error when trying to GET last_sequence #{last_sequence} for path #{path}: #{e.inspect}" env['async.callback'].call response(500, 'Unexpected error', response_headers(env)) end end end end # Tell the web server that this will be an async response. ASYNC_RESPONSE else Firehose.logger.debug "HTTP #{method} not supported" response(501, "#{method} not supported.") end end |