Class: BirdGrinder::Tweeter::StreamingRequest

Inherits:
Object
Defined in:
lib/bird_grinder/tweeter/streaming_request.rb

Overview

A request implementation for the Twitter streaming API that correctly: 1) keeps connections alive, 2) reacts appropriately to errors, and 3) handles tweets in an evented way.

It is built around em-http-request internally, and also makes use of BirdGrinder::Tweeter and BirdGrinder::Tweeter::Streaming to provide a user-friendly interface.

Constant Summary

Values / rates as suggested in the Twitter API.

INITIAL_DELAYS =
{:http => 10,  :network => 0.25}
MAX_DELAYS =
{:http => 240, :network => 16}
DELAY_CALCULATOR =
{
  :http    => L { |v| v * 2 },
  :network => L { |v| v + INITIAL_DELAYS[:network] }
}
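
The three constants together describe a capped backoff: HTTP failures double the delay up to 240 seconds, while network failures grow linearly in 0.25 second steps up to 16 seconds. The following is a minimal sketch of the resulting schedules. It assumes that the `L` shorthand in the listing behaves like a plain `lambda`, and `delay_schedule` is a hypothetical helper that is not part of BirdGrinder.

```ruby
# Constants copied from the listing above, with the L shorthand written
# as a plain lambda (assumed to be equivalent).
INITIAL_DELAYS = {:http => 10, :network => 0.25}
MAX_DELAYS     = {:http => 240, :network => 16}
DELAY_CALCULATOR = {
  :http    => lambda { |v| v * 2 },
  :network => lambda { |v| v + INITIAL_DELAYS[:network] }
}

# Hypothetical helper: the first `count` delays used for one failure type.
def delay_schedule(type, count)
  delay = INITIAL_DELAYS[type]
  Array.new(count) do
    current = delay
    # Same update rule as fail!: compute the next delay, then cap it.
    delay = [DELAY_CALCULATOR[type].call(delay), MAX_DELAYS[type]].min
    current
  end
end

p delay_schedule(:http, 7)    # => [10, 20, 40, 80, 160, 240, 240]
p delay_schedule(:network, 4) # => [0.25, 0.5, 0.75, 1.0]
```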

Constructor Details

#initialize(parent, name, options = {}) ⇒ StreamingRequest

Creates a streaming request.

Parameters:

  • parent (BirdGrinder::Tweeter)

    the tweeter parent class

  • name (Symbol, String)

    the name of the stream type

  • options (Hash) (defaults to: {})

    The options for this request

Options Hash (options):

  • :path (Symbol, String)

    the path component used for the streaming API, e.g. sample or filter.



# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 31

def initialize(parent, name, options = {})
  logger.debug "Creating stream '#{name}' with options: #{options.inspect}"
  @parent         = parent
  @name           = name
  @path           = options.delete(:path) || name
  @metadata       = options.delete(:metadata) || {}
  @options        = options
  @failure_delay  = nil
  @failure_count  = 0
  @failure_reason = nil
end
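
The constructor removes :path and :metadata from the options hash and keeps whatever remains as request parameters. A minimal sketch of just that option handling follows; `split_options` is a hypothetical helper introduced for illustration and is not part of BirdGrinder.

```ruby
# Hypothetical helper reproducing only the option handling in initialize.
def split_options(name, options = {})
  path     = options.delete(:path) || name      # falls back to the stream name
  metadata = options.delete(:metadata) || {}
  [path, metadata, options]                      # the rest become request params
end

opts = {:path => :filter, :track => "ruby"}
path, metadata, remaining = split_options(:ruby_tweets, opts)

p path              # => :filter
p remaining.keys    # => [:track]
p opts.key?(:path)  # => false
```

Because `Hash#delete` works in place, the hash passed by the caller loses its :path and :metadata keys. Passing `options.dup` is one way to avoid that if the caller reuses the hash.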

Instance Method Details

#authorization_method ⇒ Object



# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 140

def authorization_method
  @authorization_method ||= BasicAuthorization.new
end

#default_request_options ⇒ Object

Returns a set of options that apply to the request no matter which method is used to send it. It is important that this is used for credentials as well as for making sure there is no timeout on the connection.



# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 104

def default_request_options
  {:timeout => 0, :head => {}}
end

#fail!(type) ⇒ Object

Processes a failure and responds accordingly.

Parameters:

  • type (Symbol)

    the type of error, one of :http or :network



# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 60

def fail!(type)
  suffix = type == :http ? " (Error Code #{@current_request.response_header.status})" : ""
  logger.debug "Streaming failed with #{type}#{suffix}"
  if @failure_count == 0 || @failure_reason != type
    logger.debug "Instantly restarting (#{@failure_count == 0  ? "First failure" : "Different type of failure"})"
    EM.next_tick { perform }
  else
    @failure_delay ||= INITIAL_DELAYS[type]
    logger.debug "Restarting stream in #{@failure_delay} seconds"
    logger.debug "Adding timer to restart in #{@failure_delay} seconds"
    EM.add_timer(@failure_delay) { perform }
    potential_new_delay = DELAY_CALCULATOR[type].call(@failure_delay)
    @failure_delay = [potential_new_delay, MAX_DELAYS[type]].min
    logger.debug "Next delay is #{@failure_delay}"
  end
  @failure_count += 1
  @failure_reason = type
  logger.debug "Failed #{@failure_count} times with #{@failure_reason}"
end
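
The restart decision in fail! can be followed without EventMachine by returning the wait time instead of scheduling a timer. The sketch below is a hypothetical stand-in (`FailureTracker` is not part of BirdGrinder) that copies the bookkeeping from fail! and the reset from receive_chunk. As in the constants sketch, the `L` shorthand is assumed to behave like `lambda`.

```ruby
# Hypothetical stand-in for the failure bookkeeping in fail! and
# receive_chunk; it returns the restart decision instead of scheduling it.
class FailureTracker
  INITIAL = {:http => 10, :network => 0.25}
  MAX     = {:http => 240, :network => 16}
  NEXT    = {
    :http    => lambda { |v| v * 2 },
    :network => lambda { |v| v + INITIAL[:network] }
  }

  def initialize
    reset
  end

  # Mirrors the reset in receive_chunk after a successful chunk.
  def reset
    @delay, @count, @reason = nil, 0, nil
  end

  # Returns 0 for an immediate restart, otherwise the delay in seconds.
  def fail!(type)
    if @count == 0 || @reason != type
      wait = 0
    else
      @delay ||= INITIAL[type]
      wait = @delay
      @delay = [NEXT[type].call(@delay), MAX[type]].min
    end
    @count += 1
    @reason = type
    wait
  end
end

tracker = FailureTracker.new
p [:http, :http, :http, :network, :network].map { |t| tracker.fail!(t) }
# => [0, 10, 20, 0, 40]
```

Following the listing literally, a change of failure type triggers one immediate restart but does not clear the stored delay, so the second :network failure above waits the 40 seconds carried over from the :http sequence. The delay only returns to its initial value after a chunk is received successfully.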

#full_url ⇒ Object

Returns the full streaming API URL associated with this request.



# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 136

def full_url
  @full_url ||= (Streaming.streaming_base_url / Streaming.api_version.to_s / "statuses" / "#{@path}.json")
end
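
The `/` operator on strings used above is not part of core Ruby, so it presumably comes from an extension loaded elsewhere in the library. A rough sketch of the same path assembly in plain Ruby is shown below. `join_url` is a hypothetical helper, and the base URL and version are placeholders rather than the values returned by Streaming.streaming_base_url and Streaming.api_version.

```ruby
# Hypothetical helper: joins URL segments with single slashes.
def join_url(*parts)
  # Strip leading and trailing slashes from each segment, then join.
  parts.map { |part| part.to_s.gsub(%r{\A/+|/+\z}, "") }.join("/")
end

base_url    = "http://stream.example.com" # placeholder, not the real base URL
api_version = 1                           # placeholder version
path        = :filter

puts join_url(base_url, api_version, "statuses", "#{path}.json")
# prints http://stream.example.com/1/statuses/filter.json
```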

#http_options(type) ⇒ Object

Returns normalized HTTP options for the current request, built on top of default_request_options and a few other details.

Parameters:

  • type (Symbol)

    the type of request - :post or :get



# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 112

def http_options(type)
  base = self.default_request_options
  if @options.present?
    if type == :get
      base[:query] = @options
    else
      base[:head]['Content-Type'] = "application/x-www-form-urlencoded"
      base[:body] = body = {}
      @options.each_pair { |k,v| body[CGI.escape(k.to_s)] = CGI.escape(v) }
    end
  end
  base
end
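
To make the two shapes concrete, here is a minimal standalone sketch of the same branching. `build_http_options` is a hypothetical function: its `options` argument plays the role of @options, an explicit nil/empty check stands in for `present?`, and values are converted with `to_s` before escaping (the listing passes them to CGI.escape directly).

```ruby
require "cgi"

# Hypothetical standalone version of http_options.
def build_http_options(type, options)
  base = {:timeout => 0, :head => {}}
  return base if options.nil? || options.empty?
  if type == :get
    # GET requests carry the parameters in the query string.
    base[:query] = options
  else
    # Other requests send a form-encoded body with escaped keys and values.
    base[:head]["Content-Type"] = "application/x-www-form-urlencoded"
    # to_s is added here so non-String values such as Integers can be escaped.
    base[:body] = options.each_with_object({}) do |(k, v), body|
      body[CGI.escape(k.to_s)] = CGI.escape(v.to_s)
    end
  end
  base
end

get_opts  = build_http_options(:get,  {:track => "ruby"})
post_opts = build_http_options(:post, {:track => "ruby,rails", :count => 5})

p get_opts[:query].keys    # => [:track]
p post_opts[:body].keys    # => ["track", "count"]
p post_opts[:body]["track"] # => "ruby%2Crails"
```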

#perform ⇒ Object

Starts the streaming connection.



# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 44

def perform
  logger.debug "Preparing to start stream"
  @stream_processor = nil
  type = request_method
  http = EventMachine::HttpRequest.new(full_url).send(type, http_options(type))
  authorization_method.add_header_to(http)
  # Handle failures correctly so we can back off
  @current_request = http
  http.errback  { fail!(:network)}
  http.callback { http.response_header.status > 299 ? fail!(:http) : perform }
  http.stream { |c| receive_chunk(c) }
end

#receive_chunk(c) ⇒ Object

Processes a chunk of the incoming response, parsing it with the stream processor and resetting any state used to track failures (since receiving a chunk implies that the connection is working).

Parameters:

  • c (String)

    the chunk of data to receive



# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 90

def receive_chunk(c)
  return unless @current_request.response_header.status == 200
  if !@failure_reason.nil?
    @failure_reason = nil
    @failure_delay  = nil
    @failure_count  = 0
  end
  stream_processor.receive_chunk(c)
end

#request_method ⇒ Object

Returns the correct HTTP method to be used for the current path.



# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 127

def request_method
  {:filter   => :post,
   :sample   => :get,
   :firehose => :get,
   :retweet  => :get
  }.fetch(@path, :get)
end
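
The lookup relies on `Hash#fetch` with a default, and the keys are Symbols. The sketch below shows how that lookup behaves for Symbol and String paths. `request_method_for` is a hypothetical standalone version that takes the path as an argument instead of reading @path.

```ruby
# Hypothetical standalone version of request_method.
def request_method_for(path)
  {:filter   => :post,
   :sample   => :get,
   :firehose => :get,
   :retweet  => :get
  }.fetch(path, :get)
end

p request_method_for(:filter)          # => :post
p request_method_for(:unknown)         # => :get
p request_method_for("filter")         # => :get
p request_method_for("filter".to_sym)  # => :post
```

Since the constructor documents the path as a Symbol or a String, a String such as "filter" does not match the Symbol key and falls back to :get. Normalizing with `to_sym` before the lookup is one way to treat both forms alike.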

#stream_processor ⇒ Object

Returns the current stream processor, creating a new one if it hasn’t been initialized yet.



# File 'lib/bird_grinder/tweeter/streaming_request.rb', line 81

def stream_processor
  @stream_processor ||= StreamProcessor.new(@parent, @name, @metadata)
end