Class: TweetStream::Client
- Inherits: Object
  - Object
  - TweetStream::Client
- Defined in: lib/tweetstream/client.rb
Overview
Provides simple access to the Twitter Streaming API (https://dev.twitter.com/docs/streaming-api) for Ruby scripts that need to hold a long-lived connection to Twitter for tracking and other purposes.
Basic usage of the library is to call one of the provided methods and provide a block that will perform actions on a yielded Twitter::Tweet. For example:
TweetStream::Client.new.track('fail') do |status|
puts "[#{status.user.screen_name}] #{status.text}"
end
For information about a daemonized TweetStream client, view the TweetStream::Daemon class.
Direct Known Subclasses
Constant Summary
- OPTION_CALLBACKS =
[:delete, :scrub_geo, :limit, :error, :enhance_your_calm, :unauthorized, :reconnect, :inited, :direct_message, :timeline_status, :anything, :no_data_received, :status_withheld, :user_withheld].freeze
Instance Attribute Summary
-
#control ⇒ Object
readonly
Returns the value of attribute control.
-
#control_uri ⇒ Object
readonly
Returns the value of attribute control_uri.
-
#options ⇒ Object
Returns the value of attribute options.
-
#stream ⇒ Object
readonly
Returns the value of attribute stream.
Instance Method Summary
-
#close_connection ⇒ Object
Closes the connection to Twitter without stopping the EventMachine loop.
-
#connect(path, options = {}, &block) ⇒ Object
Connects to Twitter without starting a new EventMachine run loop.
-
#filter(query_params = {}, &block) ⇒ Object
Make a call to the statuses/filter method of the Streaming API. You may provide :follow, :track or both as options to follow the tweets of specified users or track keywords.
-
#firehose(query_parameters = {}, &block) ⇒ Object
Returns all public statuses.
-
#follow(*user_ids, &block) ⇒ Object
Returns public statuses from or in reply to a set of users.
-
#initialize(options = {}) ⇒ Client
constructor
Creates a new API.
-
#links(query_parameters = {}, &block) ⇒ Object
Returns all statuses containing http: and https:.
-
#locations(*locations_map, &block) ⇒ Object
Specifies a set of bounding boxes to track.
- #on(event, &block) ⇒ Object
-
#on_anything(&block) ⇒ Object
Set a Proc to be run whenever anything is encountered in the processing of the stream.
-
#on_delete(&block) ⇒ Object
Set a Proc to be run when a deletion notice is received from the Twitter stream.
-
#on_direct_message(&block) ⇒ Object
Set a Proc to be run when a direct message is encountered in the processing of the stream.
-
#on_enhance_your_calm(&block) ⇒ Object
Set a Proc to be run when enhance_your_calm signal is received.
-
#on_error(&block) ⇒ Object
Set a Proc to be run when an HTTP error is encountered in the processing of the stream.
-
#on_event(event, &block) ⇒ Object
Set a Proc to be run on userstream events.
-
#on_friends(&block) ⇒ Object
Set a Proc to be run when a Site Stream friends list is received.
-
#on_inited(&block) ⇒ Object
Set a Proc to be run when the connection is established.
-
#on_limit(&block) ⇒ Object
Set a Proc to be run when a rate limit notice is received from the Twitter stream.
-
#on_no_data_received(&block) ⇒ Object
Set a Proc to be run when no data is received from the server and a stall occurs.
-
#on_reconnect(&block) ⇒ Object
Set a Proc to be run on reconnect.
-
#on_scrub_geo(&block) ⇒ Object
Set a Proc to be run when a scrub_geo notice is received from the Twitter stream.
-
#on_stall_warning(&block) ⇒ Object
Set a Proc to be run when a stall warning is received.
-
#on_status_withheld(&block) ⇒ Object
Set a Proc to be run when a status_withheld message is received.
-
#on_timeline_status(&block) ⇒ Object
Set a Proc to be run when a regular timeline message is encountered in the processing of the stream.
-
#on_unauthorized(&block) ⇒ Object
Set a Proc to be run when an HTTP status 401 is encountered while connecting to Twitter.
-
#on_user_withheld(&block) ⇒ Object
Set a Proc to be run when a user_withheld message is received.
- #respond_to(hash, callbacks, &block) ⇒ Object
-
#retweet(query_parameters = {}, &block) ⇒ Object
Returns all retweets.
-
#sample(query_parameters = {}, &block) ⇒ Object
Returns a random sample of all public statuses.
-
#sitestream(user_ids = [], query_params = {}, &block) ⇒ Object
Make a call to the Site Streams API.
-
#start(path, query_parameters = {}, &block) ⇒ Object
Connects to Twitter, starting a new EventMachine run loop if one is not already running.
-
#stop ⇒ Object
Terminate the currently running TweetStream and close EventMachine loop.
- #stop_stream ⇒ Object
-
#track(*keywords, &block) ⇒ Object
Specify keywords to track.
-
#userstream(query_params = {}, &block) ⇒ Object
Make a call to the userstream API for the currently authenticated user.
Constructor Details
#initialize(options = {}) ⇒ Client
Creates a new API
# File 'lib/tweetstream/client.rb', line 55
def initialize(options={})
  self.options = options
  merged_options = TweetStream.options.merge(options)
  Configuration::VALID_OPTIONS_KEYS.each do |key|
    send("#{key}=", merged_options[key])
  end
  @callbacks = {}
end
Instance Attribute Details
#control ⇒ Object (readonly)
Returns the value of attribute control.
# File 'lib/tweetstream/client.rb', line 52
def control
  @control
end
#control_uri ⇒ Object (readonly)
Returns the value of attribute control_uri.
# File 'lib/tweetstream/client.rb', line 52
def control_uri
  @control_uri
end
#options ⇒ Object
Returns the value of attribute options.
# File 'lib/tweetstream/client.rb', line 51
def options
  @options
end
#stream ⇒ Object (readonly)
Returns the value of attribute stream.
# File 'lib/tweetstream/client.rb', line 52
def stream
  @stream
end
Instance Method Details
#close_connection ⇒ Object
Closes the connection to Twitter without stopping the EventMachine loop.
# File 'lib/tweetstream/client.rb', line 498
def close_connection
  @stream.close_connection if @stream
end
#connect(path, options = {}, &block) ⇒ Object
Connects to Twitter without starting a new EventMachine run loop.
# File 'lib/tweetstream/client.rb', line 406
def connect(path, options = {}, &block)
  stream_parameters, callbacks = connection_options(path, options)

  @stream = EM::Twitter::Client.connect(stream_parameters)
  @stream.each do |item|
    begin
      hash = Yajl::Parser.parse(item, :symbolize_keys => true)
    rescue Yajl::ParseError
      invoke_callback(callbacks['error'], "Yajl::ParseError occured in stream: #{item}")
      next
    end

    unless hash.is_a?(::Hash)
      invoke_callback(callbacks['error'], "Unexpected JSON object in stream: #{item}")
      next
    end

    Twitter.identity_map = false

    respond_to(hash, callbacks, &block)

    yield_message_to(callbacks['anything'], hash)
  end

  @stream.on_error do |message|
    invoke_callback(callbacks['error'], message)
  end

  @stream.on_unauthorized do
    invoke_callback(callbacks['unauthorized'])
  end

  @stream.on_enhance_your_calm do
    invoke_callback(callbacks['enhance_your_calm'])
  end

  @stream.on_reconnect do |timeout, retries|
    invoke_callback(callbacks['reconnect'], timeout, retries)
  end

  @stream.on_max_reconnects do |timeout, retries|
    raise TweetStream::ReconnectError.new(timeout, retries)
  end

  @stream.on_no_data_received do
    invoke_callback(callbacks['no_data_received'])
  end

  @stream
end
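The per-item handling in #connect (parse, report parse failures to the error callback, reject non-Hash JSON, then pass the hash on) can be tried in isolation. The following is a minimal sketch rather than the library's code: `handle_item` is a hypothetical helper, the standard library's JSON parser stands in for Yajl, and the callbacks hash is built by hand.

```ruby
require 'json'

# Hypothetical per-item handler following the steps in #connect, with the
# standard library's JSON parser standing in for Yajl.
def handle_item(item, callbacks)
  begin
    hash = JSON.parse(item, :symbolize_names => true)
  rescue JSON::ParserError
    # Malformed input: notify the error callback (if any) and skip the item.
    callbacks['error'].call("Parse error in stream: #{item}") if callbacks['error']
    return nil
  end

  unless hash.is_a?(Hash)
    # Valid JSON that is not an object is also reported and skipped.
    callbacks['error'].call("Unexpected JSON object in stream: #{item}") if callbacks['error']
    return nil
  end

  callbacks['anything'].call(hash) if callbacks['anything']
  hash
end

errors = []
seen = []
callbacks = {
  'error'    => proc { |message| errors << message },
  'anything' => proc { |hash| seen << hash }
}

handle_item('{"text":"hi"}', callbacks)  # a JSON object reaches 'anything'
handle_item('[1, 2]', callbacks)         # a JSON array is reported as an error
handle_item('not json', callbacks)       # unparseable input is reported as an error
```

Skipping a bad item instead of raising keeps a long-lived stream alive when a single message is malformed.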
#filter(query_params = {}, &block) ⇒ Object
Make a call to the statuses/filter method of the Streaming API. You may provide :follow, :track or both as options to follow the tweets of specified users or track keywords. This method is provided separately for cases where combining track and follow would reduce the number of HTTP connections.
# File 'lib/tweetstream/client.rb', line 141
def filter(query_params = {}, &block)
  start('/1.1/statuses/filter.json', query_params.merge(:method => :post), &block)
end
#firehose(query_parameters = {}, &block) ⇒ Object
Returns all public statuses. The Firehose is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case.
# File 'lib/tweetstream/client.rb', line 68
def firehose(query_parameters = {}, &block)
  start('/1.1/statuses/firehose.json', query_parameters, &block)
end
#follow(*user_ids, &block) ⇒ Object
Returns public statuses from or in reply to a set of users. Mentions ("Hello @user!") and implicit replies ("@user Hello!" created without pressing the reply "swoosh") are not matched. Requires integer user IDs, not screen names. Query parameters may be passed as the last argument.
# File 'lib/tweetstream/client.rb', line 116
def follow(*user_ids, &block)
  query_params = user_ids.pop if user_ids.last.is_a?(::Hash)
  query_params ||= {}
  filter(query_params.merge(:follow => user_ids), &block)
end
#links(query_parameters = {}, &block) ⇒ Object
Returns all statuses containing http: and https:. The links stream is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case.
# File 'lib/tweetstream/client.rb', line 76
def links(query_parameters = {}, &block)
  start('/1.1/statuses/links.json', query_parameters, &block)
end
#locations(*locations_map, &block) ⇒ Object
Specifies a set of bounding boxes to track. Only tweets that are both created using the Geotagging API and placed from within a tracked bounding box will be included in the stream. The user's location field is not used to filter tweets (e.g. if a user has their location set to “San Francisco”, but the tweet was not created using the Geotagging API and has no geo element, it will not be included in the stream). Bounding boxes are specified as a comma-separated list of longitude/latitude pairs, with the first pair denoting the southwest corner of the box.
# File 'lib/tweetstream/client.rb', line 130
def locations(*locations_map, &block)
  query_params = locations_map.pop if locations_map.last.is_a?(::Hash)
  query_params ||= {}
  filter(query_params.merge(:locations => locations_map), &block)
end
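To make the bounding-box format concrete, here is a small sketch of how boxes could be flattened into the comma-separated form described for #locations. `bounding_boxes_param` is a hypothetical helper, not a TweetStream method, and the coordinates are illustrative values. Each box is assumed to be given as [southwest longitude, southwest latitude, northeast longitude, northeast latitude].

```ruby
# Hypothetical helper: flatten [sw_lon, sw_lat, ne_lon, ne_lat] boxes into a
# single comma-separated string of longitude/latitude pairs.
def bounding_boxes_param(*boxes)
  boxes.flatten.map(&:to_s).join(',')
end

# Illustrative boxes; longitude comes first in every pair.
san_francisco = [-122.75, 36.8, -121.75, 37.8]
new_york      = [-74, 40, -73, 41]

param = bounding_boxes_param(san_francisco, new_york)
```

Longitude precedes latitude in each pair, which is the reverse of the common "lat, long" convention and an easy mistake to make.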
#on(event, &block) ⇒ Object
# File 'lib/tweetstream/client.rb', line 382
def on(event, &block)
  if block_given?
    @callbacks[event.to_s] = block
    self
  else
    @callbacks[event.to_s]
  end
end
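The getter/setter behavior of #on can be illustrated without a Twitter connection. The following is a minimal sketch: `CallbackRegistry` is a hypothetical stand-in class that only mirrors the pattern of #on (store the block and return self when a block is given, otherwise return the stored Proc).

```ruby
# Hypothetical stand-in that mirrors the #on pattern; not part of TweetStream.
class CallbackRegistry
  def initialize
    @callbacks = {}
  end

  # With a block: store it under the event name and return self for chaining.
  # Without a block: return the stored Proc (or nil if none was set).
  def on(event, &block)
    if block_given?
      @callbacks[event.to_s] = block
      self
    else
      @callbacks[event.to_s]
    end
  end
end

registry = CallbackRegistry.new

# Setting a callback returns the registry itself, so calls can be chained.
chained = registry.on(:delete) { |status_id, user_id| [status_id, user_id] }
chained.on('limit') { |discarded_count| discarded_count }

# Symbols and strings address the same entry because of event.to_s.
delete_proc = registry.on('delete')
```

Because every on_* method delegates to #on, the same "block sets, no block gets" rule applies to each of them.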
#on_anything(&block) ⇒ Object
Set a Proc to be run whenever anything is encountered in the processing of the stream.
@client = TweetStream::Client.new
@client.on_anything do |status|
# do something with the status
end
The block can take one or two arguments: |status (, client)|. If no block is given, it will return the currently set anything proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 268
def on_anything(&block)
  on('anything', &block)
end
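The "one or two arguments" rule can be pictured as a check on the block's arity. This is a sketch under that assumption, not the library's implementation: `call_with_optional_client` is a hypothetical helper, and plain strings stand in for the status and the client.

```ruby
# Hypothetical helper: pass the client as a second argument only when the
# block declares two parameters. Blocks with any other arity are not called.
def call_with_optional_client(procedure, message, client)
  return unless procedure.is_a?(Proc)

  case procedure.arity
  when 1 then procedure.call(message)
  when 2 then procedure.call(message, client)
  end
end

one_arg  = proc { |status| "got #{status}" }
two_args = proc { |status, client| "got #{status} via #{client}" }

first  = call_with_optional_client(one_arg, 'tweet', 'client')
second = call_with_optional_client(two_args, 'tweet', 'client')
```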
#on_delete(&block) ⇒ Object
Set a Proc to be run when a deletion notice is received from the Twitter stream. For example:
@client = TweetStream::Client.new
@client.on_delete do |status_id, user_id|
Tweet.delete(status_id)
end
Block must take two arguments: the status id and the user id. If no block is given, it will return the currently set deletion proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 176
def on_delete(&block)
  on('delete', &block)
end
#on_direct_message(&block) ⇒ Object
Set a Proc to be run when a direct message is encountered in the processing of the stream.
@client = TweetStream::Client.new
@client.on_direct_message do |direct_message|
# do something with the direct message
end
Block must take one argument: the direct message. If no block is given, it will return the currently set direct message proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 252
def on_direct_message(&block)
  on('direct_message', &block)
end
#on_enhance_your_calm(&block) ⇒ Object
Set a Proc to be run when enhance_your_calm signal is received.
@client = TweetStream::Client.new
@client.on_enhance_your_calm do
# do something, your account has been blocked
end
# File 'lib/tweetstream/client.rb', line 328
def on_enhance_your_calm(&block)
  on('enhance_your_calm', &block)
end
#on_error(&block) ⇒ Object
Set a Proc to be run when an HTTP error is encountered in the processing of the stream. Note that TweetStream will automatically try to reconnect; this is for reference only. Don't panic!
@client = TweetStream::Client.new
@client.on_error do |message|
# Make note of error message
end
Block must take one argument: the error message. If no block is given, it will return the currently set error proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 225
def on_error(&block)
  on('error', &block)
end
#on_event(event, &block) ⇒ Object
Set a Proc to be run on userstream events.
@client = TweetStream::Client.new
@client.on_event(:favorite) do |event|
# do something with the event
end
# File 'lib/tweetstream/client.rb', line 378
def on_event(event, &block)
  on(event, &block)
end
#on_friends(&block) ⇒ Object
Set a Proc to be run when a Site Stream friends list is received.
@client = TweetStream::Client.new
@client.on_friends do |friends|
# do something with the friends list
end
# File 'lib/tweetstream/client.rb', line 358
def on_friends(&block)
  on('friends', &block)
end
#on_inited(&block) ⇒ Object
Set a Proc to be run when the connection is established. Called in EventMachine::Connection#post_init.
@client = TweetStream::Client.new
@client.on_inited do
puts 'Connected...'
end
# File 'lib/tweetstream/client.rb', line 307
def on_inited(&block)
  on('inited', &block)
end
#on_limit(&block) ⇒ Object
Set a Proc to be run when a rate limit notice is received from the Twitter stream. For example:
@client = TweetStream::Client.new
@client.on_limit do |discarded_count|
# Make note of discarded count
end
Block must take one argument: the number of discarded tweets. If no block is given, it will return the currently set limit proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 208
def on_limit(&block)
  on('limit', &block)
end
#on_no_data_received(&block) ⇒ Object
Set a Proc to be run when no data is received from the server and a stall occurs. Twitter defines this to be 90 seconds.
@client = TweetStream::Client.new
@client.on_no_data_received do
# Make note of no data, possi
end
# File 'lib/tweetstream/client.rb', line 318
def on_no_data_received(&block)
  on('no_data_received', &block)
end
#on_reconnect(&block) ⇒ Object
Set a Proc to be run on reconnect.
@client = TweetStream::Client.new
@client.on_reconnect do |timeout, retries|
# Make note of the reconnection
end
# File 'lib/tweetstream/client.rb', line 295
def on_reconnect(&block)
  on('reconnect', &block)
end
#on_scrub_geo(&block) ⇒ Object
Set a Proc to be run when a scrub_geo notice is received from the Twitter stream. For example:
@client = TweetStream::Client.new
@client.on_scrub_geo do |up_to_status_id, user_id|
Tweet.where('status_id <= ?', up_to_status_id)
end
Block must take two arguments: the upper status id and the user id. If no block is given, it will return the currently set scrub_geo proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 192
def on_scrub_geo(&block)
  on('scrub_geo', &block)
end
#on_stall_warning(&block) ⇒ Object
Set a Proc to be run when a stall warning is received.
@client = TweetStream::Client.new
@client.on_stall_warning do |warning|
# do something with the stall warning
end
# File 'lib/tweetstream/client.rb', line 368
def on_stall_warning(&block)
  on('stall_warning', &block)
end
#on_status_withheld(&block) ⇒ Object
Set a Proc to be run when a status_withheld message is received.
@client = TweetStream::Client.new
@client.on_status_withheld do |status|
# do something with the status
end
# File 'lib/tweetstream/client.rb', line 338
def on_status_withheld(&block)
  on('status_withheld', &block)
end
#on_timeline_status(&block) ⇒ Object
Set a Proc to be run when a regular timeline message is encountered in the processing of the stream.
@client = TweetStream::Client.new
@client.on_timeline_status do |status|
# do something with the status
end
The block can take one or two arguments: |status (, client)|. If no block is given, it will return the currently set timeline status proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 284
def on_timeline_status(&block)
  on('timeline_status', &block)
end
#on_unauthorized(&block) ⇒ Object
Set a Proc to be run when an HTTP status 401 is encountered while connecting to Twitter. This could happen when system clock drift has occurred.
If no block is given, it will return the currently set unauthorized proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 236
def on_unauthorized(&block)
  on('unauthorized', &block)
end
#on_user_withheld(&block) ⇒ Object
Set a Proc to be run when a user_withheld message is received.
@client = TweetStream::Client.new
@client.on_user_withheld do |status|
# do something with the status
end
# File 'lib/tweetstream/client.rb', line 348
def on_user_withheld(&block)
  on('user_withheld', &block)
end
#respond_to(hash, callbacks, &block) ⇒ Object
# File 'lib/tweetstream/client.rb', line 457
def respond_to(hash, callbacks, &block)
  if hash[:control] && hash[:control][:control_uri]
    @control_uri = hash[:control][:control_uri]
    require 'tweetstream/site_stream_client'
    @control = TweetStream::SiteStreamClient.new(@control_uri, options)
    @control.on_error(&callbacks['error'])
  elsif hash[:warning]
    invoke_callback(callbacks['stall_warning'], hash[:warning])
  elsif hash[:delete] && hash[:delete][:status]
    invoke_callback(callbacks['delete'], hash[:delete][:status][:id], hash[:delete][:status][:user_id])
  elsif hash[:scrub_geo] && hash[:scrub_geo][:up_to_status_id]
    invoke_callback(callbacks['scrub_geo'], hash[:scrub_geo][:up_to_status_id], hash[:scrub_geo][:user_id])
  elsif hash[:limit] && hash[:limit][:track]
    invoke_callback(callbacks['limit'], hash[:limit][:track])
  elsif hash[:direct_message]
    yield_message_to(callbacks['direct_message'], Twitter::DirectMessage.new(hash[:direct_message]))
  elsif hash[:status_withheld]
    invoke_callback(callbacks['status_withheld'], hash[:status_withheld])
  elsif hash[:user_withheld]
    invoke_callback(callbacks['user_withheld'], hash[:user_withheld])
  elsif hash[:event]
    invoke_callback(callbacks[hash[:event].to_s], hash)
  elsif hash[:friends]
    invoke_callback(callbacks['friends'], hash[:friends])
  elsif hash[:text] && hash[:user]
    @last_status = Twitter::Tweet.new(hash)
    yield_message_to(callbacks['timeline_status'], @last_status)

    yield_message_to(block, @last_status) if block_given?
  elsif hash[:for_user]
    yield_message_to(block, hash) if block_given?
  end
end
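The branch order in #respond_to decides which callback a parsed message reaches. As a way to explore that order without a live stream, the sketch below reduces each branch to a label. `classify_message` and the `:site_stream_envelope` label are hypothetical names introduced for illustration. Only the key checks are taken from the listing for #respond_to.

```ruby
# Hypothetical classifier that follows the branch order of #respond_to and
# returns a label instead of invoking a callback.
def classify_message(hash)
  if hash[:control] && hash[:control][:control_uri]
    :control
  elsif hash[:warning]
    :stall_warning
  elsif hash[:delete] && hash[:delete][:status]
    :delete
  elsif hash[:scrub_geo] && hash[:scrub_geo][:up_to_status_id]
    :scrub_geo
  elsif hash[:limit] && hash[:limit][:track]
    :limit
  elsif hash[:direct_message]
    :direct_message
  elsif hash[:status_withheld]
    :status_withheld
  elsif hash[:user_withheld]
    :user_withheld
  elsif hash[:event]
    :event
  elsif hash[:friends]
    :friends
  elsif hash[:text] && hash[:user]
    :timeline_status
  elsif hash[:for_user]
    :site_stream_envelope
  end
end

# Hand-built sample messages with symbolized keys, as produced by the parser.
deletion = { :delete => { :status => { :id => 1234, :user_id => 3 } } }
tweet    = { :text => 'hello', :user => { :screen_name => 'example' } }
limit    = { :limit => { :track => 42 } }

labels = [deletion, tweet, limit].map { |message| classify_message(message) }
```

A message that matches none of the branches yields nil here, which corresponds to #respond_to doing nothing for it.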
#retweet(query_parameters = {}, &block) ⇒ Object
Returns all retweets. The retweet stream is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case. As of 9/11/2009, the site-wide retweet feature has not yet launched, so there are currently few, if any, retweets on this stream.
# File 'lib/tweetstream/client.rb', line 86
def retweet(query_parameters = {}, &block)
  start('/1.1/statuses/retweet.json', query_parameters, &block)
end
#sample(query_parameters = {}, &block) ⇒ Object
Returns a random sample of all public statuses. The default access level provides a small proportion of the Firehose. The "Gardenhose" access level provides a larger proportion, better suited to data mining and research applications that need a statistically significant sample.
# File 'lib/tweetstream/client.rb', line 95
def sample(query_parameters = {}, &block)
  start('/1.1/statuses/sample.json', query_parameters, &block)
end
#sitestream(user_ids = [], query_params = {}, &block) ⇒ Object
Make a call to the Site Streams API.
# File 'lib/tweetstream/client.rb', line 153
def sitestream(user_ids = [], query_params = {}, &block)
  stream_params = { :host => "sitestream.twitter.com" }
  query_params.merge!({
    :method => :post,
    :follow => user_ids,
    :extra_stream_parameters => stream_params
  })
  query_params.merge!(:with => 'followings') if query_params.delete(:followings)
  start('/1.1/site.json', query_params, &block)
end
#start(path, query_parameters = {}, &block) ⇒ Object
Connects to Twitter, starting a new EventMachine run loop if one is not already running.
# File 'lib/tweetstream/client.rb', line 392
def start(path, query_parameters = {}, &block)
  if EventMachine.reactor_running?
    connect(path, query_parameters, &block)
  else
    EventMachine.epoll
    EventMachine.kqueue

    EventMachine::run do
      connect(path, query_parameters, &block)
    end
  end
end
#stop ⇒ Object
Terminate the currently running TweetStream and close EventMachine loop
# File 'lib/tweetstream/client.rb', line 492
def stop
  EventMachine.stop_event_loop
  @last_status
end
#stop_stream ⇒ Object
# File 'lib/tweetstream/client.rb', line 502
def stop_stream
  @stream.stop if @stream
end
#track(*keywords, &block) ⇒ Object
Specify keywords to track. Queries are subject to Track Limitations, described in Track Limiting and subject to access roles, described in the statuses/filter method. Track keywords are case-insensitive logical ORs. Terms are exact-matched, and also exact-matched ignoring punctuation. Phrases, keywords with spaces, are not supported. Keywords containing punctuation will only exact match tokens. Query parameters may be passed as the last argument.
# File 'lib/tweetstream/client.rb', line 106
def track(*keywords, &block)
  query_params = keywords.pop if keywords.last.is_a?(::Hash)
  query_params ||= {}
  filter(query_params.merge(:track => keywords), &block)
end
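The "query parameters may be passed as the last argument" convention used by #track (and likewise #follow and #locations) amounts to popping a trailing Hash off the argument list. A minimal sketch of that convention follows. `split_trailing_options` is a hypothetical helper, and `:language` is only an illustrative parameter name.

```ruby
# Hypothetical helper mirroring how #track treats a trailing Hash argument.
def split_trailing_options(args)
  values = args.dup                                    # leave the caller's array untouched
  options = values.last.is_a?(Hash) ? values.pop : {}  # trailing Hash becomes the options
  [values, options]
end

keywords, query_params = split_trailing_options(['ruby', 'rails', { :language => 'en' }])

# The remaining values are merged in under :track, as #track does before
# delegating to #filter.
filter_params = query_params.merge(:track => keywords)
```

When no Hash is supplied the options default to an empty Hash, so every argument is treated as a keyword.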
#userstream(query_params = {}, &block) ⇒ Object
Make a call to the userstream API for the currently authenticated user.
# File 'lib/tweetstream/client.rb', line 146
def userstream(query_params = {}, &block)
  stream_params = { :host => "userstream.twitter.com" }
  query_params.merge!(:extra_stream_parameters => stream_params)
  start('/1.1/user.json', query_params, &block)
end