Class: WebArchive::ArchiveQueue
- Inherits: Queue
  - Object
    - Queue
      - WebArchive::ArchiveQueue
- Defined in: lib/webarchive.rb
Overview
Queue for sending URLs to a particular archiving web site. The block given to the constructor is executed for each item pushed with ‘<<’.
Instance Method Summary
- #done_sending ⇒ Boolean
  mark as ‘sending done’ and wait for items to be processed.
- #initialize(name, interval) {|String| ... } ⇒ ArchiveQueue (constructor)
  Create a new instance of ArchiveQueue.
- #remaining ⇒ Integer
  number of queued items (including those being processed).
- #retry?(exc) ⇒ Boolean
- #time_until_next_req(last_req, current) ⇒ Object
Constructor Details
#initialize(name, interval) {|String| ... } ⇒ ArchiveQueue
Create a new instance of ArchiveQueue
# File 'lib/webarchive.rb', line 61

def initialize(name, interval)
  super()
  @name = name
  @interval = interval
  @all_sent = false
  @in_process = Concurrent::AtomicFixnum.new(0)
  last_request_time = Time.now - interval
  @consumer = Thread.new do
    loop do
      req = self.deq # deq blocks until non-empty
      @in_process.value += 1
      begin
        sleep time_until_next_req(last_request_time, Time.now)
        last_request_time = Time.now
        yield req.uri
      rescue StandardError => e
        if retry?(e) && req.max_retry.positive?
          buff = [].tap { |a| a << self.deq until self.empty? }
          @in_process.value += buff.size + 1
          Concurrent::ScheduledTask.execute(req.wait) do
            @in_process.value -= buff.size + 1
            self.enq Req.new(req.uri, req.wait * 2, req.max_retry - 1)
            buff.each { |x| self.enq x }
          end
        else
          WebArchive.warn_archive_fail(
            req.uri, name, ([e.inspect] + e.backtrace).join("\n")
          )
        end
      ensure
        @in_process.value -= 1
        break if @all_sent && self.remaining.zero?
      end
    end
  end
end
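The retry branch of the constructor can be read as two steps: build a follow-up request with a doubled wait and one fewer retry, then put it back ahead of whatever is still pending. The following is a minimal single-threaded sketch of that idea, not the gem's code. The helpers `make_req`, `next_attempt`, and `requeue_in_front` are hypothetical names, plain hashes stand in for the gem's `Req` objects, and the delay that the listing applies through `Concurrent::ScheduledTask` is left out.

```ruby
# Hypothetical stand-in for the gem's Req objects (uri, wait, max_retry).
def make_req(uri, wait, max_retry)
  { uri: uri, wait: wait, max_retry: max_retry }
end

# Request to enqueue after a retryable failure, or nil once retries run out.
# Mirrors Req.new(req.uri, req.wait * 2, req.max_retry - 1) in the listing.
def next_attempt(req)
  return nil unless req[:max_retry].positive?

  make_req(req[:uri], req[:wait] * 2, req[:max_retry] - 1)
end

# Drain the pending items, enqueue the retry first, then restore the
# pending items in their original order.
def requeue_in_front(queue, retry_req)
  pending = [].tap { |a| a << queue.deq until queue.empty? }
  queue.enq(retry_req)
  pending.each { |x| queue.enq(x) }
  queue
end

queue = Queue.new
queue << make_req('https://example.com/b', 1, 3)
failed = make_req('https://example.com/a', 1, 3)
requeue_in_front(queue, next_attempt(failed))

first = queue.deq
puts "#{first[:uri]} wait=#{first[:wait]} retries_left=#{first[:max_retry]}"
# prints "https://example.com/a wait=2 retries_left=2"
```

With an initial wait of `w` and `max_retry` of `m`, this scheme gives up to `m` retries with delays `w`, `2w`, `4w`, and so on.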
Instance Method Details
#done_sending ⇒ Boolean
mark as ‘sending done’ and wait for items to be processed
# File 'lib/webarchive.rb', line 118

def done_sending
  @all_sent = true
  @consumer.join if self.remaining.positive?
end
#remaining ⇒ Integer
number of queued items (including those being processed)
# File 'lib/webarchive.rb', line 125

def remaining
  self.size + @in_process.value
end
#retry?(exc) ⇒ Boolean
# File 'lib/webarchive.rb', line 105

def retry?(exc)
  [
    Errno::ECONNRESET,
    Errno::EHOSTUNREACH
  ].include?(exc.class) ||
    (exc.is_a?(OpenURI::HTTPError) && exc.message.start_with?('429 ')) ||
    (exc.is_a?(OpenURI::HTTPError) && exc.message.start_with?('502 ')) ||
    (exc.is_a?(OpenURI::HTTPError) && exc.message.start_with?('503 ')) ||
    (exc.is_a?(Mechanize::ResponseCodeError) && exc.response_code == '503')
end
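To see which failures this kind of check treats as transient, here is a reduced standalone sketch. It assumes the HTTP status is matched against the start of the `OpenURI::HTTPError` message (for example "503 Service Unavailable"), and it leaves out the `Mechanize::ResponseCodeError` case so that only the standard library is needed. The method name `transient_failure?` is hypothetical.

```ruby
require 'open-uri'

# Hypothetical reduced version of the check: connection-level errors and
# HTTP 429/502/503 responses count as retryable. The Mechanize case from
# the listing is deliberately omitted here.
def transient_failure?(exc)
  return true if [Errno::ECONNRESET, Errno::EHOSTUNREACH].include?(exc.class)

  exc.is_a?(OpenURI::HTTPError) &&
    exc.message.start_with?('429 ', '502 ', '503 ')
end

# OpenURI::HTTPError.new takes (message, io); nil is enough for this demo.
busy = OpenURI::HTTPError.new('503 Service Unavailable', nil)
missing = OpenURI::HTTPError.new('404 Not Found', nil)

puts transient_failure?(busy)                  # prints "true"
puts transient_failure?(missing)               # prints "false"
puts transient_failure?(Errno::ECONNRESET.new) # prints "true"
```

A 404 is not retried because repeating the request is unlikely to change the outcome, while 429, 502, and 503 usually indicate a temporary condition on the server side.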
#time_until_next_req(last_req, current) ⇒ Object
# File 'lib/webarchive.rb', line 98

def time_until_next_req(last_req, current)
  elapsed = [current - last_req, 0].max
  [@interval - elapsed, 0].max
end
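A short worked example may make the throttling arithmetic easier to follow. This sketch copies the same two clamping steps into a standalone method, with one assumption: the interval is passed as an explicit first argument instead of being read from `@interval`.

```ruby
# Standalone copy of the throttling arithmetic. Assumption: `interval` is
# an explicit parameter here rather than the instance variable @interval.
def time_until_next_req(interval, last_req, current)
  # A negative elapsed time (clock moved backwards) is treated as 0.
  elapsed = [current - last_req, 0].max
  # Never return a negative sleep time.
  [interval - elapsed, 0].max
end

base = Time.at(1_000)

# 2 s since the last request with a 5 s interval: wait 3 more seconds.
puts time_until_next_req(5, base, base + 2) # prints "3.0"

# 8 s since the last request: the interval has already passed.
puts time_until_next_req(5, base, base + 8) # prints "0"

# Current time earlier than the last request: wait the full interval.
puts time_until_next_req(5, base, base - 1) # prints "5"
```

In the constructor, `last_request_time` starts at `Time.now - interval`, so the first request sees a full interval already elapsed and is sent without waiting.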