Class: TweetStream::Client

Inherits:
Object
Defined in:
lib/tweetstream/client.rb

Overview

Provides simple access to the Twitter Streaming API (apiwiki.twitter.com/Streaming-API-Documentation) for Ruby scripts that need to hold a long-lived connection to Twitter for tracking and other purposes.

Basic usage is to call one of the stream methods below with a block; the block receives each TweetStream::Status as it arrives. For example:

TweetStream::Client.new('consumer_key', 'consumer_secret', 'access_key', 'access_secret').track('fail') do |status|
  puts "[#{status.user.screen_name}] #{status.text}"
end

For information about a daemonized TweetStream client, view the TweetStream::Daemon class.

Direct Known Subclasses

Daemon

Constructor Details

#initialize(consumer_key, consumer_secret, access_key, access_secret, parser = :json_gem) ⇒ Client

Create a new client using the OAuth credentials of the account whose API quota you want to use. You may also choose the JSON parsing library, as described for the #parser= setter.



# File 'lib/tweetstream/client.rb', line 43

def initialize(consumer_key, consumer_secret, access_key, access_secret, parser = :json_gem)
  self.consumer_key = consumer_key
  self.consumer_secret = consumer_secret
  self.access_key = access_key
  self.access_secret = access_secret
  self.parser = parser
end
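The constructor expects the four OAuth values in a fixed order. A minimal sketch of collecting them from the environment instead of hard-coding them, assuming the hypothetical variable names TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_ACCESS_KEY and TWITTER_ACCESS_SECRET (TweetStream itself does not read the environment). The helper `load_twitter_credentials` is illustrative, not part of the library:

```ruby
# Hypothetical ENV variable names; TweetStream does not read these itself.
CREDENTIAL_KEYS = %w[
  TWITTER_CONSUMER_KEY
  TWITTER_CONSUMER_SECRET
  TWITTER_ACCESS_KEY
  TWITTER_ACCESS_SECRET
].freeze

# Returns the four OAuth values in the order Client#initialize takes them:
# consumer_key, consumer_secret, access_key, access_secret.
# Raises ArgumentError naming every variable that is missing or empty.
def load_twitter_credentials(env = ENV)
  missing = CREDENTIAL_KEYS.reject { |key| env[key] && !env[key].empty? }
  raise ArgumentError, "missing credentials: #{missing.join(', ')}" unless missing.empty?

  CREDENTIAL_KEYS.map { |key| env[key] }
end

# Usage (requires the tweetstream gem and real credentials):
#   client = TweetStream::Client.new(*load_twitter_credentials)
```

The splat keeps the argument order in one place; the optional parser argument is left at its :json_gem default.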

Instance Attribute Details

#access_key ⇒ Object

Returns the value of attribute access_key.



# File 'lib/tweetstream/client.rb', line 23

def access_key
  @access_key
end

#access_secret ⇒ Object

Returns the value of attribute access_secret.



# File 'lib/tweetstream/client.rb', line 23

def access_secret
  @access_secret
end

#consumer_key ⇒ Object

Returns the value of attribute consumer_key.



# File 'lib/tweetstream/client.rb', line 23

def consumer_key
  @consumer_key
end

#consumer_secret ⇒ Object

Returns the value of attribute consumer_secret.



# File 'lib/tweetstream/client.rb', line 23

def consumer_secret
  @consumer_secret
end

#parser ⇒ Object

Returns the value of attribute parser.



# File 'lib/tweetstream/client.rb', line 24

def parser
  @parser
end

Instance Method Details

#filter(query_params = {}, &block) ⇒ Object

Make a call to the statuses/filter method of the Streaming API. You may provide :follow, :track, :locations, or any combination of them as options, to follow the tweets of specific users, track keywords, or restrict results to geographic bounding boxes. Array values are joined into comma-separated strings. Call this method directly when you want to combine several filters in a single HTTP connection instead of opening one connection per filter.



# File 'lib/tweetstream/client.rb', line 110

def filter(query_params = {}, &block)
  [:follow, :track, :locations].each do |param|
    if query_params[param].is_a?(Array)
      query_params[param] = query_params[param].collect{|q| q.to_s}.join(',')
    elsif query_params[param]
      query_params[param] = query_params[param].to_s
    end
  end
  start('statuses/filter', query_params.merge(:method => :post), &block)
end
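The normalization loop in #filter can be restated as a standalone function, which makes it easy to see what is actually sent for each option. This is a sketch for illustration, not the library's code; one deliberate difference is that it copies the Hash, whereas #filter modifies the caller's query_params in place:

```ruby
# Keys that #filter normalizes before calling #start.
FILTER_PARAMS = [:follow, :track, :locations].freeze

# Arrays become comma-separated strings, other non-nil values are
# converted with to_s, and absent keys are left untouched.
def normalize_filter_params(query_params)
  params = query_params.dup # unlike #filter, leave the caller's Hash alone
  FILTER_PARAMS.each do |param|
    value = params[param]
    if value.is_a?(Array)
      params[param] = value.map(&:to_s).join(',')
    elsif value
      params[param] = value.to_s
    end
  end
  params
end

# follow becomes "12,34" and track becomes "ruby,rails"
normalize_filter_params(follow: [12, 34], track: %w[ruby rails])
```

Because user IDs, keywords and coordinates all go through to_s, Integers, Symbols and Floats can be passed directly.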

#firehose(query_parameters = {}, &block) ⇒ Object

Returns all public statuses. The Firehose is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case.



# File 'lib/tweetstream/client.rb', line 55

def firehose(query_parameters = {}, &block)
  start('statuses/firehose', query_parameters, &block)
end

#follow(*user_ids, &block) ⇒ Object

Returns public statuses from or in reply to a set of users. Mentions (“Hello @user!”) and implicit replies (“@user Hello!” created without pressing the reply “swoosh”) are not matched. Requires integer user IDs, not screen names. Additional query parameters may be passed as a Hash in the last argument.



# File 'lib/tweetstream/client.rb', line 95

def follow(*user_ids, &block)
  query_params = user_ids.pop if user_ids.last.is_a?(::Hash)
  query_params ||= {}
  filter(query_params.merge(:follow => user_ids), &block)
end
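Both #follow and #track treat a trailing Hash as extra query parameters and everything before it as IDs or keywords. A standalone sketch of that split; `split_trailing_options` is a hypothetical name, and `:example_param` is a placeholder rather than a documented Twitter parameter:

```ruby
# Separates a trailing options Hash from positional arguments,
# as #follow and #track do before calling #filter.
def split_trailing_options(args)
  args = args.dup # leave the caller's Array untouched
  options = args.last.is_a?(Hash) ? args.pop : {}
  [args, options]
end

# :example_param is a hypothetical placeholder.
user_ids, options = split_trailing_options([12, 34, { example_param: 'x' }])

# This mirrors the Hash that #follow hands to #filter.
filter_params = options.merge(follow: user_ids)
```

In a call such as follow(12, 34, example_param: 'x'), Ruby collects the trailing keywords into a Hash at the end of the splat, which is why the is_a?(::Hash) check works.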

#locations(coords, &block) ⇒ Object



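Returns public statuses within the given geographic bounding box(es). coords is passed to #filter as the :locations parameter, so an Array of coordinates is joined into a comma-separated string.
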
# File 'lib/tweetstream/client.rb', line 101

def locations(coords, &block)
  filter(:locations => coords, &block)
end
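The Streaming API documentation of the period described locations as longitude,latitude pairs with the south-west corner of each box first; treat that ordering as an assumption to verify against the API version you target. A minimal sketch of a hypothetical helper that validates one box and returns it in that order:

```ruby
# Hypothetical helper: one bounding box as
# [sw_longitude, sw_latitude, ne_longitude, ne_latitude] (assumed order).
def bounding_box(sw_lng:, sw_lat:, ne_lng:, ne_lat:)
  unless [sw_lng, ne_lng].all? { |v| v.between?(-180, 180) }
    raise ArgumentError, 'longitude out of range'
  end
  unless [sw_lat, ne_lat].all? { |v| v.between?(-90, 90) }
    raise ArgumentError, 'latitude out of range'
  end
  raise ArgumentError, 'south-west corner must be south of north-east' unless sw_lat < ne_lat

  [sw_lng, sw_lat, ne_lng, ne_lat]
end

box = bounding_box(sw_lng: -122.75, sw_lat: 36.8, ne_lng: -121.75, ne_lat: 37.8)

# #filter joins Arrays with ',', so this box is sent as "-122.75,36.8,-121.75,37.8".
# Several boxes can be concatenated: box_a + box_b.
```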

#on_delete(&block) ⇒ Object

Set a Proc to be run when a deletion notice is received from the Twitter stream. For example:

@client = TweetStream::Client.new('consumer_key', 'consumer_secret', 'access_key', 'access_secret')
@client.on_delete do |status_id, user_id|
  Tweet.delete(status_id)
end

Block must take two arguments: the status id and the user id. If no block is given, it will return the currently set deletion proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



# File 'lib/tweetstream/client.rb', line 147

def on_delete(&block)
  if block_given?
    @on_delete = block
    self
  else
    @on_delete
  end
end
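Honoring deletion notices usually means removing the status from wherever it was stored. A minimal sketch with a hypothetical in-memory `StatusStore` standing in for the `Tweet` model used in the example above; the handler is a two-argument lambda matching the (status id, user id) contract:

```ruby
# Hypothetical in-memory store keyed by status id.
class StatusStore
  def initialize
    @texts = {}
  end

  def save(status_id, text)
    @texts[status_id] = text
  end

  # Returns the removed text, or nil if the id was never stored.
  def delete(status_id)
    @texts.delete(status_id)
  end

  def include?(status_id)
    @texts.key?(status_id)
  end
end

store = StatusStore.new
store.save(1001, 'hello world')

# Takes (status_id, user_id); the user id is ignored in this sketch.
delete_handler = lambda do |status_id, _user_id|
  store.delete(status_id)
end

delete_handler.call(1001, 42)
# With the real client (requires the gem): client.on_delete(&delete_handler)
```

A lambda enforces the two-argument arity, which matches how the stream calls the deletion proc.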

#on_error(&block) ⇒ Object

Set a Proc to be run when an error is encountered while processing the stream (an HTTP error, or a message that is not a JSON object). TweetStream will try to reconnect automatically, so this callback is for reference only. Don’t panic! For example:

@client = TweetStream::Client.new('consumer_key', 'consumer_secret', 'access_key', 'access_secret')
@client.on_error do |message|
  # Make note of error message
end

Block must take one argument: the error message. If no block is given, it will return the currently set error proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



# File 'lib/tweetstream/client.rb', line 190

def on_error(&block)
  if block_given?
    @on_error = block
    self
  else
    @on_error
  end
end
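#on_delete, #on_error and #on_limit share one pattern: with a block they store it and return self, without one they return the stored Proc. A sketch of that pattern in a hypothetical stand-in class (not TweetStream itself), showing why returning self allows chaining:

```ruby
# Hypothetical stand-in reproducing the getter/setter pattern of the
# on_delete, on_error and on_limit methods.
class CallbackHolder
  %i[on_delete on_error on_limit].each do |name|
    define_method(name) do |&block|
      if block
        instance_variable_set("@#{name}", block)
        self # returning self is what makes chaining possible
      else
        instance_variable_get("@#{name}") # nil until a block is set
      end
    end
  end
end

log = []
holder = CallbackHolder.new
holder
  .on_error { |message| log << message }
  .on_limit { |discarded| log << "limited: #{discarded}" }

holder.on_error.call('connection reset') # appends to log
```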

#on_limit(&block) ⇒ Object

Set a Proc to be run when a rate limit notice is received from the Twitter stream. For example:

@client = TweetStream::Client.new('consumer_key', 'consumer_secret', 'access_key', 'access_secret')
@client.on_limit do |discarded_count|
  # Make note of discarded count
end

Block must take one argument: the number of discarded tweets. If no block is given, it will return the currently set limit proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



# File 'lib/tweetstream/client.rb', line 168

def on_limit(&block)
  if block_given?
    @on_limit = block
    self
  else
    @on_limit
  end
end

#retweet(query_parameters = {}, &block) ⇒ Object

Returns all retweets. The retweet stream is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case. As of 9/11/2009, the site-wide retweet feature has not yet launched, so there are currently few, if any, retweets on this stream.



# File 'lib/tweetstream/client.rb', line 65

def retweet(query_parameters = {}, &block)
  start('statuses/retweet', query_parameters, &block)
end

#sample(query_parameters = {}, &block) ⇒ Object

Returns a random sample of all public statuses. The default access level provides a small proportion of the Firehose. The “Gardenhose” access level provides a larger proportion, better suited to data mining and research applications that need a statistically significant sample.



# File 'lib/tweetstream/client.rb', line 74

def sample(query_parameters = {}, &block)
  start('statuses/sample', query_parameters, &block)
end

#site_follow(*user_ids, &block) ⇒ Object



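Opens a connection to the beta Site Streams endpoint (path site on betastream.twitter.com, API version 2b) for the given user IDs, which are joined into a comma-separated string. Site Streams messages carry the ID of the user they are for, so the block should take two arguments (the status and the for_user ID) or three (the status, the for_user ID and the client); blocks with any other arity are not called for these messages.
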
# File 'lib/tweetstream/client.rb', line 121

def site_follow(*user_ids, &block)
  query_params = {:follow => user_ids}
  
  if query_params[:follow].is_a?(Array)
    query_params[:follow] = query_params[:follow].collect{|q| q.to_s}.join(',')
  elsif query_params[:follow]
    query_params[:follow] = query_params[:follow].to_s
  end
  query_params[:site_streams] = query_params[:follow]
  query_params[:track] = ["foo"]
  
  start('site', query_params.merge(:method => :post, :host => 'betastream.twitter.com', :version => '2b'), &block)
end

#start(path, query_parameters = {}, &block) ⇒ Object

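Internal method (marked :nodoc:). Opens a connection to the given Streaming API path inside an EventMachine reactor and dispatches each decoded message: deletion notices to the delete proc, limit notices to the limit proc, statuses and Site Streams messages to the block, and errors to the error proc. The :host, :version, :method, :delete, :limit and :error keys are removed from query_parameters and used to configure the connection.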



# File 'lib/tweetstream/client.rb', line 199

def start(path, query_parameters = {}, &block) #:nodoc:

  host = query_parameters.delete(:host) || 'stream.twitter.com'
  version = query_parameters.delete(:version) || '1'

  method = query_parameters.delete(:method) || :get
  delete_proc = query_parameters.delete(:delete) || self.on_delete
  limit_proc = query_parameters.delete(:limit) || self.on_limit
  error_proc = query_parameters.delete(:error) || self.on_error
  
  uri = method == :get ? build_uri(path, version, query_parameters) : build_uri(path, version)
  
  oauth = {
    :consumer_key => self.consumer_key,
    :consumer_secret => self.consumer_secret,
    :access_key => self.access_key,
    :access_secret => self.access_secret
  }
  
  EventMachine::run {
    @stream = TwitterStream::JSONStream.connect(
      :path => uri,
      :host => host,
      :oauth => oauth,
      :filters => query_parameters[:track],
      :follow => query_parameters[:follow],
      :locations => query_parameters[:locations],
      :site_streams => query_parameters[:site_streams],
      :method => method.to_s.upcase,
      :content => (method == :post ? build_post_body(query_parameters) : ''),
      :user_agent => 'TweetStream'
    )
    
    @stream.each_item do |item|
      raw_hash = @parser.decode(item)
      
      unless raw_hash.is_a?(::Hash)
        error_proc.call("Unexpected JSON object in stream: #{item}") if error_proc.is_a?(Proc)
        next
      end
      
      hash = TweetStream::Hash.new(raw_hash)
      
      if hash[:delete] && hash[:delete][:status]
        delete_proc.call(hash[:delete][:status][:id], hash[:delete][:status][:user_id]) if delete_proc.is_a?(Proc)
      elsif hash[:limit] && hash[:limit][:track]
        limit_proc.call(hash[:limit][:track]) if limit_proc.is_a?(Proc)
      elsif hash[:text] && hash[:user]
        @last_status = TweetStream::Status.new(hash)
        
        # Give the block the option to receive either one
        # or two arguments, depending on its arity.
        case block.arity
          when 1
            yield @last_status
          when 2
            yield @last_status, self
        end
      elsif hash[:for_user]
        
        @last_status = TweetStream::Status.new(hash[:messages] || hash[:message])
        
        # Give the block the option to receive either one
        # or two arguments, depending on its arity.
        case block.arity
          when 2
            yield @last_status, hash[:for_user]
          when 3
            yield @last_status, hash[:for_user], self
        end
      end
    end
    
    @stream.on_error do |message|
      error_proc.call(message) if error_proc.is_a?(Proc)
    end
    
    @stream.on_max_reconnects do |timeout, retries|
      raise TweetStream::ReconnectError.new(timeout, retries)
    end
  }
end
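The dispatch in #start can be exercised without a network connection by restating it over plain Hashes. A sketch under stated assumptions: symbol-keyed Hashes from JSON.parse stand in for TweetStream::Hash, and the names `classify_stream_item` and `deliver` are hypothetical. It mirrors the if/elsif chain and the arity check, including the fact that a block with arity 0 or a splat receives nothing:

```ruby
require 'json'

# Mirrors the if/elsif chain in #start; returns a tag plus the values
# that #start would pass on.
def classify_stream_item(json)
  hash = JSON.parse(json, symbolize_names: true)
  return [:error, "Unexpected JSON object in stream: #{json}"] unless hash.is_a?(Hash)

  if hash[:delete] && hash[:delete][:status]
    status = hash[:delete][:status]
    [:delete, status[:id], status[:user_id]]   # -> delete proc
  elsif hash[:limit] && hash[:limit][:track]
    [:limit, hash[:limit][:track]]             # -> limit proc
  elsif hash[:text] && hash[:user]
    [:status, hash]                            # -> the block, as a Status
  elsif hash[:for_user]
    [:site_message, hash[:messages] || hash[:message], hash[:for_user]]
  else
    [:ignored]                                 # #start skips anything else
  end
end

# Mirrors the arity check for ordinary statuses: one-argument blocks get
# the status, two-argument blocks also get the client, any other arity
# (0, or a splat such as |*args|) is silently skipped.
def deliver(block, status, client)
  case block.arity
  when 1 then block.call(status)
  when 2 then block.call(status, client)
  end
end
```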

#stop ⇒ Object

Terminate the currently running TweetStream by stopping the EventMachine event loop. Returns the last status received.



# File 'lib/tweetstream/client.rb', line 283

def stop
  EventMachine.stop_event_loop
  @last_status
end

#track(*keywords, &block) ⇒ Object

Specify keywords to track. Queries are subject to the Track Limitations described in Track Limiting, and to the access roles described in the statuses/filter method. Track keywords are case-insensitive logical ORs. Terms are exact-matched, and also exact-matched ignoring punctuation. Phrases (keywords containing spaces) are not supported. Keywords containing punctuation will only exact-match tokens. Additional query parameters may be passed as a Hash in the last argument.



# File 'lib/tweetstream/client.rb', line 85

def track(*keywords, &block)
  query_params = keywords.pop if keywords.last.is_a?(::Hash)
  query_params ||= {}
  filter(query_params.merge(:track => keywords), &block)
end
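A rough local approximation of the matching rules above can help when exercising handlers offline. It is an assumption-laden sketch based only on the description on this page (tokens split on whitespace, leading and trailing punctuation stripped), not Twitter's actual matcher, and `track_match?` is a hypothetical name:

```ruby
# Leading or trailing punctuation on a token, e.g. "#ruby!" -> "ruby".
EDGE_PUNCTUATION = /\A[[:punct:]]+|[[:punct:]]+\z/

# Approximates: case-insensitive OR over keywords; exact token match, or
# exact match after stripping edge punctuation; punctuated keywords only
# match raw tokens; phrases never match because tokens contain no spaces.
def track_match?(text, keywords)
  raw_tokens = text.downcase.split
  stripped_tokens = raw_tokens.map { |token| token.gsub(EDGE_PUNCTUATION, '') }

  keywords.any? do |keyword|
    kw = keyword.to_s.downcase
    if kw.match?(/[[:punct:]]/)
      raw_tokens.include?(kw)
    else
      raw_tokens.include?(kw) || stripped_tokens.include?(kw)
    end
  end
end

track_match?('I love #Ruby!', ['ruby'])         # true: "#ruby!" strips to "ruby"
track_match?('rubygems are great', ['ruby'])    # false: no exact token match
```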