Class: BirdGrinder::Tweeter::StreamingRequest
- Inherits: Object
  - Object
  - BirdGrinder::Tweeter::StreamingRequest
- Defined in: lib/bird_grinder/tweeter/streaming_request.rb
Overview
A request implementation for the Twitter streaming API that correctly: 1) keeps connections alive, 2) reacts appropriately to errors, and 3) handles tweets in an evented way.
It’s built around em-http-request internally but also makes use of BirdGrinder::Tweeter and BirdGrinder::Tweeter::Streaming to provide a nice, user-friendly interface.
Constant Summary
- INITIAL_DELAYS =
Values / rates as suggested in the Twitter API documentation.
{:http => 10, :network => 0.25}
- MAX_DELAYS =
{:http => 240, :network => 16}
- DELAY_CALCULATOR =
{ :http => L { |v| v * 2 }, :network => L { |v| v + INITIAL_DELAYS[:network] } }
Instance Method Summary
- #authorization_method ⇒ Object
-
#default_request_options ⇒ Object
Returns a set of options that apply to the request no matter what method is used to send the request.
-
#fail!(type) ⇒ Object
Processes a failure and responds accordingly.
-
#full_url ⇒ Object
Returns the full streaming API URL for this request.
-
#http_options(type) ⇒ Object
Returns normalized http options for the current request, built on top of default_request_options and a few other details.
-
#initialize(parent, name, options = {}) ⇒ StreamingRequest
constructor
Creates a streaming request.
-
#perform ⇒ Object
Starts the streaming connection.
-
#receive_chunk(c) ⇒ Object
Processes a chunk of the incoming response, parsing it with the stream processor and resetting anything used to track failure (receiving a chunk implies the connection is working).
-
#request_method ⇒ Object
Returns the correct http method to be used for the current path.
-
#stream_processor ⇒ Object
Returns the current stream processor, creating a new one if it hasn’t been initialized yet.
Constructor Details
#initialize(parent, name, options = {}) ⇒ StreamingRequest
Creates a streaming request.
# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 31
def initialize(parent, name, options = {})
  logger.debug "Creating stream '#{name}' with options: #{options.inspect}"
  @parent = parent
  @name = name
  @path = options.delete(:path) || name
  @metadata = options.delete(:metadata) || {}
  @options = options
  @failure_delay = nil
  @failure_count = 0
  @failure_reason = nil
end
Instance Method Details
#authorization_method ⇒ Object
# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 140
def authorization_method
  @authorization_method ||= BasicAuthorization.new
end
#default_request_options ⇒ Object
Returns a set of options that apply to the request no matter what method is used to send it. These options matter both for supplying credentials and for making sure there is no timeout on the connection.
# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 104
def default_request_options
  {:timeout => 0, :head => {}}
end
#fail!(type) ⇒ Object
Processes a failure and responds accordingly.
# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 60
def fail!(type)
  suffix = type == :http ? " (Error Code #{@current_request.response_header.status})" : ""
  logger.debug "Streaming failed with #{type}#{suffix}"
  if @failure_count == 0 || @failure_reason != type
    logger.debug "Instantly restarting (#{@failure_count == 0 ? "First failure" : "Different type of failure"})"
    EM.next_tick { perform }
  else
    @failure_delay ||= INITIAL_DELAYS[type]
    logger.debug "Restarting stream in #{@failure_delay} seconds"
    logger.debug "Adding timer to restart in #{@failure_delay} seconds"
    EM.add_timer(@failure_delay) { perform }
    potential_new_delay = DELAY_CALCULATOR[type].call(@failure_delay)
    @failure_delay = [potential_new_delay, MAX_DELAYS[type]].min
    logger.debug "Next delay is #{@failure_delay}"
  end
  @failure_count += 1
  @failure_reason = type
  logger.debug "Failed #{@failure_count} times with #{@failure_reason}"
end
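The backoff behaviour in fail! is easier to see as a sequence of delays. The following is a minimal sketch, not part of BirdGrinder: it copies the constants from the Constant Summary, uses plain `lambda` on the assumption that `L` is a lambda shorthand, and introduces a hypothetical helper named `backoff_schedule`. The first failure (or a change of failure type) restarts immediately, so the delays below apply from the second consecutive failure of the same type onward.

```ruby
# Constants copied from the Constant Summary. `lambda` stands in for `L`,
# which is assumed (not confirmed here) to be a lambda shorthand.
INITIAL_DELAYS = {:http => 10, :network => 0.25}
MAX_DELAYS = {:http => 240, :network => 16}
DELAY_CALCULATOR = {
  :http    => lambda { |v| v * 2 },
  :network => lambda { |v| v + INITIAL_DELAYS[:network] }
}

# Hypothetical helper: the delays used for the first `count` timed restarts.
def backoff_schedule(type, count)
  delay = INITIAL_DELAYS[type]
  Array.new(count) do
    current = delay
    # Same update rule as fail!: compute the next delay, then cap it.
    delay = [DELAY_CALCULATOR[type].call(delay), MAX_DELAYS[type]].min
    current
  end
end

http_delays    = backoff_schedule(:http, 6)    # => [10, 20, 40, 80, 160, 240]
network_delays = backoff_schedule(:network, 4) # => [0.25, 0.5, 0.75, 1.0]
```

HTTP errors back off exponentially up to 240 seconds, while network errors back off linearly in 0.25 second steps up to 16 seconds.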
#full_url ⇒ Object
Returns the full streaming API URL for this request.
# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 136
def full_url
  @full_url ||= (Streaming.streaming_base_url / Streaming.api_version.to_s / "statuses" / "#{@path}.json")
end
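The `/` calls between strings in full_url are not core Ruby, so they presumably come from a String extension defined elsewhere (an assumption; it is not shown on this page). As a rough sketch of the resulting URL shape, the hypothetical helper `join_segments` below joins segments with single slashes. The base URL and API version are placeholders, not the values returned by `Streaming.streaming_base_url` or `Streaming.api_version`.

```ruby
# Hypothetical helper standing in for the `/` calls in full_url.
# It trims stray leading/trailing slashes, then joins with "/".
def join_segments(*segments)
  segments.map { |s| s.to_s.gsub(%r{\A/+|/+\z}, "") }.join("/")
end

base_url    = "http://stream.example.com" # placeholder, not the real base URL
api_version = 1                            # placeholder version
path        = :filter

url = join_segments(base_url, api_version.to_s, "statuses", "#{path}.json")
```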
#http_options(type) ⇒ Object
Returns normalized http options for the current request, built on top of default_request_options and a few other details.
# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 112
def http_options(type)
  base = self.default_request_options
  if @options.present?
    if type == :get
      base[:query] = @options
    else
      base[:head]['Content-Type'] = "application/x-www-form-urlencoded"
      base[:body] = body = {}
      @options.each_pair { |k,v| body[CGI.escape(k.to_s)] = CGI.escape(v) }
    end
  end
  base
end
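To illustrate the two branches, here is a stand-alone sketch of the same logic. The name `build_http_options` is hypothetical and not part of BirdGrinder. It differs from the method above in two labeled ways: it uses a plain nil/empty check instead of `present?`, and it calls `to_s` on values before escaping so that non-string values do not raise.

```ruby
require 'cgi'

# Hypothetical stand-alone version of the branching in http_options.
def build_http_options(type, options)
  base = {:timeout => 0, :head => {}}
  return base if options.nil? || options.empty?
  if type == :get
    # GET requests carry the options as query parameters.
    base[:query] = options
  else
    # Other methods send the options as a form-encoded body.
    base[:head]['Content-Type'] = "application/x-www-form-urlencoded"
    base[:body] = options.each_with_object({}) do |(k, v), body|
      body[CGI.escape(k.to_s)] = CGI.escape(v.to_s) # to_s is an addition
    end
  end
  base
end

get_opts  = build_http_options(:get,  :count => "10")
post_opts = build_http_options(:post, :track => "ruby lang,rails")
# post_opts[:body] => {"track" => "ruby+lang%2Crails"}
```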
#perform ⇒ Object
Starts the streaming connection.
# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 44
def perform
  logger.debug "Preparing to start stream"
  @stream_processor = nil
  type = request_method
  http = EventMachine::HttpRequest.new(full_url).send(type, http_options(type))
  authorization_method.add_header_to(http)
  # Handle failures correctly so we can back off
  @current_request = http
  http.errback { fail!(:network)}
  http.callback { http.response_header.status > 299 ? fail!(:http) : perform }
  http.stream { |c| receive_chunk(c) }
end
#receive_chunk(c) ⇒ Object
Processes a chunk of the incoming response, parsing it with the stream processor and resetting anything used to track failure (receiving a chunk implies the connection is working).
# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 90
def receive_chunk(c)
  return unless @current_request.response_header.status == 200
  if !@failure_reason.nil?
    @failure_reason = nil
    @failure_delay = nil
    @failure_count = 0
  end
  stream_processor.receive_chunk(c)
end
#request_method ⇒ Object
Returns the correct http method to be used for the current path.
# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 127
def request_method
  {:filter => :post,
   :sample => :get,
   :firehose => :get,
   :retweet => :get
  }.fetch(@path, :get)
end
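The lookup relies on `Hash#fetch` with a default, so any path missing from the table falls back to `:get`. A small sketch of that behaviour follows; `METHODS_BY_PATH` and `method_for` are hypothetical names, and `:links` is just an example of an unlisted path.

```ruby
# Same mapping as in request_method, pulled out for illustration.
METHODS_BY_PATH = {:filter => :post, :sample => :get, :firehose => :get, :retweet => :get}

# Hypothetical helper mirroring the fetch-with-default lookup.
def method_for(path)
  METHODS_BY_PATH.fetch(path, :get)
end

method_for(:filter)   # => :post
method_for(:links)    # => :get (not in the table, so the default applies)
method_for("filter")  # => :get (a String key does not match the Symbol key)
```

The last call suggests one thing to watch for: since the keys are symbols, a path supplied as a String would silently fall back to `:get`.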
#stream_processor ⇒ Object
Returns the current stream processor, creating a new one if it hasn’t been initialized yet.
# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 81
def stream_processor
  @stream_processor ||= StreamProcessor.new(@parent, @name, @metadata)
end