Class: LogStash::Inputs::Twitter

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/twitter.rb

Overview

Read events from the Twitter Streaming API.

Instance Method Summary collapse

Instance Method Details

#register ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/logstash/inputs/twitter.rb', line 56

# Prepares the plugin for streaming: loads the twitter gem, patches its
# streaming response handler and builds the streaming client in @client.
#
# @raise [RuntimeError] when the loaded twitter gem is not version 5.12.0
def register
  # Loaded here rather than at file load time.
  require "twitter"

  # The patch below replaces a gem method, so it is pinned to the single gem
  # version it was tested against; refuse to continue with any other version.
  if Twitter::Version.to_s != "5.12.0"
    raise("Incompatible Twitter gem version and the LogStash::Json.load")
  end

  # Reopen the gem's streaming response class so that each line of the body is
  # decoded with LogStash::Json and lines that fail to parse are dropped.
  Twitter::Streaming::Response.class_eval do
    def on_body(data)
      @tokenizer.extract(data).each do |raw_line|
        unless raw_line.empty?
          begin
            parsed = LogStash::Json.load(raw_line, :symbolize_keys => true)
            @block.call(parsed)
          rescue LogStash::Json::ParserError
            # Deliberately swallowed: a line that is not valid JSON is skipped.
          end
        end
      end
    end
  end

  # The two secret settings are wrapper objects; #value yields the raw string.
  @client = Twitter::Streaming::Client.new do |config|
    config.consumer_key = @consumer_key
    config.consumer_secret = @consumer_secret.value
    config.access_token = @oauth_token
    config.access_token_secret = @oauth_token_secret.value
  end
end

#run(queue) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/logstash/inputs/twitter.rb', line 86

# Streams tweets matching the configured keywords and pushes them to +queue+.
#
# Opens a filter stream on @client, tracking @keywords joined with commas.
# Every streamed object that is a Twitter::Tweet is converted with from_tweet,
# passed to decorate and appended to the queue; other objects are ignored.
# Errors are logged and the stream is reopened, unless a stop was requested.
#
# @param queue [#<<] destination for the generated events
# @return [void]
def run(queue)
  # Back-off in seconds used when a rate-limit error carries no reset time.
  fallback_sleep = 300

  @logger.info("Starting twitter tracking", :keywords => @keywords)
  begin
    @client.filter(:track => @keywords.join(",")) do |tweet|
      # Leave the method (not just the block) as soon as a stop is requested.
      return if stop?
      next unless tweet.is_a?(Twitter::Tweet)

      event = from_tweet(tweet)
      decorate(event)
      queue << event
    end # client.filter
  rescue Twitter::Error::TooManyRequests => e
    # reset_in is nil when the response had no rate-limit reset header (per the
    # twitter gem's RateLimit docs). Passing nil on to the sleep would raise
    # from inside this rescue clause and abort the input, hence the fallback.
    sleep_for = e.rate_limit.reset_in || fallback_sleep
    @logger.warn("Twitter too many requests error, sleeping for #{sleep_for}s")
    Stud.stoppable_sleep(sleep_for) { stop? }
    # Do not reconnect if the stop request arrived during the sleep.
    retry unless stop?
  rescue => e
    @logger.warn("Twitter client error", :message => e.message, :exception => e, :backtrace => e.backtrace)
    # Reconnect right away, but never after a stop request.
    retry unless stop?
  end
end