Class: LogStash::Inputs::Twitter

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/twitter.rb

Overview

Reads events from the Twitter Streaming API.

Instance Method Summary collapse

Instance Method Details

#register ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/logstash/inputs/twitter.rb', line 55

# Prepares the plugin for streaming: loads the twitter gem, patches its
# streaming response handler, and builds the streaming client from the
# configured credentials.
#
# Raises RuntimeError when the loaded twitter gem is not exactly 5.12.0,
# because the patch below has only been tested against that version.
#
# Returns the newly built Twitter::Streaming::Client (also kept in @client).
def register
  require "twitter"

  # The patch below replaces the gem's body handler so that (a) our own JSON
  # parser is used and (b) JSON parsing errors are ignored. Refuse to go on
  # with any gem version other than the one it was tested against.
  if Twitter::Version.to_s != "5.12.0"
    raise("Incompatible Twitter gem version and the LogStash::Json.load")
  end

  Twitter::Streaming::Response.module_eval do
    # Splits the incoming chunk with @tokenizer and hands every non-empty,
    # successfully parsed line to @block.
    def on_body(data)
      @tokenizer.extract(data).each do |raw_line|
        unless raw_line.empty?
          begin
            parsed = LogStash::Json.load(raw_line, :symbolize_keys => true)
            @block.call(parsed)
          rescue LogStash::Json::ParserError
            # deliberately dropped: lines that fail to parse are skipped
          end
        end
      end
    end
  end

  # The two *_secret settings are wrapped objects; unwrap them with #value.
  @client = Twitter::Streaming::Client.new do |config|
    config.consumer_key = @consumer_key
    config.consumer_secret = @consumer_secret.value
    config.access_token = @oauth_token
    config.access_token_secret = @oauth_token_secret.value
  end
end

#run(queue) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/logstash/inputs/twitter.rb', line 85

# Tracks @keywords through @client's streaming filter and pushes one
# LogStash::Event per received Twitter::Tweet onto +queue+.
#
# With @full_tweet set, the event carries the whole tweet hash; otherwise it
# carries a compact summary (message, user, client, retweeted, source, plus
# in-reply-to and urls when present). Every event goes through #decorate
# before it is queued. Streamed objects that are not Twitter::Tweet are
# skipped.
#
# queue - receiver of the generated events; only `<<` is called on it.
#
# Returns nil once LogStash::ShutdownSignal is raised. Rate-limit errors and
# any other StandardError are logged and the stream is restarted.
def run(queue)
  # Pause (seconds) used when a rate-limit error carries no reset time.
  # `reset_in` can be nil, and sleep(nil) is not a finite pause: depending on
  # the Ruby version it raises TypeError or never wakes up.
  # NOTE(review): 300s is a conservative guess -- confirm the desired value.
  default_rate_limit_pause = 300

  @logger.info("Starting twitter tracking", :keywords => @keywords)
  begin
    @client.filter(:track => @keywords.join(",")) do |tweet|
      next unless tweet.is_a?(Twitter::Tweet)

      @logger.debug? && @logger.debug("Got tweet", :user => tweet.user.screen_name, :text => tweet.text)
      if @full_tweet
        event = LogStash::Event.new(LogStash::Util.stringify_symbols(tweet.to_hash))
        event.timestamp = LogStash::Timestamp.new(tweet.created_at)
      else
        event = LogStash::Event.new(
          LogStash::Event::TIMESTAMP => LogStash::Timestamp.new(tweet.created_at),
          "message" => tweet.full_text,
          "user" => tweet.user.screen_name,
          "client" => tweet.source,
          "retweeted" => tweet.retweeted?,
          "source" => "http://twitter.com/#{tweet.user.screen_name}/status/#{tweet.id}"
        )
        event["in-reply-to"] = tweet.in_reply_to_status_id if tweet.reply?
        unless tweet.urls.empty?
          event["urls"] = tweet.urls.map(&:expanded_url).map(&:to_s)
        end
      end

      decorate(event)
      queue << event
    end # client.filter
  rescue LogStash::ShutdownSignal
    return
  rescue Twitter::Error::TooManyRequests => e
    # Read reset_in once so the logged value and the actual pause agree.
    pause = e.rate_limit.reset_in || default_rate_limit_pause
    @logger.warn("Twitter too many requests error, sleeping for #{pause}s")
    sleep(pause)
    retry
  rescue => e
    # NOTE(review): retried immediately and without limit; persistent failures
    # (e.g. bad credentials) will loop here -- consider a backoff.
    @logger.warn("Twitter client error", :message => e.message, :exception => e, :backtrace => e.backtrace)
    retry
  end
end