Class: Twitter::JSONStream

Inherits:
EventMachine::Connection
  • Object
show all
Defined in:
lib/twitter/json_stream.rb

Constant Summary collapse

MAX_LINE_LENGTH =
1024*1024
NF_RECONNECT_START =

network failure reconnections

0.25
NF_RECONNECT_ADD =
0.25
NF_RECONNECT_MAX =
16
AF_RECONNECT_START =

app failure reconnections

10
AF_RECONNECT_MUL =
2
RECONNECT_MAX =
320
RETRIES_MAX =
10
DEFAULT_OPTIONS =
{
  :method       => 'GET',
  :path         => '/',
  :content_type => "application/x-www-form-urlencoded",
  :content      => '',
  :path         => '/1/statuses/filter.json',
  :host         => 'stream.twitter.com',
  :port         => 80,
  :ssl          => false,
  :user_agent   => 'TwitterStream',
  :timeout      => 0,
  :proxy        => ENV['HTTP_PROXY'],
  :auth         => nil,
  :oauth        => {},
  :filters      => [],
  :params       => {},
}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ JSONStream

Returns a new instance of JSONStream.



65
66
67
68
69
70
71
72
73
74
# File 'lib/twitter/json_stream.rb', line 65

def initialize options = {}
  @options = DEFAULT_OPTIONS.merge(options) # merge in case initialize called directly
  @gracefully_closed = false
  @nf_last_reconnect = nil
  @af_last_reconnect = nil
  @reconnect_retries = 0
  @immediate_reconnect = false
  @on_inited_callback = options.delete(:on_inited)
  @proxy = URI.parse(options[:proxy]) if options[:proxy]
end

Instance Attribute Details

#af_last_reconnectObject

Returns the value of attribute af_last_reconnect.



43
44
45
# File 'lib/twitter/json_stream.rb', line 43

def af_last_reconnect
  @af_last_reconnect
end

#codeObject

Returns the value of attribute code.



40
41
42
# File 'lib/twitter/json_stream.rb', line 40

def code
  @code
end

#headersObject

Returns the value of attribute headers.



41
42
43
# File 'lib/twitter/json_stream.rb', line 41

def headers
  @headers
end

#nf_last_reconnectObject

Returns the value of attribute nf_last_reconnect.



42
43
44
# File 'lib/twitter/json_stream.rb', line 42

def nf_last_reconnect
  @nf_last_reconnect
end

#proxyObject

Returns the value of attribute proxy.



45
46
47
# File 'lib/twitter/json_stream.rb', line 45

def proxy
  @proxy
end

#reconnect_retriesObject

Returns the value of attribute reconnect_retries.



44
45
46
# File 'lib/twitter/json_stream.rb', line 44

def reconnect_retries
  @reconnect_retries
end

Class Method Details

.connect(options = {}) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/twitter/json_stream.rb', line 47

def self.connect options = {}
  options[:port] = 443 if options[:ssl] && !options.has_key?(:port)
  options = DEFAULT_OPTIONS.merge(options)

  host = options[:host]
  port = options[:port]

  if options[:proxy]
    proxy_uri = URI.parse(options[:proxy])
    host = proxy_uri.host
    port = proxy_uri.port
  end

  connection = EventMachine.connect host, port, self, options
  connection.start_tls if options[:ssl]
  connection
end

Instance Method Details

#connection_completedObject



120
121
122
# File 'lib/twitter/json_stream.rb', line 120

def connection_completed
  send_request
end

#each_item(&block) ⇒ Object



76
77
78
# File 'lib/twitter/json_stream.rb', line 76

def each_item &block
  @each_item_callback = block
end

#immediate_reconnectObject



97
98
99
100
101
# File 'lib/twitter/json_stream.rb', line 97

def immediate_reconnect
  @immediate_reconnect = true
  @gracefully_closed = false
  close_connection
end

#on_error(&block) ⇒ Object



80
81
82
# File 'lib/twitter/json_stream.rb', line 80

def on_error &block
  @error_callback = block
end

#on_max_reconnects(&block) ⇒ Object



88
89
90
# File 'lib/twitter/json_stream.rb', line 88

def on_max_reconnects &block
  @max_reconnects_callback = block
end

#on_reconnect(&block) ⇒ Object



84
85
86
# File 'lib/twitter/json_stream.rb', line 84

def on_reconnect &block
  @reconnect_callback = block
end

#post_initObject



124
125
126
127
# File 'lib/twitter/json_stream.rb', line 124

def post_init
  reset_state
  @on_inited_callback.call if @on_inited_callback
end

#receive_data(data) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
# File 'lib/twitter/json_stream.rb', line 108

def receive_data data
  begin
    @buffer.extract(data).each do |line|
      receive_line(line)
    end
  rescue Exception => e
    receive_error("#{e.class}: " + [e.message, e.backtrace].flatten.join("\n\t"))
    close_connection
    return
  end
end

#stopObject



92
93
94
95
# File 'lib/twitter/json_stream.rb', line 92

def stop
  @gracefully_closed = true
  close_connection
end

#unbindObject



103
104
105
106
# File 'lib/twitter/json_stream.rb', line 103

def unbind
  receive_line(@buffer.flush) unless @buffer.empty?
  schedule_reconnect unless @gracefully_closed
end