Class: Wayfarer::Processor

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Observable, CrawlObserver::Events, CrawlObserver::ObservableShortcuts
Defined in:
lib/wayfarer/processor.rb

Overview

Runs jobs.

Constant Summary

Constants included from CrawlObserver::Events

CrawlObserver::Events::CycleFinished

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from CrawlObserver::ObservableShortcuts

#notify_observers!

Constructor Details

#initialize(job, frontier, dispatcher) ⇒ Processor

Returns a new instance of Processor.


21
22
23
24
25
26
# File 'lib/wayfarer/processor.rb', line 21

# Builds a processor around its three collaborators.
#
# @param job [Object] stored in @job and exposed through #job
# @param frontier [Object] stored in @frontier
# @param dispatcher [Object] stored in @dispatcher
def initialize(job, frontier, dispatcher)
  @job, @frontier, @dispatcher = job, frontier, dispatcher

  # The halt flag starts out cleared.
  @halted = Concurrent::AtomicBoolean.new(false)
end

Instance Attribute Details

#job ⇒ Object (readonly)

Returns the value of attribute job


16
17
18
# File 'lib/wayfarer/processor.rb', line 16

# Reader for the job held by this processor.
#
# @return [Object] the current value of @job
def job
  @job
end

Instance Method Details

#halt! ⇒ Object

Sets a halt flag.


35
36
37
# File 'lib/wayfarer/processor.rb', line 35

# Switches the halt flag on by calling make_true on @halted.
#
# This is the flag that #halted? reports, and that #run consults before
# starting a cycle and before each dispatch.
#
# @return [Object] whatever @halted.make_true returns
def halt!
  @halted.make_true
end

#halted? ⇒ true, false

Whether processing is done.

Returns:

  • (true, false)

30
31
32
# File 'lib/wayfarer/processor.rb', line 30

# Reports the current state of the halt flag.
#
# @return [true, false] true once the flag held in @halted has been set
def halted?
  @halted.true?
end

#run(*_uris) ⇒ Object

Runs the job.

Parameters:

  • uris (*Array<URI>, *Array<String>)

41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/wayfarer/processor.rb', line 41

# Runs the job cycle by cycle until the halt flag is set or the frontier
# reports no further cycle.
#
# For every cycle the frontier's current URIs are loaded into a queue and
# config.connection_count worker threads drain it. Each URI is dispatched
# together with @job through @dispatcher, and the result is passed to
# handle_dispatch_result.
#
# Observers are notified with FirstCycle once up front, with NewCycle before
# the workers of a cycle start, with CycleFinished by every worker that finds
# the queue empty, and with AboutToCycle after all workers were joined.
# A worker that stops because of the halt flag or a nil queue entry does not
# emit CycleFinished.
#
# Whatever happens, the halt flag is set and both the frontier and the
# dispatcher's adapter pool are freed before the method returns or raises.
#
# @param _uris [Array<URI>, Array<String>] accepted but not read by this method
# @return [nil] when all cycles complete without an exception
def run(*_uris)
  notify_observers!(FirstCycle.new(@frontier))

  while @halted.false? && @frontier.cycle
    current_uris = @frontier.current_uris
    queue = current_uris.inject(Queue.new, :push)

    notify_observers!(NewCycle.new(current_uris.count))

    @threads = Array.new(config.connection_count) do
      Thread.new do
        loop do
          begin
            # Non-blocking pop: raises ThreadError once the queue is empty.
            uri = queue.pop(true)
          rescue ThreadError
            notify_observers!(CycleFinished.new)
            break
          end

          break if uri.nil? || @halted.true?

          # Deliberately outside the rescue above: a ThreadError raised while
          # dispatching is a real failure and must not be mistaken for an
          # exhausted queue.
          handle_dispatch_result(@dispatcher.dispatch(@job, uri))
        end
      end
    end

    @threads.each(&:join)

    notify_observers!(AboutToCycle.new(@frontier.staged_uris.count))
  end
ensure
  halt!

  begin
    @frontier.free
  ensure
    # Release the adapter pool even when freeing the frontier raised.
    @dispatcher.adapter_pool.free
  end
end