Class: Datahen::Scraper::Executor Abstract

Inherits:
Object
Includes:
Plugin::ContextExposer
Defined in:
lib/datahen/scraper/executor.rb

Overview

This class is abstract.

Constant Summary

MAX_FIND_OUTPUTS_PER_PAGE =

Max allowed page size when querying outputs (see #find_outputs).

500

Instance Attribute Summary

Instance Method Summary

Methods included from Plugin::ContextExposer

#create_context, #expose_to, #exposed_env, #exposed_methods, exposed_methods, #isolated_binding, #var_or_proc

Instance Attribute Details

#filename ⇒ Object

Returns the value of attribute filename.



# File 'lib/datahen/scraper/executor.rb', line 9

def filename
  @filename
end

#gid ⇒ Object

Returns the value of attribute gid.



# File 'lib/datahen/scraper/executor.rb', line 9

def gid
  @gid
end

#job_id ⇒ Object

Returns the value of attribute job_id.



# File 'lib/datahen/scraper/executor.rb', line 9

def job_id
  @job_id
end

#page ⇒ Object

Returns the value of attribute page.



# File 'lib/datahen/scraper/executor.rb', line 9

def page
  @page
end

Instance Method Details

#clean_backtrace(backtrace) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 337

def clean_backtrace(backtrace)
  # index of the first frame that belongs to the datahen gem
  i = backtrace.index{|x| x =~ /gems\/datahen/i}
  if i.to_i < 1
    return []
  else
    # keep only the user-script frames above the gem frame
    return backtrace[0..(i-1)]
  end
end
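
A minimal usage sketch (the frame strings and gem version are hypothetical): frames before the first datahen gem frame are kept, and an empty array is returned when no user frames precede it.

backtrace = [
  "parsers/product.rb:12:in `parse'",
  "/usr/lib/ruby/gems/datahen-1.2.3/lib/datahen/scraper/executor.rb:377:in `eval_with_context'"
]
clean_backtrace(backtrace)
# => ["parsers/product.rb:12:in `parse'"]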

#eval_with_context(file_path, context) ⇒ Object

Note:

Using this method allows scripts to contain `return` to exit the script early, along with some improved security.

Evaluate a file with a custom binding.

Parameters:

  • file_path (String)

    File path to read.

  • context (Binding)

    Context binding to evaluate with.



# File 'lib/datahen/scraper/executor.rb', line 377

def eval_with_context file_path, context
  eval(File.read(file_path), context, file_path)
end
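
A hedged sketch of typical use, assuming isolated_binding (from Plugin::ContextExposer) builds a Binding that exposes the given vars; the script path is hypothetical:

context = isolated_binding(page: page, outputs: [])
eval_with_context('parsers/product.rb', context)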

#exec_parser(save = false) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 13

def exec_parser(save=false)
  raise "should be implemented in subclass"
end

#find_output(collection = 'default', query = {}, opts = {}) ⇒ Hash|nil

Note:

The opts `:job_id` option is prioritized over `:scraper_name` when both exist. If neither is provided, or both are nil, the current job is queried instead; this is the default behavior.

Find one output by collection and query with pagination.

Examples:

find_output
find_output 'my_collection'
find_output 'my_collection', {}

Find from another scraper by name

find_output 'my_collection', {}, scraper_name: 'my_scraper'

Find from another scraper by job_id

find_output 'my_collection', {}, job_id: 123

Parameters:

  • collection (String) (defaults to: 'default')

('default') Collection name.

  • query (Hash) (defaults to: {})

    ({}) Filters to query.

  • opts (Hash) (defaults to: {})

    ({}) Configuration options.

Options Hash (opts):

  • :scraper_name (String|nil) — default: nil

    Scraper name to query from.

  • :job_id (Integer|nil) — default: nil

    Job’s id to query from.

Returns:

  • (Hash|nil)

`Hash` when found, and `nil` when no output is found.

Raises:

  • (ArgumentError)

    collection is not String.

  • (ArgumentError)

    query is not a Hash.



# File 'lib/datahen/scraper/executor.rb', line 204

def find_output(collection='default', query={}, opts = {})
  result = find_outputs(collection, query, 1, 1, opts)
  result.respond_to?(:first) ? result.first : nil
end
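
Handling the not-found case (collection and field names are hypothetical):

output = find_output('products', { '_id' => 'sku-123' })
if output.nil?
  puts 'no matching output yet'
else
  puts output['name']
end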

#find_outputs(collection = 'default', query = {}, page = 1, per_page = 100, opts = {}) ⇒ Array

Note:

The opts `:job_id` option is prioritized over `:scraper_name` when both exist. If neither is provided, or both are nil, the current job is queried instead; this is the default behavior.

Find outputs by collection and query with pagination.

Examples:

find_outputs
find_outputs 'my_collection'
find_outputs 'my_collection', {}
find_outputs 'my_collection', {}, 1
find_outputs 'my_collection', {}, 1, 100

Find from another scraper by name

find_outputs 'my_collection', {}, 1, 100, scraper_name: 'my_scraper'

Find from another scraper by job_id

find_outputs 'my_collection', {}, 1, 100, job_id: 123

Parameters:

  • collection (String) (defaults to: 'default')

('default') Collection name.

  • query (Hash) (defaults to: {})

    ({}) Filters to query.

  • page (Integer) (defaults to: 1)

    (1) Page number.

  • per_page (Integer) (defaults to: 100)

    (100) Page size.

  • opts (Hash) (defaults to: {})

    ({}) Configuration options.

Options Hash (opts):

  • :scraper_name (String|nil) — default: nil

    Scraper name to query from.

  • :job_id (Integer|nil) — default: nil

    Job’s id to query from.

Returns:

  • (Array)

Raises:

  • (ArgumentError)

    collection is not String.

  • (ArgumentError)

    query is not a Hash.

  • (ArgumentError)

    page is not an Integer greater than 0.

  • (ArgumentError)

    per_page is not an Integer between 1 and 500.



# File 'lib/datahen/scraper/executor.rb', line 148

def find_outputs(collection='default', query={}, page=1, per_page=100, opts = {})
  # Validate parameters upfront so users get clear errors instead of nil failures.
  raise ArgumentError.new("collection needs to be a String") unless collection.is_a?(String)
  raise ArgumentError.new("query needs to be a Hash, instead of: #{query}") unless query.is_a?(Hash)
  unless page.is_a?(Integer) && page > 0
    raise ArgumentError.new("page needs to be an Integer greater than 0")
  end
  unless per_page.is_a?(Integer) && per_page > 0 && per_page <= MAX_FIND_OUTPUTS_PER_PAGE
    raise ArgumentError.new("per_page needs to be an Integer between 1 and #{MAX_FIND_OUTPUTS_PER_PAGE}")
  end

  options = {
    query: query,
    page: page,
    per_page: per_page}

  # Get job_id
  query_job_id = opts[:job_id] || get_job_id(opts[:scraper_name], self.job_id)

  client = Client::JobOutput.new(options)
  response = client.all(query_job_id, collection)

  if response.code != 200
    raise "response_code: #{response.code}|#{response.parsed_response}"
  end
  (response.body != 'null') ? response.parsed_response : []
end
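
A sketch of paging through an entire collection using the max page size; the collection and field names are hypothetical:

page = 1
loop do
  records = find_outputs('products', {}, page, MAX_FIND_OUTPUTS_PER_PAGE)
  break if records.empty?
  records.each { |record| puts record['_id'] }
  page += 1
end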

#finish ⇒ Object

Finish the executor execution



# File 'lib/datahen/scraper/executor.rb', line 382

def finish
  raise Error::SafeTerminateError
end
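
Because it raises Error::SafeTerminateError, finish can be called from within a script to stop execution cleanly; a hypothetical example (the products variable is illustrative):

finish if products.empty? # nothing left to parse on this page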

#finisher_update(options = {}) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 57

def finisher_update(options={})
  client = Client::Job.new()
  job_id = options.fetch(:job_id)

  client.finisher_update(job_id, options)
end

#get_content(job_id, gid) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 74

def get_content(job_id, gid)
  client = Client::JobPage.new()
  content_json = client.find_content(job_id, gid)

  if content_json['available']
    signed_url = content_json['signed_url']
    Client::BackblazeContent.new.get_gunzipped_content(signed_url)
  else
    nil
  end
end

#get_failed_content(job_id, gid) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 86

def get_failed_content(job_id, gid)
  client = Client::JobPage.new()
  content_json = client.find_failed_content(job_id, gid)

  if content_json['available']
    signed_url = content_json['signed_url']
    Client::BackblazeContent.new.get_gunzipped_content(signed_url)
  else
    nil
  end
end

#get_job_id(scraper_name, default = nil) ⇒ Object

Get the current job id from a scraper, or the default when scraper_name is nil.

Parameters:

  • scraper_name (String|nil)

    Scraper name.

  • default (Integer|nil) (defaults to: nil)

    (nil) Default job id when no scraper name.

Raises:

  • (Exception)

When scraper_name is not nil and the scraper doesn't exist or has no current job.



# File 'lib/datahen/scraper/executor.rb', line 105

def get_job_id scraper_name, default = nil
  return default if scraper_name.nil?
  job = Client::ScraperJob.new().find(scraper_name)
  raise JSON.pretty_generate(job) if job['id'].nil?
  job['id']
end
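
Hypothetical calls (the scraper name is illustrative):

get_job_id('my_scraper')      # current job id of 'my_scraper', or raises
get_job_id(nil, self.job_id)  # no scraper name, so the default is returned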

#init_global_page ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 64

def init_global_page()
  client = Client::GlobalPage.new()
  global_page = client.find(gid)
  unless global_page.code == 200
    raise "GID #{gid} not found. Aborting execution!"
  else
    global_page
  end
end

#init_job_page ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 31

def init_job_page()
  client = Client::JobPage.new()
  job_page = client.find(job_id, gid)
  unless job_page.code == 200
    raise "Job #{job_id} or GID #{gid} not found. Aborting execution!"
  else
    job_page
  end

end

#init_page ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 17

def init_page()
  # skip whenever a page is provided
  return self.page unless self.page.nil?

  if job_id
    puts "getting Job Page"
    init_job_page
  else
    puts "getting Global Page"
    init_global_page()
  end

end

#parsing_update(options = {}) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 42

def parsing_update(options={})
  client = Client::JobPage.new()
  job_id = options.fetch(:job_id)
  gid = options.fetch(:gid)

  client.parsing_update(job_id, gid, options)
end

#remove_old_dups!(list, key_defaults) ⇒ Integer

Remove duplicates, keeping the latest occurrence of each.

Parameters:

  • list (Array)

    List of hashes to dedup.

  • key_defaults (Hash)

    Key and default value pair hash to use on uniq validation.

Returns:

  • (Integer)

    Removed duplicated items count.



# File 'lib/datahen/scraper/executor.rb', line 216

def remove_old_dups!(list, key_defaults)
  raw_count = list.count
  keys = key_defaults.keys
  force_uniq = 0
  list.reverse!.uniq! do |item|
    # Extract stringify keys as hash
    key_hash = Hash[item.map{|k,v|keys.include?(k.to_s) ? [k.to_s,v] : nil}.select{|i|!i.nil?}]

    # Apply defaults for uniq validation
    key_defaults.each{|k,v| key_hash[k] = v if key_hash[k].nil?}

    # Don't dedup nil key defaults
    skip_dedup = !keys.find{|k| key_hash[k].nil?}.nil?
    skip_dedup ? (force_uniq += 1) : key_hash
  end
  list.reverse!
  dup_count = raw_count - list.count
  dup_count
end
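
A worked example using the output key defaults (the values are hypothetical); the later duplicate wins, and items with a nil key are never deduped:

list = [
  { '_id' => 1, 'name' => 'old' },
  { '_id' => 1, 'name' => 'new' },
  { 'name' => 'no id' }
]
remove_old_dups!(list, { '_id' => nil, '_collection' => 'default' })
# => 1
list
# => [{ '_id' => 1, 'name' => 'new' }, { 'name' => 'no id' }]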

#remove_old_output_dups!(list) ⇒ Integer

Remove output duplicates, keeping the latest occurrence of each.

Parameters:

  • list (Array)

    List of outputs to dedup.

Returns:

  • (Integer)

    Removed duplicated items count.



# File 'lib/datahen/scraper/executor.rb', line 256

def remove_old_output_dups!(list)
  key_defaults = {
    '_id' => nil,
    '_collection' => 'default'
  }
  remove_old_dups! list, key_defaults
end

#remove_old_page_dups!(list) ⇒ Integer

Note:

It will not dedup for now, as it is hard to build the gid. TODO: build the gid so we can dedup.

Remove page duplicates, keeping the latest occurrence of each.

Parameters:

  • list (Array)

    List of pages to dedup.

Returns:

  • (Integer)

    Removed duplicated items count.



# File 'lib/datahen/scraper/executor.rb', line 244

def remove_old_page_dups!(list)
  key_defaults = {
    'gid' => nil
  }
  remove_old_dups! list, key_defaults
end

#save_outputs(outputs = []) ⇒ Object

Note:

IMPORTANT: outputs array’s elements will be removed.

Saves outputs from an array and clear it.

Parameters:

  • outputs (Array) (defaults to: [])

    ([]) Output array to save. Warning: all elements will be removed from the array.



# File 'lib/datahen/scraper/executor.rb', line 366

def save_outputs(outputs=[])
  save_pages_and_outputs([], outputs, save_type)
end
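
A hypothetical call; the array is emptied as its elements are saved:

outputs = [{ '_collection' => 'products', 'name' => 'Widget' }]
save_outputs(outputs)
outputs
# => []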

#save_pages(pages = []) ⇒ Object

Note:

IMPORTANT: pages array’s elements will be removed.

Saves pages from an array and clear it.

Parameters:

  • pages (Array) (defaults to: [])

    ([]) Page array to save. Warning: all elements will be removed from the array.



# File 'lib/datahen/scraper/executor.rb', line 356

def save_pages(pages=[])
  save_pages_and_outputs(pages, [], save_type)
end

#save_pages_and_outputs(pages = [], outputs = [], status) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 264

def save_pages_and_outputs(pages = [], outputs = [], status)
  total_pages = pages.count
  total_outputs = outputs.count
  records_per_slice = 100
  until pages.empty? && outputs.empty?
    pages_slice = pages.shift(records_per_slice)
    pages_dup_count = remove_old_page_dups! pages_slice
    outputs_slice = outputs.shift(records_per_slice)
    outputs_dup_count = remove_old_output_dups! outputs_slice

    log_msgs = []
    unless pages_slice.empty?
      page_dups_ignored = pages_dup_count > 0 ? " (#{pages_dup_count} dups ignored)" : ''
      log_msgs << "#{pages_slice.count} out of #{total_pages} Pages#{page_dups_ignored}"

      unless save
        puts '----------------------------------------'
        puts "Trying to validate #{log_msgs.last}#{page_dups_ignored}"
        puts JSON.pretty_generate pages_slice
      end
    end

    unless outputs_slice.empty?
      output_dups_ignored = outputs_dup_count > 0 ? " (#{outputs_dup_count} dups ignored)" : ''
      log_msgs << "#{outputs_slice.count} out of #{total_outputs} Outputs#{output_dups_ignored}"

      unless save
        puts '----------------------------------------'
        puts "Trying to validate #{log_msgs.last}#{output_dups_ignored}"
        puts JSON.pretty_generate outputs_slice
      end
    end

    # behave differently if it is a real save
    save_status = status
    if save
      log_msg = "Saving #{log_msgs.join(' and ')}."
      puts "#{log_msg}"
    else
      save_status = "#{status}_try"
    end

    # saving to server
    response = update_to_server(
      job_id: job_id,
      gid: gid,
      pages: pages_slice,
      outputs: outputs_slice,
      status: save_status)

    if response.code == 200
      if save
        log_msg = "Saved."
        puts "#{log_msg}"
      else
        puts "Validation successful"
      end
    else
      if save
        puts "Error: Unable to save Pages and/or Outputs to server: #{response.body}"
        raise "Unable to save Pages and/or Outputs to server: #{response.body}"
      else
        puts "Error: Invalid Pages and/or Outputs: #{response.body}"
        raise "Invalid Pages and/or Outputs: #{response.body}"
      end
    end
  end
end
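
The method drains both arrays in slices of 100, dedupes each slice, and either validates or saves depending on the executor's save flag. A hypothetical call from a subclass:

pages = [{ 'url' => 'https://example.com/page/2' }]
outputs = [{ '_collection' => 'products', 'name' => 'Widget' }]
save_pages_and_outputs(pages, outputs, save_type)
# pages and outputs are both empty now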

#save_type ⇒ Object

Raises:

  • (NotImplementedError)


# File 'lib/datahen/scraper/executor.rb', line 346

def save_type
  raise NotImplementedError.new('Need to implement "save_type" method.')
end

#seeding_update(options = {}) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 50

def seeding_update(options={})
  client = Client::Job.new()
  job_id = options.fetch(:job_id)

  client.seeding_update(job_id, options)
end

#update_to_server(opts = {}) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 333

def update_to_server(opts = {})
  raise "Implemented in Subclass"
end