Class: Embulk::InputTwitterStream

Inherits:
InputPlugin
  • Object
show all
Defined in:
lib/embulk/input_twitterstream.rb

Constant Summary collapse

# Default output columns, used by .transaction when the 'columns' setting is
# empty. Each entry is a dot-separated path into the flattened tweet hash
# produced by #dot_flatten (e.g. 'user.name' is the 'name' key nested under
# 'user'). Frozen so the shared default list cannot be mutated at runtime.
COLUMN_NAMES = [
  'text',
  'created_at',
  'id_str',
  'user.follow_request_sent',
  'user.following',
  'user.time_zone',
  'user.created_at',
  'user.profile_sidebar_fill_color',
  'user.profile_image_url',
  'user.default_profile',
  'user.geo_enabled',
  'user.profile_sidebar_border_color',
  'user.is_translator',
  'user.url',
  'user.profile_image_url_https',
  'user.description',
  'user.listed_count',
  'user.profile_use_background_image',
  'user.friends_count',
  'user.followers_count',
  'user.profile_text_color',
  'user.profile_background_image_url',
  'user.location',
  'user.profile_link_color',
  'user.protected',
  'user.default_profile_image',
  'user.lang',
  'user.statuses_count',
  'user.verified',
  'user.name',
  'user.id_str',
  'user.show_all_inline_media',
  'user.contributors_enabled',
  'user.notifications',
  'user.profile_background_image_url_https',
  'user.profile_background_color',
  'user.id',
  'user.profile_background_tile',
  'user.utc_offset',
  'user.favourites_count',
  'user.screen_name',
  'in_reply_to_user_id',
  'id',
  'contributors',
  'truncated'
].freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(task, schema, index, page_builder) ⇒ InputTwitterStream

Returns a new instance of InputTwitterStream.



75
76
77
# File 'lib/embulk/input_twitterstream.rb', line 75

# Creates the plugin instance by forwarding all four arguments, unchanged,
# to the superclass constructor.
#
# @param task the task object handed over by the framework
# @param schema the output schema
# @param index the index of this task
# @param page_builder the builder that receives output rows
def initialize(task, schema, index, page_builder)
  super(task, schema, index, page_builder)
end

Class Method Details

.transaction(config, &control) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/embulk/input_twitterstream.rb', line 58

# Builds the output column list and hands the task to the caller's block.
#
# The 'columns' setting is read as an array of column names. When it is
# empty, COLUMN_NAMES is used instead. Every column is typed :string and
# numbered by its position in the list.
#
# @param config the plugin configuration; it is also passed through as the task
# @param control the block to run (invoked via yield)
# @yield [task, columns, threads] the task, the Column list, and a thread
#   count of 1
# @return [Hash] always an empty hash
def self.transaction(config, &control)
  task = config
  threads = 1

  names = config.param('columns', :array)
  names = COLUMN_NAMES if names.empty?

  # The position in the list doubles as the column index. The original
  # custom-columns branch passed an undefined `i` here and raised NameError.
  columns = names.map.with_index do |name, index|
    Column.new(index, name, :string)
  end

  # The block's result is not used; an empty hash is returned regardless.
  yield(task, columns, threads)
  {}
end

Instance Method Details

#dot_flatten(hash, path = '') ⇒ Object



79
80
81
82
83
84
85
86
87
88
# File 'lib/embulk/input_twitterstream.rb', line 79

# Collapses a nested hash into a single-level hash whose keys are the
# dot-joined paths to each non-Hash value.
#
# Keys are converted with #to_s. A nested Hash contributes its own flattened
# entries (so an empty nested Hash contributes nothing); any other value,
# including arrays, is stored as-is.
#
# @param hash [Hash] the (possibly nested) hash to flatten
# @param path [String] prefix prepended to every key; used by the recursion
# @return [Hash] flat hash keyed by dot-separated path strings
def dot_flatten(hash, path = '')
  flat = {}
  hash.each do |name, value|
    full_key = path + name.to_s
    case value
    when Hash
      # Recurse with the current key plus a trailing dot as the new prefix.
      flat.update(dot_flatten(value, full_key + '.'))
    else
      flat[full_key] = value
    end
  end
  flat
end

#run ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/embulk/input_twitterstream.rb', line 90

# Reads items from the Twitter user stream and adds one row per tweet to the
# page builder.
#
# Each Twitter::Tweet is flattened with #dot_flatten. For every schema column
# the value stored under the column's name is converted with #to_s; a column
# with no matching key yields ''. Items that are not tweets are ignored.
#
# When the task's 'count' is a positive number, streaming stops after that
# many tweets by raising StopStreamException, which is rescued here. A
# missing or non-positive count means no limit is applied by this method.
#
# @return [Hash] always an empty hash
def run
  stream = Twitter::Streaming::Client.new(
    consumer_key:        @task['consumer_key'],
    consumer_secret:     @task['consumer_secret'],
    access_token:        @task['access_token'],
    access_token_secret: @task['access_token_secret']
  )

  limit = @task['count']
  remaining = limit ? limit.to_i : 0

  begin
    stream.user do |event|
      next unless event.is_a?(Twitter::Tweet)

      fields = dot_flatten(event.to_hash)
      row = @schema.map do |column|
        fields.key?(column.name) ? fields[column.name].to_s : ''
      end
      @page_builder.add(row)

      # A remaining count of zero or less means "unlimited": never count down.
      next unless remaining > 0

      remaining -= 1
      raise StopStreamException if remaining == 0
    end
  rescue StopStreamException
    # Raised above once the requested number of tweets has been read.
  end

  @page_builder.finish
  {}
end