Class: TweetStream::Client
- Inherits:
- Object
- Defined in:
- lib/tweetstream/client.rb
Overview
Provides simple access to the Twitter Streaming API (https://dev.twitter.com/docs/streaming-api) for Ruby scripts that need to create a long-lived connection to Twitter for tracking and other purposes.
Basic usage of the library is to call one of the provided methods and provide a block that will perform actions on a yielded Twitter::Tweet. For example:
TweetStream::Client.new.track('fail') do |status|
  puts "[#{status.user.screen_name}] #{status.text}"
end
For information about a daemonized TweetStream client, view the TweetStream::Daemon class.
Direct Known Subclasses
Constant Summary
- OPTION_CALLBACKS =
[:delete, :scrub_geo, :limit, :error, :enhance_your_calm, :unauthorized, :reconnect, :inited, :direct_message, :timeline_status, :anything, :no_data_received, :status_withheld, :user_withheld].freeze
Instance Attribute Summary
-
#control ⇒ Object
readonly
Returns the value of attribute control.
-
#control_uri ⇒ Object
readonly
Returns the value of attribute control_uri.
-
#options ⇒ Object
Returns the value of attribute options.
-
#stream ⇒ Object
readonly
Returns the value of attribute stream.
Instance Method Summary
-
#close_connection ⇒ Object
Close the connection to Twitter without closing the EventMachine loop.
-
#connect(path, options = {}, &block) ⇒ Object
Connect to Twitter without starting a new EventMachine run loop.
-
#filter(query_params = {}, &block) ⇒ Object
Make a call to the statuses/filter method of the Streaming API. You may provide :follow, :track, or both as options to follow the tweets of specified users or track keywords.
-
#firehose(query_parameters = {}, &block) ⇒ Object
Returns all public statuses.
-
#follow(*user_ids, &block) ⇒ Object
Returns public statuses from or in reply to a set of users.
-
#initialize(options = {}) ⇒ Client
constructor
Creates a new client.
-
#links(query_parameters = {}, &block) ⇒ Object
Returns all statuses containing http: and https:.
-
#locations(*locations_map, &block) ⇒ Object
Specifies a set of bounding boxes to track.
- #on(event, &block) ⇒ Object
-
#on_anything(&block) ⇒ Object
Set a Proc to be run whenever anything is encountered in the processing of the stream.
-
#on_delete(&block) ⇒ Object
Set a Proc to be run when a deletion notice is received from the Twitter stream.
-
#on_direct_message(&block) ⇒ Object
Set a Proc to be run when a direct message is encountered in the processing of the stream.
-
#on_enhance_your_calm(&block) ⇒ Object
Set a Proc to be run when an enhance_your_calm signal is received.
-
#on_error(&block) ⇒ Object
Set a Proc to be run when an HTTP error is encountered in the processing of the stream.
-
#on_event(event, &block) ⇒ Object
Set a Proc to be run on userstream events.
-
#on_friends(&block) ⇒ Object
Set a Proc to be run when a Site Stream friends list is received.
-
#on_inited(&block) ⇒ Object
Set a Proc to be run when the connection is established.
-
#on_limit(&block) ⇒ Object
Set a Proc to be run when a rate limit notice is received from the Twitter stream.
-
#on_no_data_received(&block) ⇒ Object
Set a Proc to be run when no data is received from the server and a stall occurs.
-
#on_reconnect(&block) ⇒ Object
Set a Proc to be run on reconnect.
-
#on_scrub_geo(&block) ⇒ Object
Set a Proc to be run when a scrub_geo notice is received from the Twitter stream.
-
#on_stall_warning(&block) ⇒ Object
Set a Proc to be run when a stall warning is received.
-
#on_status_withheld(&block) ⇒ Object
Set a Proc to be run when a status_withheld message is received.
-
#on_timeline_status(&block) ⇒ Object
Set a Proc to be run when a regular timeline message is encountered in the processing of the stream.
-
#on_unauthorized(&block) ⇒ Object
Set a Proc to be run when an HTTP status 401 is encountered while connecting to Twitter.
-
#on_user_withheld(&block) ⇒ Object
Set a Proc to be run when a user_withheld message is received.
- #respond_to(hash, callbacks, &block) ⇒ Object
-
#retweet(query_parameters = {}, &block) ⇒ Object
Returns all retweets.
-
#sample(query_parameters = {}, &block) ⇒ Object
Returns a random sample of all public statuses.
-
#sitestream(user_ids = [], query_params = {}, &block) ⇒ Object
Make a call to the Site Streams API.
-
#start(path, query_parameters = {}, &block) ⇒ Object
Connect to Twitter while starting a new EventMachine run loop.
-
#stop ⇒ Object
Terminate the currently running TweetStream and close the EventMachine loop.
- #stop_stream ⇒ Object
-
#track(*keywords, &block) ⇒ Object
Specify keywords to track.
-
#userstream(query_params = {}, &block) ⇒ Object
Make a call to the userstream api for currently authenticated user.
Constructor Details
#initialize(options = {}) ⇒ Client
Creates a new client.
# File 'lib/tweetstream/client.rb', line 44
def initialize(options = {})
  self.options = options
  merged_options = TweetStream.options.merge(options)
  Configuration::VALID_OPTIONS_KEYS.each do |key|
    send("#{key}=", merged_options[key])
  end
  @callbacks = {}
end
Instance Attribute Details
#control ⇒ Object (readonly)
Returns the value of attribute control.
# File 'lib/tweetstream/client.rb', line 41
def control
  @control
end
#control_uri ⇒ Object (readonly)
Returns the value of attribute control_uri.
# File 'lib/tweetstream/client.rb', line 41
def control_uri
  @control_uri
end
#options ⇒ Object
Returns the value of attribute options.
# File 'lib/tweetstream/client.rb', line 40
def options
  @options
end
#stream ⇒ Object (readonly)
Returns the value of attribute stream.
# File 'lib/tweetstream/client.rb', line 41
def stream
  @stream
end
Instance Method Details
#close_connection ⇒ Object
Close the connection to Twitter without closing the EventMachine loop.
# File 'lib/tweetstream/client.rb', line 487
def close_connection
  @stream.close_connection if @stream
end
#connect(path, options = {}, &block) ⇒ Object
Connect to Twitter without starting a new EventMachine run loop.
# File 'lib/tweetstream/client.rb', line 395
def connect(path, options = {}, &block)
  stream_parameters, callbacks = connection_options(path, options)

  @stream = EM::Twitter::Client.connect(stream_parameters)
  @stream.each do |item|
    begin
      hash = Yajl::Parser.parse(item, :symbolize_keys => true)
    rescue Yajl::ParseError
      invoke_callback(callbacks['error'], "Yajl::ParseError occured in stream: #{item}")
      next
    end

    unless hash.is_a?(::Hash)
      invoke_callback(callbacks['error'], "Unexpected JSON object in stream: #{item}")
      next
    end

    Twitter.identity_map = false

    respond_to(hash, callbacks, &block)

    yield_message_to(callbacks['anything'], hash)
  end

  @stream.on_error do |message|
    invoke_callback(callbacks['error'], message)
  end

  @stream.on_unauthorized do
    invoke_callback(callbacks['unauthorized'])
  end

  @stream.on_enhance_your_calm do
    invoke_callback(callbacks['enhance_your_calm'])
  end

  @stream.on_reconnect do |timeout, retries|
    invoke_callback(callbacks['reconnect'], timeout, retries)
  end

  @stream.on_max_reconnects do |timeout, retries|
    raise TweetStream::ReconnectError.new(timeout, retries)
  end

  @stream.on_no_data_received do
    invoke_callback(callbacks['no_data_received'])
  end

  @stream
end
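The per-item guard in #connect (parse, report parse failures, reject anything that is not a Hash) can be tried in isolation. The following is a minimal sketch, not the library's code: it swaps Yajl for Ruby's standard json library, and parse_stream_item is a hypothetical helper name introduced only for illustration.

```ruby
require 'json'

# Hypothetical helper: parse one raw stream item and report problems
# through an error callback instead of raising.
def parse_stream_item(item, on_error)
  hash = JSON.parse(item, :symbolize_names => true)
  unless hash.is_a?(Hash)
    on_error.call("Unexpected JSON object in stream: #{item}")
    return nil
  end
  hash
rescue JSON::ParserError
  on_error.call("JSON parse error in stream: #{item}")
  nil
end

errors = []
on_error = proc { |message| errors << message }

tweet   = parse_stream_item('{"text":"hi","user":{"id":1}}', on_error)
array   = parse_stream_item('[1, 2, 3]', on_error)   # valid JSON, but not a Hash
garbage = parse_stream_item('{not json', on_error)   # not valid JSON at all
```

Both bad items end up as messages in errors, while the stream itself keeps going, which is the same "report and skip" behavior the method above uses.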
#filter(query_params = {}, &block) ⇒ Object
Make a call to the statuses/filter method of the Streaming API. You may provide :follow, :track, or both as options to follow the tweets of specified users or track keywords. This method is provided separately for cases where combining track and follow would conserve HTTP connections.
# File 'lib/tweetstream/client.rb', line 130
def filter(query_params = {}, &block)
  start('/1.1/statuses/filter.json', query_params.merge(:method => :post), &block)
end
#firehose(query_parameters = {}, &block) ⇒ Object
Returns all public statuses. The Firehose is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case.
# File 'lib/tweetstream/client.rb', line 57
def firehose(query_parameters = {}, &block)
  start('/1.1/statuses/firehose.json', query_parameters, &block)
end
#follow(*user_ids, &block) ⇒ Object
Returns public statuses from or in reply to a set of users. Mentions ("Hello @user!") and implicit replies ("@user Hello!" created without pressing the reply "swoosh") are not matched. Requires integer user IDs, not screen names. Query parameters may be passed as the last argument.
# File 'lib/tweetstream/client.rb', line 105
def follow(*user_ids, &block)
  query_params = user_ids.pop if user_ids.last.is_a?(::Hash)
  query_params ||= {}
  filter(query_params.merge(:follow => user_ids), &block)
end
#links(query_parameters = {}, &block) ⇒ Object
Returns all statuses containing http: and https:. The links stream is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case.
# File 'lib/tweetstream/client.rb', line 65
def links(query_parameters = {}, &block)
  start('/1.1/statuses/links.json', query_parameters, &block)
end
#locations(*locations_map, &block) ⇒ Object
Specifies a set of bounding boxes to track. Only tweets that are both created using the Geotagging API and placed from within a tracked bounding box will be included in the stream; the user's location field is not used to filter tweets (e.g. if a user has their location set to “San Francisco”, but the tweet was not created using the Geotagging API and has no geo element, it will not be included in the stream). Bounding boxes are specified as a comma-separated list of longitude/latitude pairs. The first pair specifies the southwest corner of the box.
# File 'lib/tweetstream/client.rb', line 119
def locations(*locations_map, &block)
  query_params = locations_map.pop if locations_map.last.is_a?(::Hash)
  query_params ||= {}
  filter(query_params.merge(:locations => locations_map), &block)
end
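The coordinate ordering is easy to get wrong, so here is a small sketch of building the flat longitude/latitude list from named corners. BoundingBox is a hypothetical value object introduced for illustration and is not part of TweetStream; the coordinates are example values.

```ruby
# Hypothetical value object; not part of TweetStream.
BoundingBox = Struct.new(:sw_lon, :sw_lat, :ne_lon, :ne_lat) do
  # Longitude comes before latitude, southwest corner first.
  def to_coordinates
    [sw_lon, sw_lat, ne_lon, ne_lat]
  end
end

san_francisco = BoundingBox.new(-122.75, 36.8, -121.75, 37.8)
new_york      = BoundingBox.new(-74.0, 40.0, -73.0, 41.0)

# Flatten all boxes into one list, then join into the comma-separated form.
coordinates     = [san_francisco, new_york].flat_map(&:to_coordinates)
locations_param = coordinates.join(',')
```

Given the *locations_map signature, the flat list could presumably be splatted into the call, as in client.locations(*coordinates) { |status| ... }.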
#on(event, &block) ⇒ Object
# File 'lib/tweetstream/client.rb', line 371
def on(event, &block)
  if block_given?
    @callbacks[event.to_s] = block
    self
  else
    @callbacks[event.to_s]
  end
end
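The combined getter/setter behavior of #on can be explored outside the library. The sketch below is a stand-alone model under the assumption that callbacks live in a plain Hash keyed by string; CallbackRegistry is a hypothetical class, not part of TweetStream.

```ruby
# Hypothetical stand-in for the callback storage in TweetStream::Client.
class CallbackRegistry
  def initialize
    @callbacks = {}
  end

  # With a block: store it under the event name and return self for chaining.
  # Without a block: return the stored Proc (or nil if none is set).
  def on(event, &block)
    if block_given?
      @callbacks[event.to_s] = block
      self
    else
      @callbacks[event.to_s]
    end
  end
end

registry = CallbackRegistry.new
chained = registry.on(:error) { |message| "error: #{message}" }
                  .on('limit') { |count| "skipped #{count}" }

# Event names are normalized with to_s, so symbols and strings are equivalent.
error_proc = registry.on('error')
```

Returning self from the setter branch is what lets every on_* method be chained in a single expression.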
#on_anything(&block) ⇒ Object
Set a Proc to be run whenever anything is encountered in the processing of the stream.
@client = TweetStream::Client.new
@client.on_anything do |status|
# do something with the status
end
Block can take one or two arguments: |status (, client)|. If no block is given, it will return the currently set anything proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 257
def on_anything(&block)
  on('anything', &block)
end
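One way a caller can support blocks that take either one or two arguments is to inspect the block's arity. The sketch below shows that technique under stated assumptions: yield_by_arity is a hypothetical helper, and the strings stand in for a status and a client.

```ruby
# Hypothetical dispatcher, modelled on the "one or two arguments" rule.
def yield_by_arity(callback, message, client)
  return unless callback.is_a?(Proc)

  case callback.arity
  when 1 then callback.call(message)
  when 2 then callback.call(message, client)
  end
end

one_arg = proc { |status| "got #{status}" }
two_arg = proc { |status, client| "got #{status} via #{client}" }

first  = yield_by_arity(one_arg, 'tweet', 'client-a')
second = yield_by_arity(two_arg, 'tweet', 'client-a')
```

With this approach a block of any other arity is silently skipped, so handlers should declare exactly one or two parameters.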
#on_delete(&block) ⇒ Object
Set a Proc to be run when a deletion notice is received from the Twitter stream. For example:
@client = TweetStream::Client.new
@client.on_delete do |status_id, user_id|
Tweet.delete(status_id)
end
Block must take two arguments: the status id and the user id. If no block is given, it will return the currently set deletion proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 165
def on_delete(&block)
  on('delete', &block)
end
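To see what a deletion handler receives, the sketch below feeds a hand-written deletion notice to a two-argument block. The in-memory Hash is a hypothetical stand-in for an application's Tweet model, and the notice is a made-up example in the symbol-keyed shape a parsed message would have.

```ruby
# Hypothetical in-memory store standing in for an application's Tweet model.
stored_tweets = { 1234 => 'first', 5678 => 'second' }

# A handler with the two arguments described above: status id and user id.
on_delete = proc { |status_id, user_id| stored_tweets.delete(status_id) }

# Example deletion notice after JSON parsing with symbolized keys.
notice = { :delete => { :status => { :id => 1234, :user_id => 3 } } }
status = notice[:delete][:status]
on_delete.call(status[:id], status[:user_id])
```

After the call only the tweet with id 5678 remains in the store.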
#on_direct_message(&block) ⇒ Object
Set a Proc to be run when a direct message is encountered in the processing of the stream.
@client = TweetStream::Client.new
@client.on_direct_message do |direct_message|
  # do something with the direct message
end
Block must take one argument: the direct message. If no block is given, it will return the currently set direct message proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 241
def on_direct_message(&block)
  on('direct_message', &block)
end
#on_enhance_your_calm(&block) ⇒ Object
Set a Proc to be run when an enhance_your_calm signal is received.
@client = TweetStream::Client.new
@client.on_enhance_your_calm do
# do something, your account has been blocked
end
# File 'lib/tweetstream/client.rb', line 317
def on_enhance_your_calm(&block)
  on('enhance_your_calm', &block)
end
#on_error(&block) ⇒ Object
Set a Proc to be run when an HTTP error is encountered in the processing of the stream. Note that TweetStream will automatically try to reconnect; this is for reference only. Don't panic!
@client = TweetStream::Client.new
@client.on_error do |message|
  # Make note of error message
end
Block must take one argument: the error message. If no block is given, it will return the currently set error proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 214
def on_error(&block)
  on('error', &block)
end
#on_event(event, &block) ⇒ Object
Set a Proc to be run on userstream events
@client = TweetStream::Client.new
@client.on_event(:favorite) do |event|
  # do something with the event
end
# File 'lib/tweetstream/client.rb', line 367
def on_event(event, &block)
  on(event, &block)
end
#on_friends(&block) ⇒ Object
Set a Proc to be run when a Site Stream friends list is received.
@client = TweetStream::Client.new
@client.on_friends do |friends|
# do something with the friends list
end
# File 'lib/tweetstream/client.rb', line 347
def on_friends(&block)
  on('friends', &block)
end
#on_inited(&block) ⇒ Object
Set a Proc to be run when the connection is established. Called in EventMachine::Connection#post_init.
@client = TweetStream::Client.new
@client.on_inited do
puts 'Connected...'
end
# File 'lib/tweetstream/client.rb', line 296
def on_inited(&block)
  on('inited', &block)
end
#on_limit(&block) ⇒ Object
Set a Proc to be run when a rate limit notice is received from the Twitter stream. For example:
@client = TweetStream::Client.new
@client.on_limit do |discarded_count|
# Make note of discarded count
end
Block must take one argument: the number of discarded tweets. If no block is given, it will return the currently set limit proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 197
def on_limit(&block)
  on('limit', &block)
end
#on_no_data_received(&block) ⇒ Object
Set a Proc to be run when no data is received from the server and a stall occurs. Twitter defines this to be 90 seconds.
@client = TweetStream::Client.new
@client.on_no_data_received do
  # Make note that no data was received
end
# File 'lib/tweetstream/client.rb', line 307
def on_no_data_received(&block)
  on('no_data_received', &block)
end
#on_reconnect(&block) ⇒ Object
Set a Proc to be run on reconnect.
@client = TweetStream::Client.new
@client.on_reconnect do |timeout, retries|
# Make note of the reconnection
end
# File 'lib/tweetstream/client.rb', line 284
def on_reconnect(&block)
  on('reconnect', &block)
end
#on_scrub_geo(&block) ⇒ Object
Set a Proc to be run when a scrub_geo notice is received from the Twitter stream. For example:
@client = TweetStream::Client.new
@client.on_scrub_geo do |up_to_status_id, user_id|
  Tweet.where('status_id <= ?', up_to_status_id)
end
Block must take two arguments: the upper status id and the user id. If no block is given, it will return the currently set scrub_geo proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 181
def on_scrub_geo(&block)
  on('scrub_geo', &block)
end
#on_stall_warning(&block) ⇒ Object
Set a Proc to be run when a stall warning is received.
@client = TweetStream::Client.new
@client.on_stall_warning do |warning|
  # do something with the warning
end
# File 'lib/tweetstream/client.rb', line 357
def on_stall_warning(&block)
  on('stall_warning', &block)
end
#on_status_withheld(&block) ⇒ Object
Set a Proc to be run when a status_withheld message is received.
@client = TweetStream::Client.new
@client.on_status_withheld do |status|
# do something with the status
end
# File 'lib/tweetstream/client.rb', line 327
def on_status_withheld(&block)
  on('status_withheld', &block)
end
#on_timeline_status(&block) ⇒ Object
Set a Proc to be run when a regular timeline message is encountered in the processing of the stream.
@client = TweetStream::Client.new
@client.on_timeline_status do |status|
# do something with the status
end
Block can take one or two arguments: |status (, client)|. If no block is given, it will return the currently set timeline status proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 273
def on_timeline_status(&block)
  on('timeline_status', &block)
end
#on_unauthorized(&block) ⇒ Object
Set a Proc to be run when an HTTP status 401 is encountered while connecting to Twitter. This could happen when system clock drift has occurred.
If no block is given, it will return the currently set unauthorized proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 225
def on_unauthorized(&block)
  on('unauthorized', &block)
end
#on_user_withheld(&block) ⇒ Object
Set a Proc to be run when a user_withheld message is received.
@client = TweetStream::Client.new
@client.on_user_withheld do |user_withheld|
  # do something with the withheld user notice
end
# File 'lib/tweetstream/client.rb', line 337
def on_user_withheld(&block)
  on('user_withheld', &block)
end
#respond_to(hash, callbacks, &block) ⇒ Object
# File 'lib/tweetstream/client.rb', line 446
def respond_to(hash, callbacks, &block)
  if hash[:control] && hash[:control][:control_uri]
    @control_uri = hash[:control][:control_uri]
    require 'tweetstream/site_stream_client'
    @control = TweetStream::SiteStreamClient.new(@control_uri, options)
    @control.on_error(&callbacks['error'])
  elsif hash[:warning]
    invoke_callback(callbacks['stall_warning'], hash[:warning])
  elsif hash[:delete] && hash[:delete][:status]
    invoke_callback(callbacks['delete'], hash[:delete][:status][:id], hash[:delete][:status][:user_id])
  elsif hash[:scrub_geo] && hash[:scrub_geo][:up_to_status_id]
    invoke_callback(callbacks['scrub_geo'], hash[:scrub_geo][:up_to_status_id], hash[:scrub_geo][:user_id])
  elsif hash[:limit] && hash[:limit][:track]
    invoke_callback(callbacks['limit'], hash[:limit][:track])
  elsif hash[:direct_message]
    yield_message_to(callbacks['direct_message'], Twitter::DirectMessage.new(hash[:direct_message]))
  elsif hash[:status_withheld]
    invoke_callback(callbacks['status_withheld'], hash[:status_withheld])
  elsif hash[:user_withheld]
    invoke_callback(callbacks['user_withheld'], hash[:user_withheld])
  elsif hash[:event]
    invoke_callback(callbacks[hash[:event].to_s], hash)
  elsif hash[:friends]
    invoke_callback(callbacks['friends'], hash[:friends])
  elsif hash[:text] && hash[:user]
    @last_status = Twitter::Tweet.new(hash)
    yield_message_to(callbacks['timeline_status'], @last_status)

    yield_message_to(block, @last_status) if block_given?
  elsif hash[:for_user]
    yield_message_to(block, hash) if block_given?
  end
end
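The branch order in #respond_to decides which callback a parsed message reaches. As a rough aid for reasoning about that routing, the sketch below maps a message Hash to a callback name. callback_name_for is a hypothetical helper, it leaves out the control and for_user branches, and the sample messages are made up.

```ruby
# Hypothetical classifier: returns the callback name a parsed message
# would be routed to, following the branch order of #respond_to.
def callback_name_for(hash)
  if hash[:warning] then 'stall_warning'
  elsif hash[:delete] && hash[:delete][:status] then 'delete'
  elsif hash[:scrub_geo] && hash[:scrub_geo][:up_to_status_id] then 'scrub_geo'
  elsif hash[:limit] && hash[:limit][:track] then 'limit'
  elsif hash[:direct_message] then 'direct_message'
  elsif hash[:status_withheld] then 'status_withheld'
  elsif hash[:user_withheld] then 'user_withheld'
  elsif hash[:event] then hash[:event].to_s
  elsif hash[:friends] then 'friends'
  elsif hash[:text] && hash[:user] then 'timeline_status'
  end
end

limit_name  = callback_name_for(:limit => { :track => 42 })
event_name  = callback_name_for(:event => 'favorite', :source => { :id => 1 })
status_name = callback_name_for(:text => 'hello', :user => { :id => 1 })
unknown     = callback_name_for(:something_else => true)
```

Note that userstream events are routed by their own event name, which is why #on_event registers a block under an arbitrary key rather than a fixed one.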
#retweet(query_parameters = {}, &block) ⇒ Object
Returns all retweets. The retweet stream is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case. As of 9/11/2009, the site-wide retweet feature has not yet launched, so there are currently few, if any, retweets on this stream.
# File 'lib/tweetstream/client.rb', line 75
def retweet(query_parameters = {}, &block)
  start('/1.1/statuses/retweet.json', query_parameters, &block)
end
#sample(query_parameters = {}, &block) ⇒ Object
Returns a random sample of all public statuses. The default access level provides a small proportion of the Firehose. The "Gardenhose" access level provides a larger proportion, more suitable for data mining and research applications that need a statistically significant sample.
# File 'lib/tweetstream/client.rb', line 84
def sample(query_parameters = {}, &block)
  start('/1.1/statuses/sample.json', query_parameters, &block)
end
#sitestream(user_ids = [], query_params = {}, &block) ⇒ Object
Make a call to the Site Streams API.
# File 'lib/tweetstream/client.rb', line 142
def sitestream(user_ids = [], query_params = {}, &block)
  stream_params = { :host => "sitestream.twitter.com" }
  query_params.merge!({
    :method => :post,
    :follow => user_ids,
    :extra_stream_parameters => stream_params
  })
  query_params.merge!(:with => 'followings') if query_params.delete(:followings)
  start('/1.1/site.json', query_params, &block)
end
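The parameter handling in #sitestream, in particular the translation of :followings into :with => 'followings', can be checked without opening a connection. The sketch below is a hypothetical pure function (sitestream_params is not a library method) that builds the same Hash; unlike the method above, it uses a non-mutating merge so the caller's Hash is left untouched.

```ruby
# Hypothetical pure function reproducing the parameter handling of
# #sitestream without opening a connection.
def sitestream_params(user_ids = [], query_params = {})
  params = query_params.merge(
    :method => :post,
    :follow => user_ids,
    :extra_stream_parameters => { :host => 'sitestream.twitter.com' }
  )
  # A truthy :followings flag is replaced by the :with => 'followings' parameter.
  params[:with] = 'followings' if params.delete(:followings)
  params
end

caller_params = { :followings => true }
params = sitestream_params([1234, 5678], caller_params)
```

Because the real method calls merge! on its argument, passing a Hash you intend to reuse may be worth avoiding there.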
#start(path, query_parameters = {}, &block) ⇒ Object
Connect to Twitter while starting a new EventMachine run loop.
# File 'lib/tweetstream/client.rb', line 381
def start(path, query_parameters = {}, &block)
  if EventMachine.reactor_running?
    connect(path, query_parameters, &block)
  else
    EventMachine.epoll
    EventMachine.kqueue
    EventMachine::run do
      connect(path, query_parameters, &block)
    end
  end
end
#stop ⇒ Object
Terminate the currently running TweetStream and close the EventMachine loop.
# File 'lib/tweetstream/client.rb', line 481
def stop
  EventMachine.stop_event_loop
  @last_status
end
#stop_stream ⇒ Object
# File 'lib/tweetstream/client.rb', line 491
def stop_stream
  @stream.stop if @stream
end
#track(*keywords, &block) ⇒ Object
Specify keywords to track. Queries are subject to Track Limitations, described in Track Limiting and subject to access roles, described in the statuses/filter method. Track keywords are case-insensitive logical ORs. Terms are exact-matched, and also exact-matched ignoring punctuation. Phrases, keywords with spaces, are not supported. Keywords containing punctuation will only exact match tokens. Query parameters may be passed as the last argument.
# File 'lib/tweetstream/client.rb', line 95
def track(*keywords, &block)
  query_params = keywords.pop if keywords.last.is_a?(::Hash)
  query_params ||= {}
  filter(query_params.merge(:track => keywords), &block)
end
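The "query parameters may be passed as the last argument" convention relies on popping a trailing Hash off the splatted arguments. The sketch below isolates that idiom; split_trailing_params is a hypothetical helper, and :stall_warnings is used only as an example parameter.

```ruby
# Hypothetical helper mirroring how #track and #follow separate their
# leading values from an optional trailing Hash of query parameters.
def split_trailing_params(*values)
  query_params = values.last.is_a?(Hash) ? values.pop : {}
  [values, query_params]
end

keywords, params = split_trailing_params('ruby', 'rails', :stall_warnings => true)
only_keywords, no_params = split_trailing_params('ruby')
```

In the first call keywords holds the two strings and params holds the Hash; in the second call the parameters default to an empty Hash.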
#userstream(query_params = {}, &block) ⇒ Object
Make a call to the userstream API for the currently authenticated user.
# File 'lib/tweetstream/client.rb', line 135
def userstream(query_params = {}, &block)
  stream_params = { :host => "userstream.twitter.com" }
  query_params.merge!(:extra_stream_parameters => stream_params)
  start('/1.1/user.json', query_params, &block)
end