Class: Twitter::JSONStream

Inherits:
EventMachine::Connection
  • Object
show all
Defined in:
lib/twitter/json_stream.rb

Constant Summary collapse

# Evaluates to 1_048_576 (1024 * 1024).
# NOTE(review): presumably an upper bound on the size of a single buffered
# stream line; no use of this constant is visible in this excerpt -- confirm.
MAX_LINE_LENGTH =
1024*1024
NF_RECONNECT_START =

network failure reconnections

0.25
# NOTE(review): presumably the amount added to the network-failure reconnect
# delay on each attempt; usage is not visible in this excerpt -- confirm.
NF_RECONNECT_ADD =
0.25
# NOTE(review): presumably the ceiling for the network-failure reconnect
# delay; usage is not visible in this excerpt -- confirm.
NF_RECONNECT_MAX =
16
AF_RECONNECT_START =

app failure reconnections

10
# NOTE(review): presumably the factor by which the application-failure
# reconnect delay grows per attempt; usage is not visible here -- confirm.
AF_RECONNECT_MUL =
2
# NOTE(review): presumably an upper bound on the reconnect delay; usage is
# not visible in this excerpt -- confirm.
RECONNECT_MAX =
320
# NOTE(review): presumably the maximum number of reconnect attempts (compare
# #on_max_reconnects); usage is not visible in this excerpt -- confirm.
RETRIES_MAX =
10
# Seconds of silence tolerated on the connection: the periodic check set up in
# #post_init calls no_data once Time.now - @last_data_received_at exceeds it.
NO_DATA_TIMEOUT =
90
# Baseline settings that caller-supplied options are merged over
# (see DEFAULT_OPTIONS.merge in .connect and #initialize).
#
# :path used to be listed twice ('/' and then '/1/statuses/filter.json').
# Ruby keeps the last value for a duplicated literal key and warns about it,
# so only the effective value is kept here, at the key's original position.
DEFAULT_OPTIONS =
{
  :method         => 'GET',
  :path           => '/1/statuses/filter.json',
  :content_type   => "application/x-www-form-urlencoded",
  :content        => '',
  :host           => 'stream.twitter.com',
  :port           => 443,
  :ssl            => true,
  :user_agent     => 'TwitterStream',
  :timeout        => 0,
  :proxy          => ENV['HTTP_PROXY'], # read once, when this constant is defined
  :auth           => nil,
  :oauth          => {},
  :filters        => [],
  :params         => {},
  :auto_reconnect => true
}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ JSONStream

Returns a new instance of JSONStream.



69
70
71
72
73
74
75
76
77
78
79
# File 'lib/twitter/json_stream.rb', line 69

# Sets up the per-connection state.
#
# @param options [Hash] caller-supplied settings, layered over DEFAULT_OPTIONS.
#   Two keys are read directly from this argument: +:on_inited+ (a callable
#   that #post_init invokes) and +:proxy+ (a URL string parsed into #proxy).
#   The hash is only read, never modified, so frozen hashes are accepted.
def initialize(options = {})
  @options = DEFAULT_OPTIONS.merge(options) # merge in case initialize called directly
  @gracefully_closed = false
  @nf_last_reconnect = nil
  @af_last_reconnect = nil
  @reconnect_retries = 0
  @immediate_reconnect = false
  # Look the callback up instead of deleting it: @options already holds a copy
  # of the key, so deleting only had the side effect of mutating the caller's hash.
  @on_inited_callback = options[:on_inited]
  # Only a :proxy present in the argument is parsed; the merged default in
  # @options is deliberately not consulted here (unchanged behaviour).
  @proxy = URI.parse(options[:proxy]) if options[:proxy]
  @last_data_received_at = nil
end

Instance Attribute Details

#af_last_reconnect ⇒ Object

Returns the value of attribute af_last_reconnect.



47
48
49
# File 'lib/twitter/json_stream.rb', line 47

# Reader for @af_last_reconnect, which #initialize sets to nil.
# What assigns it afterwards is not visible in this excerpt.
#
# @return [Object, nil] the current value of @af_last_reconnect
def af_last_reconnect; @af_last_reconnect; end

#code ⇒ Object

Returns the value of attribute code.



44
45
46
# File 'lib/twitter/json_stream.rb', line 44

# Reader for @code. No assignment to @code is visible in this excerpt.
#
# @return [Object, nil] the current value of @code
def code; @code; end

#headers ⇒ Object

Returns the value of attribute headers.



45
46
47
# File 'lib/twitter/json_stream.rb', line 45

# Reader for @headers. No assignment to @headers is visible in this excerpt.
#
# @return [Object, nil] the current value of @headers
def headers; @headers; end

#last_data_received_at ⇒ Object

Returns the value of attribute last_data_received_at.



49
50
51
# File 'lib/twitter/json_stream.rb', line 49

# Reader for @last_data_received_at: nil after #initialize, then the
# Time.now recorded by the most recent #receive_data call.
#
# @return [Time, nil] the current value of @last_data_received_at
def last_data_received_at; @last_data_received_at; end

#nf_last_reconnect ⇒ Object

Returns the value of attribute nf_last_reconnect.



46
47
48
# File 'lib/twitter/json_stream.rb', line 46

# Reader for @nf_last_reconnect, which #initialize sets to nil.
# What assigns it afterwards is not visible in this excerpt.
#
# @return [Object, nil] the current value of @nf_last_reconnect
def nf_last_reconnect; @nf_last_reconnect; end

#proxy ⇒ Object

Returns the value of attribute proxy.



50
51
52
# File 'lib/twitter/json_stream.rb', line 50

# Reader for @proxy. #initialize assigns URI.parse(options[:proxy]) when a
# :proxy option is passed; otherwise the variable is never set there.
#
# @return [URI, nil] the current value of @proxy
def proxy; @proxy; end

#reconnect_retries ⇒ Object

Returns the value of attribute reconnect_retries.



48
49
50
# File 'lib/twitter/json_stream.rb', line 48

# Reader for @reconnect_retries, which #initialize sets to 0.
# What changes it afterwards is not visible in this excerpt.
#
# @return [Integer, nil] the current value of @reconnect_retries
def reconnect_retries; @reconnect_retries; end

Class Method Details

.connect(options = {}) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/twitter/json_stream.rb', line 52

# Opens an EventMachine connection that uses this class as its handler.
#
# Caller options are layered over DEFAULT_OPTIONS. When +:ssl+ is requested
# without an explicit +:port+, port 443 is used. If the resulting options
# carry a +:proxy+ URL, the socket is opened to the proxy's host and port
# rather than to +:host+/+:port+.
#
# @param options [Hash] connection settings; the hash is not modified
# @return [Object] whatever EventMachine.connect returns
def self.connect(options = {})
  # Apply the implied SSL port to the merged copy, not to the caller's hash.
  implied = (options[:ssl] && !options.key?(:port)) ? { :port => 443 } : {}
  options = DEFAULT_OPTIONS.merge(options).merge(implied)

  host = options[:host]
  port = options[:port]

  if options[:proxy]
    proxy_uri = URI.parse(options[:proxy])
    host = proxy_uri.host
    port = proxy_uri.port
  end

  EventMachine.connect(host, port, self, options)
end

Instance Method Details

#connection_completed ⇒ Object



134
135
136
137
# File 'lib/twitter/json_stream.rb', line 134

# Starts TLS when the :ssl option is truthy, then sends the request.
#
# @return [Object] the result of send_request
def connection_completed
  if @options[:ssl]
    start_tls
  end
  send_request
end

#each_item(&block) ⇒ Object



81
82
83
# File 'lib/twitter/json_stream.rb', line 81

# Stores the given block in @each_item_callback, replacing any previous one.
# The code that invokes the stored block is not visible in this excerpt.
#
# @return [Proc, nil] the block that was stored (nil when none is given)
def each_item(&handler)
  @each_item_callback = handler
end

#immediate_reconnect ⇒ Object



112
113
114
115
116
# File 'lib/twitter/json_stream.rb', line 112

# Flags the connection for an immediate reconnect and closes it.
# Clearing @gracefully_closed means #unbind will not treat the close as a
# graceful stop when deciding whether to call schedule_reconnect.
#
# @return [Object] the result of close_connection
def immediate_reconnect
  @gracefully_closed = false
  @immediate_reconnect = true
  close_connection
end

#on_close(&block) ⇒ Object



103
104
105
# File 'lib/twitter/json_stream.rb', line 103

# Stores the given block in @close_callback; #unbind calls it (with no
# arguments) after any reconnect has been scheduled.
#
# @return [Proc, nil] the block that was stored (nil when none is given)
def on_close(&handler)
  @close_callback = handler
end

#on_error(&block) ⇒ Object



85
86
87
# File 'lib/twitter/json_stream.rb', line 85

# Stores the given block in @error_callback, replacing any previous one.
# The code that invokes the stored block is not visible in this excerpt.
#
# @return [Proc, nil] the block that was stored (nil when none is given)
def on_error(&handler)
  @error_callback = handler
end

#on_max_reconnects(&block) ⇒ Object



99
100
101
# File 'lib/twitter/json_stream.rb', line 99

# Stores the given block in @max_reconnects_callback, replacing any previous
# one. The code that invokes the stored block is not visible in this excerpt.
#
# @return [Proc, nil] the block that was stored (nil when none is given)
def on_max_reconnects(&handler)
  @max_reconnects_callback = handler
end

#on_no_data(&block) ⇒ Object

Called when no data has been received for NO_DATA_TIMEOUT seconds. Reconnecting is probably in order, as per the Twitter recommendations.



95
96
97
# File 'lib/twitter/json_stream.rb', line 95

# Stores the given block in @no_data_callback, replacing any previous one.
# NOTE(review): presumably invoked by no_data, which #post_init's timer calls
# once NO_DATA_TIMEOUT is exceeded; no_data itself is not visible here.
#
# @return [Proc, nil] the block that was stored (nil when none is given)
def on_no_data(&handler)
  @no_data_callback = handler
end

#on_reconnect(&block) ⇒ Object



89
90
91
# File 'lib/twitter/json_stream.rb', line 89

# Stores the given block in @reconnect_callback, replacing any previous one.
# The code that invokes the stored block is not visible in this excerpt.
#
# @return [Proc, nil] the block that was stored (nil when none is given)
def on_reconnect(&handler)
  @reconnect_callback = handler
end

#post_init ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
# File 'lib/twitter/json_stream.rb', line 139

# Resets parser/connection state, fires the :on_inited callback if one was
# supplied, and installs a 5-second periodic timer stored in @reconnect_timer.
#
# On every tick the timer either cancels itself (after a graceful close) or
# calls no_data when the last received data is older than NO_DATA_TIMEOUT
# seconds. Nothing happens on a tick while no data has been received yet.
#
# @return [Object] the timer returned by EventMachine.add_periodic_timer
def post_init
  reset_state
  @on_inited_callback && @on_inited_callback.call
  @reconnect_timer = EventMachine.add_periodic_timer(5) do
    if @gracefully_closed
      @reconnect_timer.cancel
    else
      last_seen = @last_data_received_at
      no_data if last_seen && (Time.now - last_seen) > NO_DATA_TIMEOUT
    end
  end
end

#receive_data(data) ⇒ Object

Receives raw data from the HTTP connection and pushes it into the HTTP parser which then drives subsequent callbacks.



129
130
131
132
# File 'lib/twitter/json_stream.rb', line 129

# Records the arrival time of the chunk, then appends it to @parser.
# The timestamp is what the periodic check in #post_init compares against
# NO_DATA_TIMEOUT.
#
# @param data [String] raw bytes read from the connection
# @return [Object] the result of @parser << data
def receive_data(data)
  arrival = Time.now
  @last_data_received_at = arrival
  @parser << data
end

#stop ⇒ Object



107
108
109
110
# File 'lib/twitter/json_stream.rb', line 107

# Marks the connection as gracefully closed and closes it.
# With @gracefully_closed set, #unbind skips schedule_reconnect and the
# periodic timer from #post_init cancels itself on its next tick.
#
# @return [Object] the result of close_connection
def stop
  @gracefully_closed = true
  close_connection()
end

#unbind ⇒ Object



118
119
120
121
122
123
124
125
# File 'lib/twitter/json_stream.rb', line 118

# Tears the connection state down after the socket has closed.
#
# In order: flushes any text left in @buffer through parse_stream_line (only
# while in the :stream state), schedules a reconnect unless the close was
# graceful or :auto_reconnect is off, runs the #on_close callback if one was
# registered, and resets @state to :init.
#
# @return [Symbol] :init
def unbind
  leftover = @state == :stream && !@buffer.empty?
  parse_stream_line(@buffer.flush) if leftover

  wants_reconnect = @options[:auto_reconnect] && !@gracefully_closed
  schedule_reconnect if wants_reconnect

  @close_callback && @close_callback.call
  @state = :init
end