Class: TwitterStream::JSONStream

Inherits:
EventMachine::Connection
  • Object
show all
Defined in:
lib/twitter_stream/json_stream.rb

Constant Summary collapse

MAX_LINE_LENGTH =
1024*1024
NF_RECONNECT_START =

network failure reconnections

0.25
NF_RECONNECT_ADD =
0.25
NF_RECONNECT_MAX =
16
AF_RECONNECT_START =

app failure reconnections

10
AF_RECONNECT_MUL =
2
RECONNECT_MAX =
320
RETRIES_MAX =
10
# Baseline settings. Both .connect and #initialize merge the caller's options
# over this hash, so any key supplied by the caller wins.
DEFAULT_OPTIONS =
{
  :method       => 'GET',
  :content_type => "application/x-www-form-urlencoded",
  :content      => '',
  # Listed exactly once: a repeated literal key makes Ruby warn and silently
  # keep only the last value.
  :path         => '/1/statuses/filter.json',
  :host         => 'stream.twitter.com',
  :port         => 80,
  :ssl          => false,
  :user_agent   => 'TwitterStream',
  :timeout      => 0,
  :proxy        => ENV['HTTP_PROXY'], # evaluated once, when the constant is defined
  :auth         => nil,
  :oauth        => {},
  :filters      => []
}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ JSONStream

Returns a new instance of JSONStream.



64
65
66
67
68
69
70
71
72
# File 'lib/twitter_stream/json_stream.rb', line 64

# Sets up the per-connection state.
#
# @param options [Hash] settings layered over DEFAULT_OPTIONS
#
# The proxy URI is taken from the merged settings rather than from the raw
# argument, so a default :proxy (DEFAULT_OPTIONS reads ENV['HTTP_PROXY']) is
# honoured even when the object is built without going through .connect.
def initialize(options = {})
  @options = DEFAULT_OPTIONS.merge(options) # merge in case initialize called directly
  @gracefully_closed = false
  @nf_last_reconnect = nil
  @af_last_reconnect = nil
  @reconnect_retries = 0
  @immediate_reconnect = false
  proxy_url = @options[:proxy]
  @proxy = URI.parse(proxy_url) if proxy_url
end

Instance Attribute Details

#af_last_reconnect ⇒ Object

Returns the value of attribute af_last_reconnect.



42
43
44
# File 'lib/twitter_stream/json_stream.rb', line 42

# Reader for @af_last_reconnect, which #initialize sets to nil.
# NOTE(review): presumably the delay used for the most recent
# application-failure reconnect (cf. the AF_RECONNECT_* constants) — the code
# that updates it is not visible here; confirm.
def af_last_reconnect
  @af_last_reconnect
end

#code ⇒ Object

Returns the value of attribute code.



39
40
41
# File 'lib/twitter_stream/json_stream.rb', line 39

# Reader for @code.
# NOTE(review): presumably the HTTP status code of the server response — it
# is not assigned anywhere in the visible code; confirm.
def code
  @code
end

#headers ⇒ Object

Returns the value of attribute headers.



40
41
42
# File 'lib/twitter_stream/json_stream.rb', line 40

# Reader for @headers.
# NOTE(review): presumably the parsed HTTP response headers — it is not
# assigned anywhere in the visible code; confirm.
def headers
  @headers
end

#nf_last_reconnect ⇒ Object

Returns the value of attribute nf_last_reconnect.



41
42
43
# File 'lib/twitter_stream/json_stream.rb', line 41

# Reader for @nf_last_reconnect, which #initialize sets to nil.
# NOTE(review): presumably the delay used for the most recent network-failure
# reconnect (cf. the NF_RECONNECT_* constants) — the code that updates it is
# not visible here; confirm.
def nf_last_reconnect
  @nf_last_reconnect
end

#proxy ⇒ Object

Returns the value of attribute proxy.



44
45
46
# File 'lib/twitter_stream/json_stream.rb', line 44

# Reader for @proxy: the URI that #initialize parses from the :proxy option,
# or nil when #initialize found no proxy to parse.
def proxy
  @proxy
end

#reconnect_retries ⇒ Object

Returns the value of attribute reconnect_retries.



43
44
45
# File 'lib/twitter_stream/json_stream.rb', line 43

# Reader for @reconnect_retries, which #initialize starts at 0.
# NOTE(review): presumably counted against RETRIES_MAX by the reconnect
# logic, which is not visible here; confirm.
def reconnect_retries
  @reconnect_retries
end

Class Method Details

.connect(options = {}) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/twitter_stream/json_stream.rb', line 46

# Opens an EventMachine connection handled by this class and returns it.
#
# @param options [Hash] overrides for DEFAULT_OPTIONS; the hash passed in is
#   never modified
# @return [Object] the connection returned by EventMachine.connect
def self.connect(options = {})
  # Decide on the caller's hash (before the defaults supply :port) whether
  # SSL was requested without an explicit port, but apply the implied 443 to
  # the merged copy only, so the caller's hash stays untouched.
  implied_ssl_port = options[:ssl] && !options.key?(:port)
  options = DEFAULT_OPTIONS.merge(options)
  options[:port] = 443 if implied_ssl_port

  host = options[:host]
  port = options[:port]

  # With a proxy configured, the socket is opened to the proxy instead of
  # the target host; the handler still receives the full options.
  if options[:proxy]
    proxy_uri = URI.parse(options[:proxy])
    host = proxy_uri.host
    port = proxy_uri.port
  end

  connection = EventMachine.connect host, port, self, options
  connection.start_tls if options[:ssl]
  connection
end

Instance Method Details

#connection_completed ⇒ Object



118
119
120
121
# File 'lib/twitter_stream/json_stream.rb', line 118

# Calls reset_state and then send_request, in that order (both are defined
# outside this excerpt).
# NOTE(review): connection_completed is an EventMachine::Connection callback,
# presumably fired once the connection is established — confirm against the
# EventMachine docs.
def connection_completed
  reset_state
  send_request
end

#each_item(&block) ⇒ Object



74
75
76
# File 'lib/twitter_stream/json_stream.rb', line 74

# Remembers the given block in @each_item_callback and returns it.
# The code that invokes the callback lives outside this excerpt.
def each_item(&handler)
  @each_item_callback = handler
end

#immediate_reconnect ⇒ Object



95
96
97
98
99
# File 'lib/twitter_stream/json_stream.rb', line 95

# Closes the connection with the flags set for an immediate reconnect.
#
# @gracefully_closed is cleared because #unbind only calls
# schedule_reconnect when it is false; @immediate_reconnect is raised for the
# reconnect logic, which is outside this excerpt.
def immediate_reconnect
  @gracefully_closed = false
  @immediate_reconnect = true
  close_connection
end

#on_error(&block) ⇒ Object



78
79
80
# File 'lib/twitter_stream/json_stream.rb', line 78

# Remembers the given block in @error_callback and returns it.
# The code that invokes the callback lives outside this excerpt.
def on_error(&handler)
  @error_callback = handler
end

#on_max_reconnects(&block) ⇒ Object



86
87
88
# File 'lib/twitter_stream/json_stream.rb', line 86

# Remembers the given block in @max_reconnects_callback and returns it.
# The code that invokes the callback lives outside this excerpt.
def on_max_reconnects(&handler)
  @max_reconnects_callback = handler
end

#on_reconnect(&block) ⇒ Object



82
83
84
# File 'lib/twitter_stream/json_stream.rb', line 82

# Remembers the given block in @reconnect_callback and returns it.
# The code that invokes the callback lives outside this excerpt.
def on_reconnect(&handler)
  @reconnect_callback = handler
end

#receive_data(data) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
# File 'lib/twitter_stream/json_stream.rb', line 106

# Feeds a chunk of incoming data to @buffer.extract and hands every item it
# returns to receive_line.
#
# Any StandardError raised along the way is reported through receive_error
# (class, message and backtrace joined by "\n\t") and the connection is
# closed. Only StandardError is rescued on purpose: signals, SystemExit,
# NoMemoryError and the like must propagate instead of being turned into a
# stream error.
#
# @param data [Object] chunk passed straight to @buffer.extract
# @return [Object, nil] the result of iterating the extracted items, or nil
#   when an error was handled
def receive_data(data)
  @buffer.extract(data).each { |line| receive_line(line) }
rescue StandardError => e
  receive_error("#{e.class}: " + [e.message, e.backtrace].flatten.join("\n\t"))
  close_connection
  nil
end

#stop ⇒ Object



90
91
92
93
# File 'lib/twitter_stream/json_stream.rb', line 90

# Closes the connection on purpose.
# Sets @gracefully_closed first so that #unbind, which only calls
# schedule_reconnect when the flag is false, does not schedule a reconnect.
def stop
  @gracefully_closed = true
  close_connection
end

#unbind ⇒ Object



101
102
103
104
# File 'lib/twitter_stream/json_stream.rb', line 101

# Runs when the connection goes away: delivers whatever is still sitting in
# @buffer to receive_line, then schedules a reconnect unless the close was
# requested (@gracefully_closed).
#
# The flag is read only after the leftover data has been delivered, exactly
# as the two steps are ordered here.
def unbind
  leftover_pending = !@buffer.empty?
  receive_line(@buffer.flush) if leftover_pending

  return if @gracefully_closed

  schedule_reconnect
end