Class: TweetStream::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/tweetstream/client.rb

Overview

Provides simple access to the Twitter Streaming API (https://dev.twitter.com/docs/streaming-api) for Ruby scripts that need to create a long connection to Twitter for tracking and other purposes.

Basic usage of the library is to call one of the provided methods and provide a block that will perform actions on a yielded Twitter::Tweet. For example:

TweetStream::Client.new.track('fail') do |status|
  puts "[#{status.user.screen_name}] #{status.text}"
end

For information about a daemonized TweetStream client, view the TweetStream::Daemon class.

Direct Known Subclasses

Daemon

Constant Summary collapse

OPTION_CALLBACKS =
[:delete,
:scrub_geo,
:limit,
:error,
:enhance_your_calm,
:unauthorized,
:reconnect,
:inited,
:direct_message,
:timeline_status,
:anything,
:no_data_received,
:status_withheld,
:user_withheld].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Client

Creates a new API



44
45
46
47
48
49
50
51
# File 'lib/tweetstream/client.rb', line 44

def initialize(options={})
  self.options = options
  merged_options = TweetStream.options.merge(options)
  Configuration::VALID_OPTIONS_KEYS.each do |key|
    send("#{key}=", merged_options[key])
  end
  @callbacks = {}
end

Instance Attribute Details

#controlObject (readonly)

Returns the value of attribute control.



41
42
43
# File 'lib/tweetstream/client.rb', line 41

def control
  @control
end

#control_uriObject (readonly)

Returns the value of attribute control_uri.



41
42
43
# File 'lib/tweetstream/client.rb', line 41

def control_uri
  @control_uri
end

#optionsObject

Returns the value of attribute options.



40
41
42
# File 'lib/tweetstream/client.rb', line 40

def options
  @options
end

#streamObject (readonly)

Returns the value of attribute stream.



41
42
43
# File 'lib/tweetstream/client.rb', line 41

def stream
  @stream
end

Instance Method Details

#close_connectionObject

Close the connection to twitter without closing the eventmachine loop



487
488
489
# File 'lib/tweetstream/client.rb', line 487

def close_connection
  @stream.close_connection if @stream
end

#connect(path, options = {}, &block) ⇒ Object

connect to twitter without starting a new EventMachine run loop



395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
# File 'lib/tweetstream/client.rb', line 395

def connect(path, options = {}, &block)
  stream_parameters, callbacks = connection_options(path, options)

  @stream = EM::Twitter::Client.connect(stream_parameters)
  @stream.each do |item|
    begin
      hash = Yajl::Parser.parse(item, :symbolize_keys => true)
    rescue Yajl::ParseError
      invoke_callback(callbacks['error'], "Yajl::ParseError occured in stream: #{item}")
      next
    end

    unless hash.is_a?(::Hash)
      invoke_callback(callbacks['error'], "Unexpected JSON object in stream: #{item}")
      next
    end

    Twitter.identity_map = false

    respond_to(hash, callbacks, &block)

    yield_message_to(callbacks['anything'], hash)
  end

  @stream.on_error do |message|
    invoke_callback(callbacks['error'], message)
  end

  @stream.on_unauthorized do
    invoke_callback(callbacks['unauthorized'])
  end

  @stream.on_enhance_your_calm do
    invoke_callback(callbacks['enhance_your_calm'])
  end

  @stream.on_reconnect do |timeout, retries|
    invoke_callback(callbacks['reconnect'], timeout, retries)
  end

  @stream.on_max_reconnects do |timeout, retries|
    raise TweetStream::ReconnectError.new(timeout, retries)
  end

  @stream.on_no_data_received do
    invoke_callback(callbacks['no_data_received'])
  end

  @stream
end

#filter(query_params = {}, &block) ⇒ Object

Make a call to the statuses/filter method of the Streaming API, you may provide :follow, :track or both as options to follow the tweets of specified users or track keywords. This method is provided separately for cases when it would conserve the number of HTTP connections to combine track and follow.



130
131
132
# File 'lib/tweetstream/client.rb', line 130

def filter(query_params = {}, &block)
  start('/1.1/statuses/filter.json', query_params.merge(:method => :post), &block)
end

#firehose(query_parameters = {}, &block) ⇒ Object

Returns all public statuses. The Firehose is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case.



57
58
59
# File 'lib/tweetstream/client.rb', line 57

def firehose(query_parameters = {}, &block)
  start('/1.1/statuses/firehose.json', query_parameters, &block)
end

#follow(*user_ids, &block) ⇒ Object

Returns public statuses from or in reply to a set of users. Mentions ("Hello @user!") and implicit replies ("@user Hello!" created without pressing the reply "swoosh") are not matched. Requires integer user IDs, not screen names. Query parameters may be passed as the last argument.



105
106
107
108
109
# File 'lib/tweetstream/client.rb', line 105

def follow(*user_ids, &block)
  query_params = user_ids.pop if user_ids.last.is_a?(::Hash)
  query_params ||= {}
  filter(query_params.merge(:follow => user_ids), &block)
end

Returns all statuses containing http: and https:. The links stream is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case.



65
66
67
# File 'lib/tweetstream/client.rb', line 65

def links(query_parameters = {}, &block)
  start('/1.1/statuses/links.json', query_parameters, &block)
end

#locations(*locations_map, &block) ⇒ Object

Specifies a set of bounding boxes to track. Only tweets that are both created using the Geotagging API and are placed from within a tracked bounding box will be included in the stream – the user’s location field is not used to filter tweets (e.g. if a user has their location set to “San Francisco”, but the tweet was not created using the Geotagging API and has no geo element, it will not be included in the stream). Bounding boxes are specified as a comma separate list of longitude/latitude pairs, with the first pair denoting the southwest corner of the box longitude/latitude pairs, separated by commas. The first pair specifies the southwest corner of the box.



119
120
121
122
123
# File 'lib/tweetstream/client.rb', line 119

def locations(*locations_map, &block)
  query_params = locations_map.pop if locations_map.last.is_a?(::Hash)
  query_params ||= {}
  filter(query_params.merge(:locations => locations_map), &block)
end

#on(event, &block) ⇒ Object



371
372
373
374
375
376
377
378
# File 'lib/tweetstream/client.rb', line 371

def on(event, &block)
  if block_given?
    @callbacks[event.to_s] = block
    self
  else
    @callbacks[event.to_s]
  end
end

#on_anything(&block) ⇒ Object

Set a Proc to be run whenever anything is encountered in the processing of the stream.

@client = TweetStream::Client.new
@client.on_anything do |status|
  # do something with the status
end

Block can take one or two arguments. |status (, client)| If no block is given, it will return the currently set timeline status proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



257
258
259
# File 'lib/tweetstream/client.rb', line 257

def on_anything(&block)
  on('anything', &block)
end

#on_delete(&block) ⇒ Object

Set a Proc to be run when a deletion notice is received from the Twitter stream. For example:

@client = TweetStream::Client.new
@client.on_delete do |status_id, user_id|
  Tweet.delete(status_id)
end

Block must take two arguments: the status id and the user id. If no block is given, it will return the currently set deletion proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



165
166
167
# File 'lib/tweetstream/client.rb', line 165

def on_delete(&block)
  on('delete', &block)
end

#on_direct_message(&block) ⇒ Object

Set a Proc to be run when a direct message is encountered in the processing of the stream.

@client = TweetStream::Client.new
@client.on_direct_message do |direct_message|
  # do something with the direct message
end

Block must take one argument: the direct message. If no block is given, it will return the currently set direct message proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



241
242
243
# File 'lib/tweetstream/client.rb', line 241

def on_direct_message(&block)
  on('direct_message', &block)
end

#on_enhance_your_calm(&block) ⇒ Object

Set a Proc to be run when enhance_your_calm signal is received.

@client = TweetStream::Client.new
@client.on_enhance_your_calm do
  # do something, your account has been blocked
end


317
318
319
# File 'lib/tweetstream/client.rb', line 317

def on_enhance_your_calm(&block)
  on('enhance_your_calm', &block)
end

#on_error(&block) ⇒ Object

Set a Proc to be run when an HTTP error is encountered in the processing of the stream. Note that TweetStream will automatically try to reconnect, this is for reference only. Don't panic!

@client = TweetStream::Client.new
@client.on_error do |message|
  # Make note of error message
end

Block must take one argument: the error message. If no block is given, it will return the currently set error proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



214
215
216
# File 'lib/tweetstream/client.rb', line 214

def on_error(&block)
  on('error', &block)
end

#on_event(event, &block) ⇒ Object

Set a Proc to be run on userstream events

@client = TweetStream::Client.new
@client.event(:favorite) do |event|
  # do something with the status
end


367
368
369
# File 'lib/tweetstream/client.rb', line 367

def on_event(event, &block)
  on(event, &block)
end

#on_friends(&block) ⇒ Object

Set a Proc to be run when a Site Stream friends list is received.

@client = TweetStream::Client.new
@client.on_friends do |friends|
  # do something with the friends list
end


347
348
349
# File 'lib/tweetstream/client.rb', line 347

def on_friends(&block)
  on('friends', &block)
end

#on_inited(&block) ⇒ Object

Set a Proc to be run when connection established. Called in EventMachine::Connection#post_init

@client = TweetStream::Client.new
@client.on_inited do
  puts 'Connected...'
end


296
297
298
# File 'lib/tweetstream/client.rb', line 296

def on_inited(&block)
  on('inited', &block)
end

#on_limit(&block) ⇒ Object

Set a Proc to be run when a rate limit notice is received from the Twitter stream. For example:

@client = TweetStream::Client.new
@client.on_limit do |discarded_count|
  # Make note of discarded count
end

Block must take one argument: the number of discarded tweets. If no block is given, it will return the currently set limit proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



197
198
199
# File 'lib/tweetstream/client.rb', line 197

def on_limit(&block)
  on('limit', &block)
end

#on_no_data_received(&block) ⇒ Object

Set a Proc to be run when no data is received from the server and a stall occurs. Twitter defines this to be 90 seconds.

@client = TweetStream::Client.new
@client.on_no_data_received do
  # Make note of no data, possi
end


307
308
309
# File 'lib/tweetstream/client.rb', line 307

def on_no_data_received(&block)
  on('no_data_received', &block)
end

#on_reconnect(&block) ⇒ Object

Set a Proc to be run on reconnect.

@client = TweetStream::Client.new
@client.on_reconnect do |timeout, retries|
  # Make note of the reconnection
end


284
285
286
# File 'lib/tweetstream/client.rb', line 284

def on_reconnect(&block)
  on('reconnect', &block)
end

#on_scrub_geo(&block) ⇒ Object

Set a Proc to be run when a scrub_geo notice is received from the Twitter stream. For example:

@client = TweetStream::Client.new
@client.on_scrub_geo do |up_to_status_id, user_id|
  Tweet.where(:status_id <= up_to_status_id)
end

Block must take two arguments: the upper status id and the user id. If no block is given, it will return the currently set scrub_geo proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



181
182
183
# File 'lib/tweetstream/client.rb', line 181

def on_scrub_geo(&block)
  on('scrub_geo', &block)
end

#on_stall_warning(&block) ⇒ Object

Set a Proc to be run when a stall warning is received.

@client = TweetStream::Client.new
@client.on_stall_warning do |warning|
  # do something with the friends list
end


357
358
359
# File 'lib/tweetstream/client.rb', line 357

def on_stall_warning(&block)
  on('stall_warning', &block)
end

#on_status_withheld(&block) ⇒ Object

Set a Proc to be run when a status_withheld message is received.

@client = TweetStream::Client.new
@client.on_status_withheld do |status|
  # do something with the status
end


327
328
329
# File 'lib/tweetstream/client.rb', line 327

def on_status_withheld(&block)
  on('status_withheld', &block)
end

#on_timeline_status(&block) ⇒ Object

Set a Proc to be run when a regular timeline message is encountered in the processing of the stream.

@client = TweetStream::Client.new
@client.on_timeline_status do |status|
  # do something with the status
end

Block can take one or two arguments. |status (, client)| If no block is given, it will return the currently set timeline status proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



273
274
275
# File 'lib/tweetstream/client.rb', line 273

def on_timeline_status(&block)
  on('timeline_status', &block)
end

#on_unauthorized(&block) ⇒ Object

Set a Proc to be run when an HTTP status 401 is encountered while connecting to Twitter. This could happen when system clock drift has occured.

If no block is given, it will return the currently set unauthorized proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



225
226
227
# File 'lib/tweetstream/client.rb', line 225

def on_unauthorized(&block)
  on('unauthorized', &block)
end

#on_user_withheld(&block) ⇒ Object

Set a Proc to be run when a status_withheld message is received.

@client = TweetStream::Client.new
@client.on_user_withheld do |status|
  # do something with the status
end


337
338
339
# File 'lib/tweetstream/client.rb', line 337

def on_user_withheld(&block)
  on('user_withheld', &block)
end

#respond_to(hash, callbacks, &block) ⇒ Object



446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
# File 'lib/tweetstream/client.rb', line 446

def respond_to(hash, callbacks, &block)
  if hash[:control] && hash[:control][:control_uri]
    @control_uri = hash[:control][:control_uri]
    require 'tweetstream/site_stream_client'
    @control = TweetStream::SiteStreamClient.new(@control_uri, options)
    @control.on_error(&callbacks['error'])
  elsif hash[:warning]
    invoke_callback(callbacks['stall_warning'], hash[:warning])
  elsif hash[:delete] && hash[:delete][:status]
    invoke_callback(callbacks['delete'], hash[:delete][:status][:id], hash[:delete][:status][:user_id])
  elsif hash[:scrub_geo] && hash[:scrub_geo][:up_to_status_id]
    invoke_callback(callbacks['scrub_geo'], hash[:scrub_geo][:up_to_status_id], hash[:scrub_geo][:user_id])
  elsif hash[:limit] && hash[:limit][:track]
    invoke_callback(callbacks['limit'], hash[:limit][:track])
  elsif hash[:direct_message]
    yield_message_to(callbacks['direct_message'], Twitter::DirectMessage.new(hash[:direct_message]))
  elsif hash[:status_withheld]
    invoke_callback(callbacks['status_withheld'], hash[:status_withheld])
  elsif hash[:user_withheld]
    invoke_callback(callbacks['user_withheld'], hash[:user_withheld])
  elsif hash[:event]
    invoke_callback(callbacks[hash[:event].to_s], hash)
  elsif hash[:friends]
    invoke_callback(callbacks['friends'], hash[:friends])
  elsif hash[:text] && hash[:user]
    @last_status = Twitter::Tweet.new(hash)
    yield_message_to(callbacks['timeline_status'], @last_status)

    yield_message_to(block, @last_status) if block_given?
  elsif hash[:for_user]
    yield_message_to(block, hash) if block_given?
  end
end

#retweet(query_parameters = {}, &block) ⇒ Object

Returns all retweets. The retweet stream is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case. As of 9/11/2009, the site-wide retweet feature has not yet launched, so there are currently few, if any, retweets on this stream.



75
76
77
# File 'lib/tweetstream/client.rb', line 75

def retweet(query_parameters = {}, &block)
  start('/1.1/statuses/retweet.json', query_parameters, &block)
end

#sample(query_parameters = {}, &block) ⇒ Object

Returns a random sample of all public statuses. The default access level provides a small proportion of the Firehose. The "Gardenhose" access level provides a proportion more suitable for data mining and research applications that desire a larger proportion to be statistically significant sample.



84
85
86
# File 'lib/tweetstream/client.rb', line 84

def sample(query_parameters = {}, &block)
  start('/1.1/statuses/sample.json', query_parameters, &block)
end

#sitestream(user_ids = [], query_params = {}, &block) ⇒ Object

Make a call to the userstream api



142
143
144
145
146
147
148
149
150
151
# File 'lib/tweetstream/client.rb', line 142

def sitestream(user_ids = [], query_params = {}, &block)
  stream_params = { :host => "sitestream.twitter.com" }
  query_params.merge!({
    :method => :post,
    :follow => user_ids,
    :extra_stream_parameters => stream_params
  })
  query_params.merge!(:with => 'followings') if query_params.delete(:followings)
  start('/1.1/site.json', query_params, &block)
end

#start(path, query_parameters = {}, &block) ⇒ Object

connect to twitter while starting a new EventMachine run loop



381
382
383
384
385
386
387
388
389
390
391
392
# File 'lib/tweetstream/client.rb', line 381

def start(path, query_parameters = {}, &block)
  if EventMachine.reactor_running?
    connect(path, query_parameters, &block)
  else
    EventMachine.epoll
    EventMachine.kqueue

    EventMachine::run do
      connect(path, query_parameters, &block)
    end
  end
end

#stopObject

Terminate the currently running TweetStream and close EventMachine loop



481
482
483
484
# File 'lib/tweetstream/client.rb', line 481

def stop
  EventMachine.stop_event_loop
  @last_status
end

#stop_streamObject



491
492
493
# File 'lib/tweetstream/client.rb', line 491

def stop_stream
  @stream.stop if @stream
end

#track(*keywords, &block) ⇒ Object

Specify keywords to track. Queries are subject to Track Limitations, described in Track Limiting and subject to access roles, described in the statuses/filter method. Track keywords are case-insensitive logical ORs. Terms are exact-matched, and also exact-matched ignoring punctuation. Phrases, keywords with spaces, are not supported. Keywords containing punctuation will only exact match tokens. Query parameters may be passed as the last argument.



95
96
97
98
99
# File 'lib/tweetstream/client.rb', line 95

def track(*keywords, &block)
  query_params = keywords.pop if keywords.last.is_a?(::Hash)
  query_params ||= {}
  filter(query_params.merge(:track => keywords), &block)
end

#userstream(query_params = {}, &block) ⇒ Object

Make a call to the userstream api for currently authenticated user



135
136
137
138
139
# File 'lib/tweetstream/client.rb', line 135

def userstream(query_params = {}, &block)
  stream_params = { :host => "userstream.twitter.com" }
  query_params.merge!(:extra_stream_parameters => stream_params)
  start('/1.1/user.json', query_params, &block)
end