Class: Embulk::Input::Service::ExportService
- Inherits: BaseService
  - Ancestry: Object → BaseService → Embulk::Input::Service::ExportService
- Defined in:
- lib/embulk/input/service/export_service.rb
Constant Summary
# Property keys that Mixpanel documents as special/reserved, or as stored by
# default by its client libraries, plus NOT_PROPERTY_COLUMN. Sources:
#   https://mixpanel.com/help/questions/articles/special-or-reserved-properties
#   https://mixpanel.com/help/questions/articles/what-properties-do-mixpanels-libraries-store-by-default
#
# The names were scraped from those pages with JavaScript run in the Chrome
# DevTools console while each page was open:
#   > Array.from(document.querySelectorAll("strong")).map(function(s){ return s.textContent.match(//) ? s.parentNode.textContent.match(/\((.*?)\)/)[1] : s.textContent.split(",").join(" ") }).join(" ")
#   > Array.from(document.querySelectorAll("li")).map(function(s){ m = s.textContent.match(/\((.*?)\)/); return m && m[1] }).filter(function(k) { return k && !k.match("utm") }).join(" ")
# NOTE(review): this snippet was recovered from rendered docs that drop
# backslashes and bracketed text. The \( \) escapes and [1] indices are
# reconstructed, and the pattern of the first `match(//)` was lost entirely.
# Verify against version control before reusing it.
#
# The pages repeat keys per platform (web, iOS, Android), so the list is
# de-duplicated with `uniq`, keeping first occurrences in order.
KNOWN_KEYS = %W(
  #{NOT_PROPERTY_COLUMN} distinct_id ip mp_name_tag mp_note token time mp_country_code length campaign_id
  $email $phone $distinct_id $ios_devices $android_devices $first_name $last_name $name
  $city $region $country_code $timezone $unsubscribed
  $city $region mp_country_code $browser $browser_version $device $current_url
  $initial_referrer $initial_referring_domain $os $referrer $referring_domain
  $screen_height $screen_width $search_engine
  $city $region $mp_country_code $timezone $browser_version $browser
  $initial_referrer $initial_referring_domain $os $last_seen
  $city $region mp_country_code $app_release $app_version $carrier $ios_ifa $os_version
  $manufacturer $lib_version $model $os $screen_height $screen_width $wifi
  $city $region $mp_country_code $timezone $ios_app_release $ios_app_version
  $ios_device_model $ios_lib_version $ios_version $ios_ifa $last_seen
  $city $region mp_country_code $app_version $bluetooth_enabled $bluetooth_version $brand
  $carrier $has_nfc $has_telephone $lib_version $manufacturer $model $os $os_version
  $screen_dpi $screen_height $screen_width $wifi $google_play_services
  $city $region mp_country_code $timezone $android_app_version $android_app_version_code
  $android_lib_version $android_os $android_os_version $android_brand $android_model
  $android_manufacturer $last_seen
).uniq.freeze
Constants inherited from BaseService
BaseService::DEFAULT_FETCH_DAYS, BaseService::DEFAULT_TIME_COLUMN, BaseService::NOT_PROPERTY_COLUMN
Instance Method Summary
- #create_task ⇒ Object
- #create_task_report(current_latest_fetched_time, to_date, timezone) ⇒ Object
- #endpoint ⇒ Object
- #export_params ⇒ Object
- #fetch(dates, last_fetch_time, task, &block) ⇒ Object
- #guess_columns ⇒ Object
- #guess_from_records(records) ⇒ Object
- #guess_range ⇒ Object
- #ingest(task, page_builder) ⇒ Object
- #next_from_date(task_report) ⇒ Object
- #validate_config ⇒ Object
Methods inherited from BaseService
#default_guess_start_date, #initialize
Constructor Details
This class inherits a constructor from Embulk::Input::Service::BaseService
Instance Method Details
#create_task ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/embulk/input/service/export_service.rb', line 37
# Builds the task hash that is later handed to #ingest and #fetch.
#
# Most entries are read straight from the plugin config, with the defaults
# shown inline. `params`, `dates`, `export_endpoint` and
# `incremental_column_upper_limit` come from helper methods.
#
# @return [Hash] task settings keyed by symbol
def create_task
  {
    # Request parameters and the dates to export.
    params: export_params,
    dates: range,
    timezone: @config.param(:timezone, :string, default: ""),
    export_endpoint: endpoint,
    api_secret: @config.param(:api_secret, :string),

    # Output shape.
    schema: @config.param(:columns, :array),
    fetch_unknown_columns: @config.param(:fetch_unknown_columns, :bool, default: false),
    fetch_custom_properties: @config.param(:fetch_custom_properties, :bool, default: true),

    # Retry and incremental-loading settings.
    retry_initial_wait_sec: @config.param(:retry_initial_wait_sec, :integer, default: 1),
    incremental_column: @config.param(:incremental_column, :string, default: nil),
    retry_limit: @config.param(:retry_limit, :integer, default: 5),
    latest_fetched_time: @config.param(:latest_fetched_time, :integer, default: 0),
    incremental: @config.param(:incremental, :bool, default: true),
    slice_range: @config.param(:slice_range, :integer, default: 7),

    # Whole seconds since the epoch, scaled by 1000 (milliseconds, sub-second part zeroed).
    job_start_time: Time.now.to_i * 1000,
    incremental_column_upper_limit: incremental_column_upper_limit,
    allow_partial_import: @config.param(:allow_partial_import, :bool, default: true),
  }
end
#create_task_report(current_latest_fetched_time, to_date, timezone) ⇒ Object
146 147 148 149 150 151 |
# File 'lib/embulk/input/service/export_service.rb', line 146
# Builds the report returned from #ingest.
#
# @param current_latest_fetched_time [Integer] highest record time seen so far
# @param to_date [Object, nil] last date of the processed range. When nil, the
#   value is `today(timezone) - 1` (presumably a Date for yesterday; confirm
#   against BaseService#today).
# @param timezone [String] only used for the nil fallback
# @return [Hash] { latest_fetched_time:, to_date: }
def create_task_report(current_latest_fetched_time, to_date, timezone)
  effective_to_date = to_date || (today(timezone) - 1)
  {
    latest_fetched_time: current_latest_fetched_time,
    to_date: effective_to_date,
  }
end
#endpoint ⇒ Object
233 234 235 |
# File 'lib/embulk/input/service/export_service.rb', line 233
# Export API endpoint: the `export_endpoint` config option, falling back to
# the client's DEFAULT_EXPORT_ENDPOINT.
def endpoint
  fallback = Embulk::Input::MixpanelApi::Client::DEFAULT_EXPORT_ENDPOINT
  @config.param(:export_endpoint, :string, default: fallback)
end
#export_params ⇒ Object
178 179 180 181 182 183 184 185 186 |
# File 'lib/embulk/input/service/export_service.rb', line 178
# Base query parameters for the export API, read from the plugin config.
#
# @return [Hash] :event (the configured event list JSON-encoded, or nil when
#   unset), :where and :bucket (strings or nil)
def export_params
  events = @config.param(:event, :array, default: nil)
  encoded_events = (events.to_json unless events.nil?)
  {
    event: encoded_events,
    where: @config.param(:where, :string, default: nil),
    bucket: @config.param(:bucket, :string, default: nil),
  }
end
#fetch(dates, last_fetch_time, task, &block) ⇒ Object
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/embulk/input/service/export_service.rb', line 205
# Requests one slice of dates from the Mixpanel export API.
#
# The request is task[:params] plus "from_date"/"to_date", taken from the first
# and last entries of +dates+. When task[:incremental_column] is set, this
# bound is AND-ed onto any user-supplied `where`:
#   properties["<col>"] > last_fetch_time (0 when nil) and
#   properties["<col>"] < task[:incremental_column_upper_limit]
#
# @param dates [Array] dates covered by this request
# @param last_fetch_time [Integer, nil] exclusive lower bound for the incremental column
# @param task [Hash] task built by #create_task
# @param block unused
# @return in preview mode, the result of client.export_for_small_dataset;
#   otherwise an Enumerator that streams records from client.export
def fetch(dates, last_fetch_time, task, &block)
  request = task[:params].merge("from_date" => dates.first, "to_date" => dates.last)

  column = task[:incremental_column]
  # The built-in time column cannot be filtered here; #ingest filters it
  # manually. Only a user-specified incremental column is pushed into `where`.
  unless column.nil?
    user_where = request['where']
    prefix = user_where.nil? ? '' : "(#{user_where}) and "
    lower_bound = last_fetch_time || 0
    upper_bound = task[:incremental_column_upper_limit]
    request = request.merge(
      "where" => "#{prefix}properties[\"#{column}\"] > #{lower_bound} and properties[\"#{column}\"] < #{upper_bound}"
    )
  end

  Embulk.logger.info "Where params is #{request["where"]}"

  client = create_client
  return client.export_for_small_dataset(request) if preview?

  Enumerator.new do |yielder|
    client.export(request) do |record|
      yielder << record
    end
  end
end
#guess_columns ⇒ Object
153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/embulk/input/service/export_service.rb', line 153
# Guesses the output columns from a small export sample.
#
# Exports the dates returned by #guess_range with
# client.export_for_small_dataset, then hands the records to
# #guess_from_records.
#
# @return [Array<Hash>] column definitions
def guess_columns
  giveup_when_mixpanel_is_down

  sample_dates = guess_range
  first_day = sample_dates.first
  last_day = sample_dates.last
  Embulk.logger.info "Guessing schema using #{first_day}..#{last_day} records"

  request = export_params.merge("from_date" => first_day, "to_date" => last_day)
  sample = create_client.export_for_small_dataset(request)
  guess_from_records(sample)
end
#guess_from_records(records) ⇒ Object
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/embulk/input/service/export_service.rb', line 188
# Derives column definitions from sample export records.
#
# Types are guessed from each record's "properties" hash. The result always
# starts with "time" (:long), then NOT_PROPERTY_COLUMN (:string), then every
# other guessed property. A "format" entry (string key) is included only when
# the guesser reports one.
#
# @param records [Array<Hash>] export records, each with a "properties" hash
# @return [Array<Hash>] column definitions
def guess_from_records(records)
  property_samples = records.map { |record| record["properties"] }
  guessed = Guess::SchemaGuess.from_hash_records(property_samples)

  # "time" is dropped here because it is re-added as the first column below.
  property_columns = guessed.map { |col|
    unless col.name == "time"
      entry = { name: col.name, type: col.type }
      entry["format"] = col.format if col.format
      entry
    end
  }.compact

  leading_columns = [
    { name: "time", type: :long },
    { name: NOT_PROPERTY_COLUMN, type: :string },
  ]
  leading_columns + property_columns
end
#guess_range ⇒ Object
167 168 169 170 171 172 173 174 175 176 |
# File 'lib/embulk/input/service/export_service.rb', line 167
# Dates to sample when guessing the schema.
#
# Uses the configured from_date (default: default_guess_start_date) and
# fetch_days (default: DEFAULT_FETCH_DAYS) via RangeGenerator. If that produces
# an empty range, falls back to default_guess_start_date(tz)..(today(tz) - 1).
def guess_range
  tz = @config.param(:timezone, :string, default: "")
  start_date = @config.param(:from_date, :string, default: default_guess_start_date(tz).to_s)
  days = @config.param(:fetch_days, :integer, default: DEFAULT_FETCH_DAYS)

  generated = RangeGenerator.new(start_date, days, tz).generate_range
  return generated unless generated.empty?

  default_guess_start_date(tz)..(today(tz) - 1)
end
#ingest(task, page_builder) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/embulk/input/service/export_service.rb', line 67
# Fetches task[:dates] from the Mixpanel export API and adds one row per
# record to +page_builder+.
#
# task[:dates] is processed in chunks of task[:slice_range] entries, one #fetch
# call per chunk. In preview mode only the first chunk is read.
#
# @param task [Hash] the task built by #create_task
# @param page_builder [Object] receives one #add call per emitted row, then #finish
# @return [Hash] the report from #create_task_report: the highest record time
#   seen and the last entry of task[:dates]
# @raise [Embulk::ConfigError] when task[:incremental] is set and a record's
#   properties lack the incremental/time column
# @raise [MixpanelApi::IncompleteExportResponseError] re-raised when
#   task[:allow_partial_import] is false
def ingest(task, page_builder)
  giveup_when_mixpanel_is_down
  @schema = task[:schema]
  @timezone = task[:timezone]
  Embulk.logger.info "Job start time is #{task[:job_start_time]}"
  dates = task[:dates]
  # High-water mark from the previous run.
  prev_latest_fetched_time = task[:latest_fetched_time] || 0
  # Only used in the "Skipped already loaded" warning. Time.at treats the
  # value as seconds since the epoch.
  prev_latest_fetched_time_format = Time.at(prev_latest_fetched_time).strftime("%F %T %z")
  current_latest_fetched_time = prev_latest_fetched_time
  incremental_column = task[:incremental_column]
  incremental = task[:incremental]
  fetch_unknown_columns = task[:fetch_unknown_columns]
  dates.each_slice(task[:slice_range]) do |slice_dates|
    ignored_fetched_record_count = 0
    # Mixpanel time values can fall inside the hour that does not exist during
    # a standard-to-daylight-saving transition in US time zones (e.g. 11 Mar
    # 2018 2:00-2:59 AM, when clocks jump from 2 AM to 3 AM). Building such a
    # row raises TZInfo::PeriodNotFound. Those records are counted and skipped
    # instead of failing the job.
    ignored_wrong_daylight_tz_record_count = 0
    unless preview?
      Embulk.logger.info "Fetching data from #{slice_dates.first} to #{slice_dates.last} ..."
    end
    record_time_column = incremental_column || DEFAULT_TIME_COLUMN
    begin
      fetch(slice_dates, prev_latest_fetched_time, task).each do |record|
        if incremental
          if !record["properties"].include?(record_time_column)
            raise Embulk::ConfigError.new("Incremental column not exists in fetched data #{record_time_column}")
          end
          record_time = record["properties"][record_time_column]
          # Client-side dedup applies only to the default time column. For a
          # custom incremental_column, #fetch already bounds that property in
          # the Mixpanel `where` expression.
          if incremental_column.nil?
            if record_time <= prev_latest_fetched_time
              ignored_fetched_record_count += 1
              next
            end
          end
          # Advanced before the row is built, so a record later skipped for
          # TZInfo::PeriodNotFound still moves the high-water mark.
          current_latest_fetched_time = [
            current_latest_fetched_time,
            record_time,
          ].max
        end
        begin
          values = extract_values(record)
          # Optional trailing columns. validate_config rejects enabling both.
          if fetch_unknown_columns
            unknown_values = extract_unknown_values(record)
            values << unknown_values.to_json
          end
          if task[:fetch_custom_properties]
            values << collect_custom_properties(record)
          end
          page_builder.add(values)
        rescue TZInfo::PeriodNotFound
          ignored_wrong_daylight_tz_record_count += 1
        end
      end
    rescue MixpanelApi::IncompleteExportResponseError
      # With allow_partial_import, rows already passed to page_builder from
      # this slice are not retracted, and processing continues with the next slice.
      if !task[:allow_partial_import]
        # re raise the exception if we don't allow partial import
        raise
      end
    end
    if ignored_fetched_record_count > 0
      Embulk.logger.warn "Skipped already loaded #{ignored_fetched_record_count} records. These record times are older or equal than previous fetched record time (#{prev_latest_fetched_time} @ #{prev_latest_fetched_time_format})."
    end
    if ignored_wrong_daylight_tz_record_count > 0
      Embulk.logger.warn "Skipped #{ignored_wrong_daylight_tz_record_count} records due to corrupted Mixpanel time transition from standard to daylight saving"
    end
    # Preview only needs the first slice.
    break if preview?
  end
  page_builder.finish
  create_task_report(current_latest_fetched_time, dates.last, task[:timezone])
end
#next_from_date(task_report) ⇒ Object
59 60 61 62 63 64 65 |
# File 'lib/embulk/input/service/export_service.rb', line 59
# Computes the config diff for the next (resumed) run.
#
# The next run starts from the previous run's to_date and carries the
# latest_fetched_time high-water mark forward.
#
# @param task_report [Hash] report produced by #create_task_report
# @return [Hash] { from_date: "YYYY-MM-DD", latest_fetched_time: }
# @raise [TypeError, ArgumentError] when to_date is missing or unparsable
def next_from_date(task_report)
  to_date = task_report[:to_date]
  # to_date is usually a String after task-report serialization. However,
  # create_task_report may store the value of `today(timezone) - 1` directly
  # when no to_date is given, and Date.parse rejects non-String input, so
  # Date values are accepted as-is.
  next_to_date = to_date.is_a?(Date) ? to_date.to_date : Date.parse(to_date)
  {
    from_date: next_to_date.to_s,
    latest_fetched_time: task_report[:latest_fetched_time],
  }
end
#validate_config ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/embulk/input/service/export_service.rb', line 20
# Export-specific config validation, run after BaseService#validate_config.
#
# @raise [Embulk::ConfigError] when incremental_column is set and
#   incremental_column_upper_limit <= latest_fetched_time
# @raise [Embulk::ConfigError] when both fetch_unknown_columns and
#   fetch_custom_properties are true. fetch_custom_properties defaults to true,
#   so enabling fetch_unknown_columns requires disabling it explicitly.
def validate_config
  super

  incremental_column = @config.param(:incremental_column, :string, default: nil)
  latest_fetched_time = @config.param(:latest_fetched_time, :integer, default: 0)
  fetch_custom_properties = @config.param(:fetch_custom_properties, :bool, default: true)
  fetch_unknown_columns = @config.param(:fetch_unknown_columns, :bool, default: false)

  # The upper limit is only evaluated when both values are present.
  bounded = !incremental_column.nil? && !latest_fetched_time.nil?
  if bounded && incremental_column_upper_limit <= latest_fetched_time
    raise Embulk::ConfigError.new("Incremental column upper limit (job_start_time - incremental_column_upper_limit_delay_in_seconds) can't be smaller or equal latest fetched time #{latest_fetched_time}")
  end

  return unless fetch_unknown_columns && fetch_custom_properties

  raise Embulk::ConfigError.new("Don't set true both `fetch_unknown_columns` and `fetch_custom_properties`.")
end