Class: Wayfarer::Processor
- Inherits:
-
Object
- Object
- Wayfarer::Processor
- Extended by:
- Forwardable
- Includes:
- Observable, CrawlObserver::Events, CrawlObserver::ObservableShortcuts
- Defined in:
- lib/wayfarer/processor.rb
Overview
Runs jobs.
Constant Summary
Constants included from CrawlObserver::Events
CrawlObserver::Events::CycleFinished
Instance Attribute Summary collapse
-
#job ⇒ Object
readonly
Returns the value of attribute job.
Instance Method Summary collapse
-
#halt! ⇒ Object
Sets a halt flag.
-
#halted? ⇒ true, false
Whether processing is done.
-
#initialize(job, frontier, dispatcher) ⇒ Processor
constructor
A new instance of Processor.
-
#run(*_uris) ⇒ Object
Runs the job.
Methods included from CrawlObserver::ObservableShortcuts
Constructor Details
#initialize(job, frontier, dispatcher) ⇒ Processor
Returns a new instance of Processor.
21 22 23 24 25 26 |
# File 'lib/wayfarer/processor.rb', line 21 def initialize(job, frontier, dispatcher) @job = job @frontier = frontier @dispatcher = dispatcher @halted = Concurrent::AtomicBoolean.new(false) end |
Instance Attribute Details
#job ⇒ Object (readonly)
Returns the value of attribute job.
16 17 18 |
# File 'lib/wayfarer/processor.rb', line 16 def job @job end |
Instance Method Details
#halt! ⇒ Object
Sets a halt flag.
35 36 37 |
# File 'lib/wayfarer/processor.rb', line 35 def halt! @halted.make_true end |
#halted? ⇒ true, false
Whether processing is done.
30 31 32 |
# File 'lib/wayfarer/processor.rb', line 30 def halted? @halted.value end |
#run(*_uris) ⇒ Object
Runs the job.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/wayfarer/processor.rb', line 41 def run(*_uris) notify_observers!(FirstCycle.new(@frontier)) while @halted.false? && @frontier.cycle current_uris = @frontier.current_uris queue = current_uris.inject(Queue.new, :push) notify_observers!(NewCycle.new(current_uris.count)) @threads = Array.new(config.connection_count) do Thread.new do begin loop do uri = queue.pop(true) break if uri.nil? || @halted.true? handle_dispatch_result(@dispatcher.dispatch(@job, uri)) end rescue ThreadError notify_observers!(CycleFinished.new) end end end @threads.each(&:join) notify_observers!(AboutToCycle.new(@frontier.staged_uris.count)) end ensure halt! @frontier.free @dispatcher.adapter_pool.free end |