Class: OnlineGHAProvider

Inherits:
GHAProvider show all
Defined in:
lib/gh-archive.rb

Defined Under Namespace

Classes: Cache, DownloadArchiveException

Instance Method Summary collapse

Methods inherited from GHAProvider

#exclude, #include, #logger=, #parse_events, #restore_checkpoint, #update_checkpoint, #use_checkpoint

Methods included from GHAUtils

#each_time, #get_gha_filename, #read_gha_file, #read_gha_file_content

Constructor Details

#initialize(max_retries = 3, proactive = false, proactive_pool_size = 10) ⇒ OnlineGHAProvider

Returns a new instance of OnlineGHAProvider.



254
255
256
257
258
259
260
261
# File 'lib/gh-archive.rb', line 254

# Builds a provider, storing the retry budget and optionally enabling
# proactive mode.
#
# @param max_retries [Integer] value handed to {#max_retries}
# @param proactive [Boolean] when truthy, {#proactive} is invoked
# @param proactive_pool_size [Integer] pool size handed to {#proactive};
#   ignored when +proactive+ is falsy
def initialize(max_retries = 3, proactive = false, proactive_pool_size = 10)
    super()

    self.max_retries(max_retries)
    if proactive
        self.proactive(proactive_pool_size)
    end

    # The cache object is always created, whether or not proactive mode is on.
    @cache = Cache.new
end

Instance Method Details

#cache(current_time) ⇒ Object



311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/gh-archive.rb', line 311

# Downloads the archive for +current_time+ and stores its parsed content in
# the cache under the archive's filename.
#
# Before downloading, polls once per second while the cache reports itself
# full. Connection resets and HTTP 5xx responses are logged and retried, up
# to +@max_retries+ attempts in total.
#
# @param current_time [Object] time identifying the archive; passed to
#   +get_gha_filename+
# @return [nil] once the content has been stored in the cache
# @raise [OpenURI::HTTPError] when the server answers with a non-5xx error
# @raise [DownloadArchiveException] when every attempt failed with a
#   retryable error (previously the method returned silently in this case,
#   leaving consumers waiting for an entry that never arrives)
def cache(current_time)
    @logger.info("Full cache. Waiting for some free slot...") if @cache.full?
    sleep 1 while @cache.full?

    # The filename depends only on current_time, so compute it once.
    filename = self.get_gha_filename(current_time)

    @max_retries.times do
        begin
            URI.open("http://data.gharchive.org/#{filename}") do |gz|
                @cache.put(filename, self.read_gha_file(gz))
                return
            end
        rescue Errno::ECONNRESET => e
            # Falling off the end of the block moves on to the next attempt.
            @logger.warn("A server error temporary prevented the download of #{current_time}: " + e.message)
        rescue OpenURI::HTTPError => e
            # Only 5xx responses are treated as retryable; anything else is re-raised.
            raise unless e.io.status[0].start_with?("5")

            @logger.warn("A server error temporary prevented the download of #{current_time}: " + e.message)
        end
    end

    # Same failure signal used by #get when its retries are exhausted.
    raise DownloadArchiveException, "Exceeded maximum number of tentative downloads for #{current_time}."
end

#each(from = Time.gm(2015, 1, 1), to = Time.now) ⇒ Object



339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
# File 'lib/gh-archive.rb', line 339

# Iterates over the archives between +from+ and +to+ by delegating to the
# parent implementation. In proactive mode, one download task per time slot
# is first submitted to the pool, and the method waits until at least one
# task has finished caching before delegating.
#
# @param from [Time] start of the range (defaults to 2015-01-01 UTC)
# @param to [Time] end of the range (defaults to the current time)
# @return [Object] whatever the parent +each+ returns
def each(from = Time.gm(2015, 1, 1), to = Time.now)
    if @proactive
        real_from = restore_checkpoint(from)
        # NOTE(review): Thread.promise / `~` / `<<` look like the promise API
        # of the `thread` gem — confirm against the file's requires.
        any_ready = Thread.promise
        scheduled = false

        @logger.info("Proactively scheduling download tasks...")
        self.each_time(real_from, to) do |current_time|
            scheduled = true
            # Distinct inner parameter name to avoid shadowing the outer one.
            @pool.process(current_time) do |time|
                cache(time)
                any_ready << true
                @logger.info("Proactively cached #{time}. Cache size: #{@cache.size}")
            end
        end

        # With an empty range no task exists to deliver the promise, so
        # waiting on it would block forever; only wait when work was queued.
        ~any_ready if scheduled
        @logger.info("Download tasks successfully scheduled!")
    end

    super
end

#get(current_time) ⇒ Object



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/gh-archive.rb', line 276

# Returns the parsed content of the archive for +current_time+.
#
# In proactive mode the content is taken from the cache, polling once per
# second until the entry is present. Otherwise the archive is downloaded
# directly; connection resets and HTTP 5xx responses are logged and retried,
# up to +@max_retries+ attempts in total.
#
# @param current_time [Object] time identifying the archive; passed to
#   +get_gha_filename+
# @return [Object] the cached entry, or the result of +read_gha_file+
# @raise [OpenURI::HTTPError] when the server answers with a non-5xx error
# @raise [DownloadArchiveException] when all attempts have been used up
def get(current_time)
    @max_retries.times do
        begin
            archive = get_gha_filename(current_time)

            if @proactive
                @logger.info("Waiting for cache to have #{current_time}...") unless @cache.has?(archive)
                sleep 1 until @cache.has?(archive)

                return @cache.get(archive)
            end

            # Direct download path: the first successful read ends the method.
            URI.open("http://data.gharchive.org/#{archive}") do |stream|
                return read_gha_file(stream)
            end
        rescue Errno::ECONNRESET => reset
            # Reaching the end of the block starts the next attempt.
            @logger.warn("A server error temporary prevented the download of #{current_time}: " + reset.message)
        rescue OpenURI::HTTPError => http_failure
            status_code = http_failure.io.status[0]
            # Anything other than a 5xx response is not retried.
            raise unless status_code.start_with?("5")

            @logger.warn("A server error temporary prevented the download of #{current_time}: " + http_failure.message)
        end
    end

    raise DownloadArchiveException, "Exceeded maximum number of tentative downloads for #{current_time}."
end

#max_retries(n) ⇒ Object



263
264
265
266
267
# File 'lib/gh-archive.rb', line 263

# Sets the value stored in +@max_retries+, which the download methods use as
# their attempt count.
#
# @param n [Integer] number of attempts
# @return [self] the receiver, so calls can be chained
def max_retries(n)
    @max_retries = n
    self
end

#proactive(pool_size = 10) ⇒ Object



269
270
271
272
273
274
# File 'lib/gh-archive.rb', line 269

# Switches the provider to proactive mode and creates the thread pool used to
# run download tasks.
#
# @param pool_size [Integer] size passed to +GHArchive::ThreadPool.new+
# @return [self] the receiver, so calls can be chained
def proactive(pool_size = 10)
    @pool = GHArchive::ThreadPool.new(pool_size)
    @proactive = true
    self
end