Class: LogStash::Inputs::Twitter
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::Twitter
- Defined in:
- lib/logstash/inputs/twitter.rb
Overview
Read events from the twitter streaming api.
Instance Method Summary collapse
Instance Method Details
#register ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/logstash/inputs/twitter.rb', line 56 def register require "twitter" # monkey patch twitter gem to ignore json parsing error. # at the same time, use our own json parser # this has been tested with a specific gem version, raise if not the same raise("Incompatible Twitter gem version and the LogStash::Json.load") unless Twitter::Version.to_s == "5.12.0" Twitter::Streaming::Response.module_eval do def on_body(data) @tokenizer.extract(data).each do |line| next if line.empty? begin @block.call(LogStash::Json.load(line, :symbolize_keys => true)) rescue LogStash::Json::ParserError # silently ignore json parsing errors end end end end @client = Twitter::Streaming::Client.new do |c| c.consumer_key = @consumer_key c.consumer_secret = @consumer_secret.value c.access_token = @oauth_token c.access_token_secret = @oauth_token_secret.value end end |
#run(queue) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/logstash/inputs/twitter.rb', line 86 def run(queue) @logger.info("Starting twitter tracking", :keywords => @keywords) begin @client.filter(:track => @keywords.join(",")) do |tweet| return if stop? if tweet.is_a?(Twitter::Tweet) event = from_tweet(tweet) decorate(event) queue << event end end # client.filter rescue Twitter::Error::TooManyRequests => e @logger.warn("Twitter too many requests error, sleeping for #{e.rate_limit.reset_in}s") Stud.stoppable_sleep(e.rate_limit.reset_in) { stop? } retry rescue => e @logger.warn("Twitter client error", :message => e., :exception => e, :backtrace => e.backtrace) retry end end |