Class: WebArchive::ArchiveQueue

Inherits:
Queue
  • Object
show all
Defined in:
lib/webarchive.rb

Overview

Queue for sending URLs to a specific archiving website. The block given to the constructor is executed on a background consumer thread for each item pushed with ‘<<’, and receives that item's URI.

Instance Method Summary

Constructor Details

#initialize(name, interval) {|String| ... } ⇒ ArchiveQueue

Create a new instance of ArchiveQueue

Parameters:

  • name (String)

    name of the queue

  • interval (Float)

    minimum time, in seconds, to wait between successive requests

Yields:

  • (String)

    URI that the queue receives



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/webarchive.rb', line 61

# Create a new ArchiveQueue and start its consumer thread.
#
# The consumer thread takes one request at a time and sleeps as needed so
# that at least +interval+ seconds separate successive requests. It then
# yields the request's URI to the block given to this constructor.
#
# When the block raises an error that #retry? accepts and the request still
# has retries left, every request currently queued is pulled out. After
# +req.wait+ seconds, a copy of the failed request (wait doubled, one fewer
# retry) is re-queued ahead of them. Other errors are reported through
# WebArchive.warn_archive_fail.
#
# @param name [String] name of the queue (passed to failure reports)
# @param interval [Float] minimum time, in seconds, between requests
# @yield [uri] called on the consumer thread for each request
# @yieldparam uri [String] URI that the queue receives
def initialize(name, interval)
  super()
  @name = name
  @interval = interval
  @all_sent = false
  # Requests taken off the queue but not finished yet, including ones parked
  # for a delayed retry. Both the consumer thread and ScheduledTask threads
  # update it, so only the atomic increment/decrement operations are used;
  # `value += n` would be a racy read-modify-write.
  @in_process = Concurrent::AtomicFixnum.new(0)
  # Measure spacing on the monotonic clock so wall-clock adjustments cannot
  # stretch or skip the inter-request delay.
  monotonic_now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  last_request_time = monotonic_now.call - interval
  @consumer = Thread.new do
    loop do
      req = deq # blocks until non-empty
      @in_process.increment
      begin
        sleep time_until_next_req(last_request_time, monotonic_now.call)
        last_request_time = monotonic_now.call
        yield req.uri
      rescue StandardError => e
        if retry?(e) && req.max_retry.positive?
          # Drain whatever is queued so that, once the back-off expires, the
          # retried request is re-queued ahead of those items. Everything
          # drained, plus the retried request, stays counted meanwhile.
          buff = []
          buff << deq until empty?
          parked = buff.size + 1
          @in_process.increment(parked)
          Concurrent::ScheduledTask.execute(req.wait) do
            @in_process.decrement(parked)
            enq Req.new(req.uri, req.wait * 2, req.max_retry - 1)
            buff.each { |x| enq x }
          end
        else
          WebArchive.warn_archive_fail(
            req.uri, name, ([e.inspect] + e.backtrace).join("\n")
          )
        end
      ensure
        @in_process.decrement
        # Stop once #done_sending has set @all_sent and nothing is left
        # queued or in flight.
        break if @all_sent && remaining.zero?
      end
    end
  end
end

Instance Method Details

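#done_sending ⇒ Thread?

Mark the queue as ‘sending done’ and, if any items are still queued or being processed, wait for the consumer thread to finish them.

Returns:

  • (Thread, nil) — the joined consumer thread, or nil if nothing was pending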


118
119
120
121
# File 'lib/webarchive.rb', line 118

# Flag that no more URLs will be pushed. If work is still queued or in
# flight, block until the consumer thread has finished it.
#
# @return [Thread, nil] the joined consumer thread, or nil when nothing was
#   pending at the time of the call
def done_sending
  @all_sent = true
  return unless remaining.positive?

  @consumer.join
end

#remaining ⇒ Integer

number of queued items (including those being processed)

Returns:

  • (Integer)


125
126
127
# File 'lib/webarchive.rb', line 125

# Count requests that are not finished yet. This covers those still waiting
# in the queue plus those the consumer has taken but not completed.
#
# @return [Integer]
def remaining
  waiting = size
  in_flight = @in_process.value
  waiting + in_flight
end

#retry?(exc) ⇒ Boolean

Parameters:

  • exc (Exception)

Returns:

  • (Boolean)


105
106
107
108
109
110
111
112
113
114
# File 'lib/webarchive.rb', line 105

# Decide whether a failed request is worth retrying later.
#
# Connection resets and unreachable hosts are treated as transient. So are
# HTTP 429 (Too Many Requests), 502 (Bad Gateway) and 503 (Service
# Unavailable) responses, whichever client reports them:
# - OpenURI::HTTPError puts the status code at the start of its message.
# - Mechanize::ResponseCodeError exposes the status code as a String in
#   #response_code.
#
# @param exc [Exception] error raised while sending a request
# @return [Boolean]
def retry?(exc)
  retryable_codes = %w[429 502 503]
  case exc
  when Errno::ECONNRESET, Errno::EHOSTUNREACH
    true
  when OpenURI::HTTPError
    retryable_codes.any? { |code| exc.message.start_with?("#{code} ") }
  when Mechanize::ResponseCodeError
    # Previously only '503' was retried here, unlike the OpenURI branch.
    retryable_codes.include?(exc.response_code)
  else
    false
  end
end

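#time_until_next_req(last_req, current) ⇒ Numeric

Number of seconds to sleep before the next request so that at least the configured interval has passed since last_req; never negative.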



98
99
100
101
# File 'lib/webarchive.rb', line 98

# Compute how many seconds to sleep before the next request so that at
# least @interval seconds separate it from the previous one.
#
# A negative elapsed time (current earlier than last_req) counts as zero,
# and the result is never negative.
#
# @param last_req [Time, Numeric] when the previous request was sent
# @param current [Time, Numeric] the present moment, same kind as +last_req+
# @return [Numeric] delay in seconds (0 when no wait is needed)
def time_until_next_req(last_req, current)
  since_last = current - last_req
  since_last = 0 if since_last.negative?

  pause = @interval - since_last
  pause.negative? ? 0 : pause
end