Class: Polipus::PolipusCrawler

Inherits: Object

Defined in: lib/polipus.rb

Constant Summary

OPTS =
{
  # run 4 threads
  workers: 4,
  # identify self as Polipus/VERSION
  user_agent: "Polipus - #{Polipus::VERSION} - #{Polipus::HOMEPAGE}",
  # by default, don't limit the depth of the crawl
  depth_limit: false,
  # number of times HTTP redirects will be followed
  redirect_limit: 5,
  # storage engine defaults to DevNull
  storage: nil,
  # proxy server hostname
  proxy_host: nil,
  # proxy server port number
  proxy_port: false,
  # HTTP read timeout in seconds
  read_timeout: 30,
  # HTTP open connection timeout in seconds
  open_timeout: 10,
  # Time to wait for new messages on Redis
  # After this timeout, the current crawling session is marked as terminated
  queue_timeout: 30,
  # A URL tracker instance. Default is a Redis-backed Bloom filter
  url_tracker: nil,
  # A Redis options hash that will be passed directly to Redis.new
  redis_options: {},
  # An instance of logger
  logger: nil,
  # A logger level
  logger_level: nil,
  # whether the query string should be included in the saved page
  include_query_string_in_saved_page: true,
  # Max number of items to keep on redis
  queue_items_limit: 2_000_000,
  # The adapter used to store items that exceed queue_items_limit on Redis
  queue_overflow_adapter: nil,
  # Every x seconds, the main queue is checked for overflowed items
  queue_overflow_manager_check_time: 60,
  # If true, each page downloaded will increment a counter on redis
  stats_enabled: false,
  # Cookies strategy
  cookie_jar: nil,
  # whether or not to accept cookies
  accept_cookies: false,
  # A set of hosts that should be considered parts of the same domain
  # E.g. it can be used to follow links with and without the 'www' prefix
  domain_aliases: [],
  # Mark a connection as stale after connection_max_hits requests
  connection_max_hits: nil,
  # Page TTL: mark a page as expired after ttl_page seconds
  ttl_page: nil,
  # don't obey the robots exclusion protocol
  obey_robots_txt: false,
  # If true, signal handling strategy is enabled.
  # INT and TERM signals will stop polipus gracefully
  # Disable it if polipus will run as part of a Resque or DelayedJob-like system
  enable_signal_handler: true
}
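
Every option above can be overridden through the options hash passed to the constructor (or to .crawl), which is merged over OPTS. A minimal sketch, with illustrative values:

require 'polipus'

options = {
  workers: 10,            # more worker threads than the default 4
  depth_limit: 2,         # stop following links after depth 2
  obey_robots_txt: true,
  user_agent: 'MyCrawler/1.0'
}

Polipus::PolipusCrawler.new('my_job', 'https://example.com/', options)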

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(job_name = 'polipus', urls = [], options = {}) {|_self| ... } ⇒ PolipusCrawler

Returns a new instance of PolipusCrawler.

Yields:

  • (_self)

Yield Parameters:

  • _self (PolipusCrawler) - the object that the method was called on

# File 'lib/polipus.rb', line 97

def initialize(job_name = 'polipus', urls = [], options = {})
  @job_name     = job_name
  @options      = OPTS.merge(options)
  @options[:queue_timeout] = 1 if @options[:queue_timeout] <= 0
  @logger       = @options[:logger]  ||= Logger.new(nil)

  unless @logger.class.to_s == 'Log4r::Logger'
    @logger.level = @options[:logger_level] ||= Logger::INFO
  end

  @storage      = @options[:storage] ||= Storage.dev_null

  @http_pool    = []
  @workers_pool = []
  @queues_pool  = []

  @follow_links_like  = []
  @skip_links_like    = []
  @on_page_downloaded = []
  @on_before_save     = []
  @on_page_error      = []
  @focus_crawl_block  = nil
  @on_crawl_end       = []
  @redis_factory      = nil

  @overflow_manager = nil
  @crawler_name = `hostname`.strip + "-#{@job_name}"

  @storage.include_query_string_in_uuid = @options[:include_query_string_in_saved_page]

  @urls = [urls].flatten.map { |url| URI(url) }
  @urls.each { |url| url.path = '/' if url.path.empty? }
  @robots = Polipus::Robotex.new(@options[:user_agent]) if @options[:obey_robots_txt]
  # Attach signal handling if enabled
  SignalHandler.enable if @options[:enable_signal_handler]
  execute_plugin 'on_initialize'

  yield self if block_given?
end
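
Putting the constructor together with the configuration methods documented below, a typical setup looks like the following sketch (job name, URL and patterns are illustrative):

crawler = Polipus::PolipusCrawler.new('wiki', ['https://en.wikipedia.org/'], workers: 5) do |c|
  c.skip_links_like(/\/Special:/)
  c.on_page_downloaded { |page| puts page.url }
end

crawler.takeover  # start crawling; blocks until the session ends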

Instance Attribute Details

#crawler_name ⇒ Object (readonly)

Returns the value of attribute crawler_name.



# File 'lib/polipus.rb', line 86

def crawler_name
  @crawler_name
end

#job_name ⇒ Object (readonly)

Returns the value of attribute job_name.



# File 'lib/polipus.rb', line 83

def job_name
  @job_name
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'lib/polipus.rb', line 84

def logger
  @logger
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/polipus.rb', line 85

def options
  @options
end

#storage ⇒ Object (readonly)

Returns the value of attribute storage.



# File 'lib/polipus.rb', line 82

def storage
  @storage
end

Class Method Details

.crawl(*args, &block) ⇒ Object



# File 'lib/polipus.rb', line 137

def self.crawl(*args, &block)
  new(*args, &block).takeover
end
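
.crawl is a convenience wrapper: it builds an instance with the same arguments and immediately calls #takeover on it. The two snippets below are therefore equivalent (values are illustrative):

Polipus::PolipusCrawler.crawl('news', 'https://example.com/') do |c|
  c.on_page_downloaded { |page| puts page.url }
end

crawler = Polipus::PolipusCrawler.new('news', 'https://example.com/') do |c|
  c.on_page_downloaded { |page| puts page.url }
end
crawler.takeover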

Instance Method Details

#add_to_queue(page) ⇒ Object



# File 'lib/polipus.rb', line 320

def add_to_queue(page)
  if [:url, :referer, :depth].all? { |method| page.respond_to?(method) }
    add_url(page.url, referer: page.referer, depth: page.depth)
  else
    add_url(page)
  end
end
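
As the code shows, #add_to_queue accepts either a page-like object (anything responding to #url, #referer and #depth) or a bare URL, which falls through to #add_url. A sketch, assuming crawler is a PolipusCrawler instance:

# Re-enqueue a failed page, preserving its referer and depth
crawler.on_page_error { |page| crawler.add_to_queue(page) }

# Enqueue a bare URL instead
crawler.add_to_queue('https://example.com/retry-me')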

#add_url(url, params = {}) {|page| ... } ⇒ Object

Enqueues a URL unconditionally.

Yields:

  • (page)


# File 'lib/polipus.rb', line 329

def add_url(url, params = {})
  page = Page.new(url, params)
  yield(page) if block_given?
  internal_queue << page.to_json
end
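
The optional block yields the freshly built Page before it is serialized onto the queue, so metadata can be attached via user_data (the source_tag key below is purely illustrative):

crawler.add_url('https://example.com/landing') do |page|
  page.user_data.source_tag = 'manual_seed'
end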

#focus_crawl(&block) ⇒ Object

A block of code that will be executed on every page downloaded. The block is used to extract the URLs to visit; see the links_for method.



# File 'lib/polipus.rb', line 286

def focus_crawl(&block)
  @focus_crawl_block = block
  self
end
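
A sketch of a focus_crawl block that keeps only same-host links, assuming the block receives a Polipus::Page and returns the list of URLs to enqueue:

crawler.focus_crawl do |page|
  page.links.select { |link| link.host == page.url.host }
end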

#follow_links_like(*patterns) ⇒ Object

A pattern or an array of patterns can be passed as argument. A URL will be discarded if it doesn't match any of the patterns.



# File 'lib/polipus.rb', line 245

def follow_links_like(*patterns)
  @follow_links_like = @follow_links_like += patterns.uniq.compact
  self
end
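
A sketch with illustrative patterns; only URLs matching at least one of them will be followed:

crawler.follow_links_like(/\/articles\//, /\?page=\d+/)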

#on_before_save(&block) ⇒ Object

A block of code that will be executed on every downloaded page before it is saved in the registered storage.



# File 'lib/polipus.rb', line 272

def on_before_save(&block)
  @on_before_save << block
  self
end
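
A sketch that enriches the page before it reaches storage, assuming Page#doc returns a parsed Nokogiri document and #user_data accepts arbitrary attributes (as the p_seeded example in #takeover suggests):

crawler.on_before_save do |page|
  title = page.doc ? page.doc.at_css('title') : nil
  page.user_data.title = title.text if title
end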

#on_crawl_end(&block) ⇒ Object

A block of code that will be executed when the crawl session is over.



# File 'lib/polipus.rb', line 265

def on_crawl_end(&block)
  @on_crawl_end << block
  self
end
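
The block is called with the crawler itself (see #takeover), so final statistics can be reported; a sketch:

crawler.on_crawl_end do |c|
  c.logger.info { "Crawl finished, #{c.queue_size} items left in the queue" }
end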

#on_page_downloaded(&block) ⇒ Object

A block of code that will be executed on every page downloaded. The block takes the page as its argument.



# File 'lib/polipus.rb', line 259

def on_page_downloaded(&block)
  @on_page_downloaded << block
  self
end
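
A minimal sketch; page.url and page.code are the same accessors used in the #takeover log lines:

crawler.on_page_downloaded do |page|
  puts "#{page.url} -> #{page.code}"
end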

#on_page_error(&block) ⇒ Object

A block of code that will be executed whenever a page contains an error.



# File 'lib/polipus.rb', line 278

def on_page_error(&block)
  @on_page_error << block
  self
end
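
A sketch that logs the failure; page.error is the same accessor checked in #takeover:

crawler.on_page_error do |page|
  crawler.logger.warn { "Failed #{page.url}: #{page.error}" }
end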

#queue_size ⇒ Object



# File 'lib/polipus.rb', line 295

def queue_size
  internal_queue.size
end

#redis ⇒ Object



# File 'lib/polipus.rb', line 316

def redis
  @redis ||= redis_factory_adapter
end

#redis_factory(&block) ⇒ Object



# File 'lib/polipus.rb', line 303

def redis_factory(&block)
  @redis_factory = block
  self
end
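
A sketch of a custom factory, assuming the block is invoked with the configured Redis options and must return a Redis-compatible client:

require 'redis'

crawler.redis_factory do |redis_opts|
  Redis.new(redis_opts.merge(db: 5))  # illustrative: force a specific database
end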

#redis_options ⇒ Object



# File 'lib/polipus.rb', line 291

def redis_options
  @options[:redis_options]
end

#skip_links_like(*patterns) ⇒ Object

A pattern or an array of patterns can be passed as argument. A URL will be discarded if it matches any of the patterns.



# File 'lib/polipus.rb', line 252

def skip_links_like(*patterns)
  @skip_links_like = @skip_links_like += patterns.uniq.compact
  self
end
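
A sketch with illustrative patterns; any URL matching one of them is dropped:

crawler.skip_links_like(/\.(pdf|zip|jpg|png)$/i, /\/login/)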

#stats_reset! ⇒ Object



# File 'lib/polipus.rb', line 299

def stats_reset!
  ["polipus:#{@job_name}:errors", "polipus:#{@job_name}:pages"].each { |e| redis.del e }
end

#stop!(cler_queue = false) ⇒ Object

Requests Polipus to stop its work gracefully. Pass cler_queue = true to delete all of the pending URLs still to visit.



# File 'lib/polipus.rb', line 337

def stop!(cler_queue = false)
  SignalHandler.terminate
  internal_queue.clear(true) if cler_queue
end
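
A sketch that stops the crawl from inside a callback after a fixed number of pages (the counter is purely illustrative):

pages_seen = 0
crawler.on_page_downloaded do |page|
  pages_seen += 1
  crawler.stop! if pages_seen >= 1_000
end

# Stop and also throw away everything still queued
crawler.stop!(true)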

#takeover ⇒ Object



# File 'lib/polipus.rb', line 141

def takeover
  overflow_items_controller if queue_overflow_adapter

  @urls.each do |u|
    add_url(u) { |page| page.user_data.p_seeded = true }
  end
  return if internal_queue.empty?

  execute_plugin 'on_crawl_start'
  @options[:workers].times do |worker_number|
    @workers_pool << Thread.new do
      @logger.debug { "Start worker #{worker_number}" }
      http  = @http_pool[worker_number]   ||= HTTP.new(@options)
      queue = @queues_pool[worker_number] ||= queue_factory
      queue.process(false, @options[:queue_timeout]) do |message|

        next if message.nil?

        execute_plugin 'on_message_received'

        page = Page.from_json message

        unless should_be_visited?(page.url, false)
          @logger.info { "[worker ##{worker_number}] Page (#{page.url}) is no more welcome." }
          queue.commit
          next
        end

        if page_exists? page
          @logger.info { "[worker ##{worker_number}] Page (#{page.url}) already stored." }
          queue.commit
          next
        end

        url = page.url.to_s
        @logger.debug { "[worker ##{worker_number}] Fetching page: [#{page.url}] Referer: #{page.referer} Depth: #{page.depth}" }

        execute_plugin 'on_before_download'

        pages = http.fetch_pages(url, page.referer, page.depth)
        if pages.count > 1
          rurls = pages.map { |e| e.url.to_s }.join(' --> ')
          @logger.info { "Got redirects! #{rurls}" }
          page = pages.pop
          page.aliases = pages.map { |e| e.url }
          if page_exists? page
            @logger.info { "[worker ##{worker_number}] Page (#{page.url}) already stored." }
            queue.commit
            next
          end
        else
          page = pages.last
        end

        execute_plugin 'on_after_download'

        if page.error
          @logger.warn { "Page #{page.url} has error: #{page.error}" }
          incr_error
          @on_page_error.each { |e| e.call(page) }
        end

        # Execute on_before_save blocks
        @on_before_save.each { |e| e.call(page) }

        page.storable? && @storage.add(page)

        @logger.debug { "[worker ##{worker_number}] Fetched page: [#{page.url}] Referrer: [#{page.referer}] Depth: [#{page.depth}] Code: [#{page.code}] Response Time: [#{page.response_time}]" }
        @logger.info  { "[worker ##{worker_number}] Page (#{page.url}) downloaded" }

        incr_pages

        # Execute on_page_downloaded blocks
        @on_page_downloaded.each { |e| e.call(page) }

        if @options[:depth_limit] == false || @options[:depth_limit] > page.depth
          links_for(page).each do |url_to_visit|
            next unless should_be_visited?(url_to_visit)
            enqueue url_to_visit, page, queue
          end
        else
          @logger.info { "[worker ##{worker_number}] Depth limit reached #{page.depth}" }
        end

        @logger.debug { "[worker ##{worker_number}] Queue size: #{queue.size}" }
        @overflow_manager.perform if @overflow_manager && queue.empty?
        execute_plugin 'on_message_processed'

        if SignalHandler.terminated?
          @logger.info { 'About to exit! Thanks for using Polipus' }
          queue.commit
          break
        end
        true
      end
    end
  end
  @workers_pool.each { |w| w.join }
  @on_crawl_end.each { |e| e.call(self) }
  execute_plugin 'on_crawl_end'
end

#url_tracker ⇒ Object



# File 'lib/polipus.rb', line 308

def url_tracker
  @url_tracker ||=
    @options[:url_tracker] ||=
      UrlTracker.bloomfilter(key_name: "polipus_bf_#{job_name}",
                             redis: redis_factory_adapter,
                             driver: 'lua')
end
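
By default a Redis-backed Bloom filter is built lazily, as shown above. A different tracker can be injected through the :url_tracker option; a sketch that reuses the same factory with an illustrative key name:

require 'redis'

tracker = Polipus::UrlTracker.bloomfilter(key_name: 'my_custom_bf',
                                          redis: Redis.new,
                                          driver: 'lua')

Polipus::PolipusCrawler.new('my_job', 'https://example.com/', url_tracker: tracker)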