Class: Datahen::Scraper::BatchParser

Inherits: Object
Defined in:
lib/datahen/scraper/batch_parser.rb

Constant Summary

NOT_FOUND_MSG =
"No more pages to parse found"
NO_DEQUEUE_COUNT_MSG =
"\nWarning: Max page to parse dequeue count is 0, check pages to parse scale\n"
NO_WORKERS_MSG =
"\nWarning: There are no parser workers\n"


Constructor Details

#initialize(job_id, config_file, opts = {}) ⇒ BatchParser

Returns a new instance of BatchParser.

Options Hash (opts):

  • :worker_count (Integer) — default: 1

    Parallel worker quantity.

  • :max_garbage (Integer) — default: 5

    Maximum number of times the garbage collector can be requested before it actually executes.

  • :dequeue_interval (Integer) — default: 3

    Time in seconds to wait between page dequeuing.

  • :dequeue_scale (Numeric) — default: 2

    Scaling factor used to calculate the page dequeue size.

  • :dequeue_timeout (Numeric) — default: 30

    Page dequeue API request timeout in seconds.

  • :client_options (Hash) — default: {}

    Additional options for the Datahen client gem (see Datahen::Client::Base#initialize).



# File 'lib/datahen/scraper/batch_parser.rb', line 105

def initialize(job_id, config_file, opts = {})
  opts = {
    worker_count: 1,
    max_garbage: 5,
    dequeue_interval: 3,
    dequeue_scale: 2,
    dequeue_timeout: 30,
    client_options: {}
  }.merge opts

  @job_id = job_id
  @worker_count = opts[:worker_count]
  @dequeue_interval = opts[:dequeue_interval]
  @dequeue_scale = opts[:dequeue_scale]
  @max_garbage = opts[:max_garbage]
  @pages = Concurrent::Array.new
  @loaded_pages = Concurrent::Hash.new
  @garbage_mutex = Mutex.new
  @dequeue_mutex = Mutex.new
  @not_found = false
  self.dequeue_timeout = opts[:dequeue_timeout]
  self.second_dequeue_count = 0
  self.garbage_count = 0
  self.config_file = config_file
  self.load_config

  @client = Datahen::Client::JobPage.new(opts[:client_options])
  nil
end
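
A minimal usage sketch (the job id and config path below are hypothetical):

require 'datahen'

# Hypothetical job id and config path; any option from the Options Hash
# above can be overridden here.
batch = Datahen::Scraper::BatchParser.new(
  12345,          # job_id
  'config.yaml',  # config_file
  worker_count: 4,
  dequeue_scale: 3
)
batch.exec_parse(true) # dequeue and parse in parallel, saving outputs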

Instance Attribute Details

#client ⇒ Datahen::Client::JobPage (readonly)

Datahen job pages client used to dequeue pages from the API.



# File 'lib/datahen/scraper/batch_parser.rb', line 60

def client
  @client
end

#config ⇒ Hash (readonly)

Currently loaded configuration.



# File 'lib/datahen/scraper/batch_parser.rb', line 57

def config
  @config
end

#config_file ⇒ String

Configuration file path.



# File 'lib/datahen/scraper/batch_parser.rb', line 13

def config_file
  @config_file
end

#dequeue_interval ⇒ Integer (readonly)

Dequeue interval in seconds.



# File 'lib/datahen/scraper/batch_parser.rb', line 45

def dequeue_interval
  @dequeue_interval
end

#dequeue_mutex ⇒ Mutex (readonly)

Dequeuer mutex used to synchronize page dequeuing.



# File 'lib/datahen/scraper/batch_parser.rb', line 69

def dequeue_mutex
  @dequeue_mutex
end

#dequeue_scale ⇒ Numeric (readonly)

Dequeue scale used to calculate the ideal dequeue size.



# File 'lib/datahen/scraper/batch_parser.rb', line 48

def dequeue_scale
  @dequeue_scale
end

#dequeue_timeout ⇒ Integer

Dequeue API request timeout in seconds.



# File 'lib/datahen/scraper/batch_parser.rb', line 25

def dequeue_timeout
  @dequeue_timeout
end

#dequeuer_still_alive ⇒ Integer (readonly)

Unix timestamp of the dequeuer's last run.



# File 'lib/datahen/scraper/batch_parser.rb', line 72

def dequeuer_still_alive
  @dequeuer_still_alive
end

#dequeuer_thread ⇒ Thread (readonly)

Current dequeuer thread.



# File 'lib/datahen/scraper/batch_parser.rb', line 66

def dequeuer_thread
  @dequeuer_thread
end

#garbage_count ⇒ Integer

Garbage collector request counter.



# File 'lib/datahen/scraper/batch_parser.rb', line 16

def garbage_count
  @garbage_count
end

#garbage_mutex ⇒ Mutex (readonly)

Garbage collector mutex used to synchronize garbage collector requests.



# File 'lib/datahen/scraper/batch_parser.rb', line 63

def garbage_mutex
  @garbage_mutex
end

#job_id ⇒ Integer (readonly)

Job id to be executed.



# File 'lib/datahen/scraper/batch_parser.rb', line 28

def job_id
  @job_id
end

#last_message ⇒ String

Last printed message, useful to prevent duplicated log messages.



# File 'lib/datahen/scraper/batch_parser.rb', line 19

def last_message
  @last_message
end

#loaded_pages ⇒ Concurrent::Hash<String, Hash> (readonly)

Loaded pages hash, useful to avoid duplicates in the loaded pages array.



# File 'lib/datahen/scraper/batch_parser.rb', line 37

def loaded_pages
  @loaded_pages
end

#max_garbage ⇒ Integer (readonly)

Max garbage collector requests before actually executing the garbage collector.


# File 'lib/datahen/scraper/batch_parser.rb', line 42

def max_garbage
  @max_garbage
end

#not_found ⇒ Boolean (readonly)

Indicates whether the current wait is due to there being no more pages to parse.



# File 'lib/datahen/scraper/batch_parser.rb', line 76

def not_found
  @not_found
end

#page_types ⇒ Array<String> (readonly)

Known page types extracted from the config file.



# File 'lib/datahen/scraper/batch_parser.rb', line 51

def page_types
  @page_types
end

#pages ⇒ Concurrent::Array<Hash> (readonly)

Loaded pages array.



# File 'lib/datahen/scraper/batch_parser.rb', line 34

def pages
  @pages
end

#parsers ⇒ Concurrent::Hash<String, String> (readonly)

Known parsers extracted from the config file.



# File 'lib/datahen/scraper/batch_parser.rb', line 54

def parsers
  @parsers
end

#second_dequeue_count ⇒ Integer

Second dequeue counter used to prevent false negative warning messages.



# File 'lib/datahen/scraper/batch_parser.rb', line 22

def second_dequeue_count
  @second_dequeue_count
end

#worker_count ⇒ Integer (readonly)

Parallel worker quantity.



# File 'lib/datahen/scraper/batch_parser.rb', line 31

def worker_count
  @worker_count
end

Class Method Details

.timestamp ⇒ Integer

Get the current unix timestamp.



# File 'lib/datahen/scraper/batch_parser.rb', line 86

def self.timestamp
  Time.new.utc.to_i
end

.wait(time_in_seconds) ⇒ Object

Wait a specific number of seconds.



# File 'lib/datahen/scraper/batch_parser.rb', line 80

def self.wait time_in_seconds
  Kernel.sleep time_in_seconds
end

Instance Method Details

#dequeue_pages ⇒ Hash

Dequeues one page from the previously loaded pages, waiting for new pages whenever none are loaded.


# File 'lib/datahen/scraper/batch_parser.rb', line 291

def dequeue_pages
  # collect garbage
  self.recollect_garbage

  # return page if there are loaded pages
  is_waiting = false
  while true do
    page = self.pages.shift
    unless page.nil?
      puts "[Worker #{Parallel.worker_number}]: Finish waiting" if is_waiting
      loaded_pages.delete(page['gid'])
      return page
    end

    # be more verbose on worker waiting
    unless is_waiting
      is_waiting = true
      puts "[Worker #{Parallel.worker_number}]: Is waiting for a page..."
      if self.second_dequeue_count > 1 && !self.not_found
        puts "\nWARNING: Your job is not optimized, increase your job's \"parser_dequeue_scale\"\n"
      end
    end
    self.class.wait 1

    # ensure the dequeuer thread is alive and healthy
    self.ensure_dequeuer_thread
  end
end

#dequeuer_is_alive! ⇒ Object

Refresh the dequeuer's still-alive timestamp.



# File 'lib/datahen/scraper/batch_parser.rb', line 183

def dequeuer_is_alive!
  self.dequeue_mutex.synchronize do
    @dequeuer_still_alive = self.class.timestamp
  end
  nil
end

#ensure_dequeuer_thread ⇒ Boolean

Ensures that the dequeuer thread exists and is running.



# File 'lib/datahen/scraper/batch_parser.rb', line 256

def ensure_dequeuer_thread
  self.dequeue_mutex.synchronize do
    # check if dequeuer thread is alive and healthy
    if !self.dequeuer_thread.nil? && self.dequeuer_thread.alive?
      still_alive_timeout = (self.dequeue_timeout + self.dequeue_interval) * 2 + self.dequeuer_still_alive
      return true if self.class.timestamp < still_alive_timeout

      # kill dequeuer thread
      self.repeat_puts "Dequeuer isn't healthy, will restart it..."
      self.dequeuer_thread.kill
      @dequeuer_thread = nil
      self.recollect_garbage
      self.no_repeat_puts "Dequeuer thread was killed!"
    end

    # dequeuing on parallel (the ride never ends :D)
    @dequeuer_thread = Thread.new do
      while true
        begin
          self.load_pages
          self.class.wait self.dequeue_interval
        rescue => e
          puts [e.message] + e.backtrace rescue 'error'
        end
      end
      puts "Error: dequeuer died! D:"
    end
    self.repeat_puts "Dequeuer thread was started!"
  end
  false
end
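
As a worked example of the health check above: under the default options, the dequeuer counts as unhealthy once its last heartbeat is more than 66 seconds old.

# Staleness window under the default options (a worked example of the
# still_alive_timeout arithmetic above, not part of the class).
dequeue_timeout  = 30
dequeue_interval = 3
stale_after = (dequeue_timeout + dequeue_interval) * 2
# => 66; past this many seconds without a dequeuer_is_alive! heartbeat,
# the thread is killed and restarted.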

#exec_parse(save = false, keep_outputs = false) ⇒ Object

Dequeue pages and execute the parsers associated with them in parallel.



# File 'lib/datahen/scraper/batch_parser.rb', line 321

def exec_parse save = false, keep_outputs = false
  if self.worker_count < 1
    self.no_repeat_puts NO_WORKERS_MSG
    return
  else
    self.no_repeat_puts "Spawing #{self.worker_count} workers"
  end

  # start dequeuer
  self.ensure_dequeuer_thread

  # process the pages
  dequeue = lambda{ self.dequeue_pages }
  Parallel.each(dequeue, in_threads: (worker_count)) do |page|
    parser_file = self.parsers[page['page_type']]
    begin
      self.repeat_puts("Parsing page with GID #{page['gid']}")
      puts Datahen::Scraper::Parser.exec_parser_by_page(
        parser_file,
        page,
        job_id,
        save,
        nil,
        keep_outputs
      )
      self.repeat_puts("Finish parsing page with GID #{page['gid']}")
    rescue Parallel::Kill => e
      puts "[Worker #{Parallel.worker_number}]: Someone tried to kill Parallel!!!"
    rescue Parallel::Break => e
      puts "[Worker #{Parallel.worker_number}]: Someone tried to break Parallel!!!"
    rescue => e
      puts [e.message] + e.backtrace rescue 'error'
    end
  end

  nil
end
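
Note that a lambda, not an array, is passed to Parallel.each: the parallel gem treats a lambda as a producer that each worker calls to pull its next item. A standalone sketch of that pattern (independent of this class):

require 'parallel'

# Each worker calls the lambda to fetch its next item; returning
# Parallel::Stop ends the iteration.
queue = [1, 2, 3]
producer = lambda { queue.shift || Parallel::Stop }
Parallel.each(producer, in_threads: 2) do |item|
  puts "processing item #{item}"
end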

#load_config ⇒ Object

Loads the config file into a Hash.



# File 'lib/datahen/scraper/batch_parser.rb', line 150

def load_config
  # build page type to script file map
  @page_types = []
  @parsers = Concurrent::Hash.new
  @config = YAML.load_file(config_file)
  self.config['parsers'].each do |v|
    next if !v['disabled'].nil? && !!v['disabled']
    @page_types << v['page_type']
    self.parsers[v['page_type']] = v['file']
  end
  self.recollect_garbage
  nil
end
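
A sketch of the config structure #load_config expects; the page types and script paths below are hypothetical:

require 'yaml'

# Each parser entry maps a page_type to a script file; entries with
# disabled: true are skipped by #load_config.
config = YAML.load <<~CONFIG
  parsers:
    - page_type: listings
      file: ./parsers/listings.rb
    - page_type: details
      file: ./parsers/details.rb
      disabled: true
CONFIG

# With this config, page_types would end up as ["listings"] and
# parsers as { "listings" => "./parsers/listings.rb" }.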

#load_pages ⇒ Integer

Load new pages by dequeuing from the API.



# File 'lib/datahen/scraper/batch_parser.rb', line 192

def load_pages
  self.dequeuer_is_alive!

  # calculate dequeue size
  max_dequeue_size = (self.worker_count * self.dequeue_scale).ceil
  current_size = self.pages.length
  dequeue_size = (self.dequeue_scale * (max_dequeue_size - current_size)).ceil
  if dequeue_size < 1
    return 0
  end
  dequeue_size = max_dequeue_size if dequeue_size > max_dequeue_size

  # reserve and get pages to parse
  response = nil
  begin
    response = client.dequeue self.job_id,
      dequeue_size,
      self.page_types,
      config['parse_fetching_failed'],
      timeout: self.dequeue_timeout
  rescue Net::ReadTimeout, Net::OpenTimeout => e
    self.repeat_puts "Dequeue API call timeout! Contact infra team, your job needs a profile change"
    self.dequeuer_is_alive!
    return 0
  rescue => e
    raise e
  end
  self.dequeuer_is_alive!

  # ensure a valid response or try again
  if response.nil? || response.response.code.to_i != 200
    self.repeat_puts(response.nil? ? 'null' : response.body)
    self.recollect_garbage
    return 0
  end

  # add pages
  count = 0
  (JSON.parse(response.body) || []).each do |page|
    count += 1
    next if self.loaded_pages.has_key? page['gid']
    self.pages << (self.loaded_pages[page['gid']] = page)
  end
  response = nil
  self.dequeuer_is_alive!

  # recollect garbage to free some memory before parsing
  if count > 0
    @not_found = false
    self.recollect_garbage
    self.repeat_puts "Found #{count} page(s) to parse"
    self.second_dequeue_count += 1 unless self.second_dequeue_count > 1
  else
    @not_found = true
    self.no_repeat_puts NOT_FOUND_MSG
  end

  # return how many pages were loaded
  count
end
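
A worked example of the dequeue-size arithmetic above, assuming 4 workers, the default dequeue_scale of 2, and 3 pages already loaded:

# Hypothetical values illustrating the calculation in #load_pages.
worker_count  = 4
dequeue_scale = 2
current_size  = 3

max_dequeue_size = (worker_count * dequeue_scale).ceil                  # => 8
dequeue_size = (dequeue_scale * (max_dequeue_size - current_size)).ceil # => 10
dequeue_size = max_dequeue_size if dequeue_size > max_dequeue_size      # => 8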

#no_repeat_puts(message) ⇒ Object

Print the message only when it is different from the last recorded message.


# File 'lib/datahen/scraper/batch_parser.rb', line 175

def no_repeat_puts message
  return if message == self.last_message
  puts message
  self.last_message = message
  nil
end

#recollect_garbage ⇒ Object

Execute the garbage collector after it has been requested as many times as described by #max_garbage.


# File 'lib/datahen/scraper/batch_parser.rb', line 137

def recollect_garbage
  self.garbage_mutex.synchronize do
    self.garbage_count += 1
    if self.garbage_count > self.max_garbage
      puts "Recollect garbage"
      GC.start
      self.garbage_count = 0
    end
  end
  nil
end
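
A standalone sketch of the same throttling pattern, assuming the default max_garbage of 5:

# GC.start only runs once the counter exceeds max_garbage, after which
# the counter resets; here it fires on the 6th and 12th requests.
max_garbage   = 5
garbage_count = 0
12.times do
  garbage_count += 1
  if garbage_count > max_garbage
    GC.start
    garbage_count = 0
  end
end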

#repeat_puts(message) ⇒ Object

Print the message regardless of it being the same as the last message.



# File 'lib/datahen/scraper/batch_parser.rb', line 166

def repeat_puts message
  puts message
  self.last_message = message
  nil
end