Class: Polipus::PolipusCrawler

Inherits:
Object
Defined in:
lib/polipus.rb

Constant Summary

OPTS =
{
  # run 4 threads
  workers: 4,
  # identify self as Polipus/VERSION
  user_agent: "Polipus - #{Polipus::VERSION} - #{Polipus::HOMEPAGE}",
  # by default, don't limit the depth of the crawl
  depth_limit: false,
  # number of times HTTP redirects will be followed
  redirect_limit: 5,
  # storage engine defaults to DevNull
  storage: nil,
  # proxy server hostname
  proxy_host: nil,
  # proxy server port number
  proxy_port: false,
  # proxy server username
  proxy_user: nil,
  # proxy server password
  proxy_pass: nil,
  # HTTP read timeout in seconds
  read_timeout: 30,
  # HTTP open connection timeout in seconds
  open_timeout: 10,
  # Time to wait for new messages on Redis
  # After this timeout, current crawling session is marked as terminated
  queue_timeout: 30,
  # A URL tracker instance. Default is a Redis-backed Bloom filter
  url_tracker: nil,
  # A Redis options hash that will be passed directly to Redis.new
  redis_options: {},
  # An instance of logger
  logger: nil,
  # A logger level
  logger_level: nil,
  # whether the query string should be included in the saved page
  include_query_string_in_saved_page: true,
  # Max number of items to keep on redis
  queue_items_limit: 2_000_000,
  # The adapter used to store items that exceed queue_items_limit on Redis
  queue_overflow_adapter: nil,
  # Every x seconds, the main queue is checked for overflowed items
  queue_overflow_manager_check_time: 60,
  # If true, each page downloaded will increment a counter on redis
  stats_enabled: false,
  # Cookies strategy
  cookie_jar: nil,
  # whether or not to accept cookies
  accept_cookies: false,
  # A set of hosts that should be considered parts of the same domain
  # E.g. it can be used to follow links with and without the 'www' prefix
  domain_aliases: [],
  # Mark a connection as stale after connection_max_hits requests
  connection_max_hits: nil,
  # Page TTL: mark a page as expired after ttl_page seconds
  ttl_page: nil,
  # don't obey the robots exclusion protocol
  obey_robots_txt: false,
  # If true, signal handling strategy is enabled.
  # INT and TERM signal will stop polipus gracefully
  # Disable it if polipus will run as a part of Resque or DelayedJob-like system
  enable_signal_handler: true
}
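
Example of overriding a few of these defaults at construction time. This is an illustrative sketch: the job name, seed URL, and chosen values are placeholders, and any key not listed keeps its OPTS default.

  require 'polipus'

  crawler = Polipus::PolipusCrawler.new('opts_demo', ['http://example.com/'],
                                        workers: 8,             # more threads than the default 4
                                        depth_limit: 2,         # stop following links after depth 2
                                        obey_robots_txt: true)  # honour robots.txt

  crawler.options[:workers]       # => 8
  crawler.options[:read_timeout]  # => 30, untouched default from OPTS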

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(job_name = 'polipus', urls = [], options = {}) {|_self| ... } ⇒ PolipusCrawler

Returns a new instance of PolipusCrawler.

Yields:

  • (_self)

Yield Parameters:

  • _self (PolipusCrawler) - the object that the method was called on



# File 'lib/polipus.rb', line 101

def initialize(job_name = 'polipus', urls = [], options = {})
  @job_name     = job_name
  @options      = OPTS.merge(options)
  @options[:queue_timeout] = 1 if @options[:queue_timeout] <= 0
  @logger       = @options[:logger]  ||= Logger.new(nil)

  unless @logger.class.to_s == 'Log4r::Logger'
    @logger.level = @options[:logger_level] ||= Logger::INFO
  end

  @storage      = @options[:storage] ||= Storage.dev_null

  @workers_pool = []

  @follow_links_like  = []
  @skip_links_like    = []
  @on_page_downloaded = []
  @on_before_save     = []
  @on_page_error      = []
  @focus_crawl_block  = nil
  @on_crawl_start     = []
  @on_crawl_end       = []
  @redis_factory      = nil

  @overflow_manager = nil
  @crawler_name = `hostname`.strip + "-#{@job_name}"

  @storage.include_query_string_in_uuid = @options[:include_query_string_in_saved_page]

  @urls = [urls].flatten.map { |url| URI(url) }
  @urls.each { |url| url.path = '/' if url.path.empty? }
  if @options[:obey_robots_txt]
    @robots =
      if @options[:user_agent].respond_to?(:sample)
        Polipus::Robotex.new(@options[:user_agent].sample)
      else
        Polipus::Robotex.new(@options[:user_agent])
      end
  end
  # Attach signal handling if enabled
  SignalHandler.enable if @options[:enable_signal_handler]

  if queue_overflow_adapter
    @on_crawl_start << lambda do |_|
      Thread.new do
        Thread.current[:name] = :overflow_items_controller
        overflow_items_controller.run
      end
    end
  end

  @on_crawl_end << lambda do |_|
    Thread.list.select { |thread| thread.status && thread[:name] == :overflow_items_controller }.each(&:kill)
  end

  execute_plugin 'on_initialize'

  yield self if block_given?
end
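
A minimal construction sketch. The constructor yields the new instance, so the crawler is usually configured inside the block and then started with #takeover. Job name and URL are placeholders, and the default Redis-backed queue assumes a reachable Redis instance.

  crawler = Polipus::PolipusCrawler.new('my_job', ['http://example.com/']) do |c|
    c.skip_links_like(/\.pdf\z/i)
    c.on_page_downloaded { |page| puts "#{page.code} #{page.url}" }
  end

  crawler.takeover   # blocks until the queue is drained or stop! is called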

Instance Attribute Details

#crawler_name ⇒ Object (readonly)

Returns the value of attribute crawler_name.



# File 'lib/polipus.rb', line 90

def crawler_name
  @crawler_name
end

#job_name ⇒ Object (readonly)

Returns the value of attribute job_name.



# File 'lib/polipus.rb', line 87

def job_name
  @job_name
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'lib/polipus.rb', line 88

def logger
  @logger
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/polipus.rb', line 89

def options
  @options
end

#storage ⇒ Object (readonly)

Returns the value of attribute storage.



# File 'lib/polipus.rb', line 86

def storage
  @storage
end

Class Method Details

.crawl(*args, &block) ⇒ Object



# File 'lib/polipus.rb', line 161

def self.crawl(*args, &block)
  new(*args, &block).takeover
end

Instance Method Details

#add_to_queue(page) ⇒ Object



# File 'lib/polipus.rb', line 350

def add_to_queue(page)
  if [:url, :referer, :depth].all? { |method| page.respond_to?(method) }
    add_url(page.url, referer: page.referer, depth: page.depth)
  else
    add_url(page)
  end
end

#add_url(url, params = {}) {|page| ... } ⇒ Object

Enqueue a URL unconditionally, regardless of any filters.

Yields:

  • (page)


# File 'lib/polipus.rb', line 359

def add_url(url, params = {})
  page = Page.new(url, params)
  yield(page) if block_given?
  internal_queue << page.to_json
end
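
A usage sketch: the optional block receives the freshly built Page before it is serialized onto the queue, which is handy for tagging it. user_data behaves like an OpenStruct (see the p_seeded flag set in #takeover); the from_sitemap flag below is purely illustrative.

  crawler.add_url('http://example.com/sitemap') do |page|
    page.user_data.from_sitemap = true
  end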

#focus_crawl(&block) ⇒ Object

A block of code that will be executed on every downloaded page. The block is used to extract the URLs to visit; see the links_for method.



# File 'lib/polipus.rb', line 316

def focus_crawl(&block)
  @focus_crawl_block = block
  self
end
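
A sketch of a focus_crawl block: it must return the collection of URLs to enqueue for the given page. It assumes Page#links returns the URIs found on the page (as used by links_for); restricting the crawl to a /blog/ path is an illustrative choice.

  crawler.focus_crawl do |page|
    page.links.select { |uri| uri.path.to_s.start_with?('/blog/') }
  end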

#follow_links_like(*patterns) ⇒ Object

A pattern or an array of patterns can be passed as an argument. A URL will be discarded unless it matches one of the patterns.



# File 'lib/polipus.rb', line 269

def follow_links_like(*patterns)
  @follow_links_like += patterns.uniq.compact
  self
end
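
An illustrative sketch: only URLs matching at least one of the given patterns will be enqueued (the patterns themselves are placeholders).

  crawler.follow_links_like(/\/articles\//, /\/category\/\w+/)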

#on_before_save(&block) ⇒ Object

A block of code that will be executed on every downloaded page before it is saved to the registered storage.



# File 'lib/polipus.rb', line 302

def on_before_save(&block)
  @on_before_save << block
  self
end
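
A sketch of an on_before_save hook that only inspects the page right before it is persisted. It assumes Page#body exposes the raw response body; the log message is illustrative.

  crawler.on_before_save do |page|
    crawler.logger.info { "about to store #{page.url} (#{page.body.to_s.bytesize} bytes)" }
  end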

#on_crawl_end(&block) ⇒ Object

A block of code that will be executed when the crawl session is over.



# File 'lib/polipus.rb', line 289

def on_crawl_end(&block)
  @on_crawl_end << block
  self
end

#on_crawl_start(&block) ⇒ Object

A block of code that will be executed when the crawl session is starting.



# File 'lib/polipus.rb', line 295

def on_crawl_start(&block)
  @on_crawl_start << block
  self
end
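
Both crawl-session hooks receive the crawler itself (see the e.call(self) invocations in #takeover), so they pair naturally for simple bookkeeping. A sketch, with the on_crawl_end counterpart from the previous entry included for completeness:

  started_at = nil

  crawler.on_crawl_start do |c|
    started_at = Time.now
    c.logger.info { 'crawl session started' }
  end

  crawler.on_crawl_end do |c|
    c.logger.info { "crawl session finished in #{(Time.now - started_at).round(1)}s" }
  end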

#on_page_downloaded(&block) ⇒ Object

A block of code that will be executed on every page downloaded. The block takes the page as its argument.



# File 'lib/polipus.rb', line 283

def on_page_downloaded(&block)
  @on_page_downloaded << block
  self
end
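
A sketch of a download hook; where the results go (standard output here) is entirely up to the caller.

  crawler.on_page_downloaded do |page|
    puts "#{page.code}\t#{page.url}"
  end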

#on_page_error(&block) ⇒ Object

A block of code that will be executed whenever a page contains an error.



# File 'lib/polipus.rb', line 308

def on_page_error(&block)
  @on_page_error << block
  self
end
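
A sketch of an error hook that only logs; page.error is the attribute checked in #takeover before these blocks are invoked.

  crawler.on_page_error do |page|
    crawler.logger.warn { "error on #{page.url}: #{page.error}" }
  end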

#queue_size ⇒ Object



# File 'lib/polipus.rb', line 325

def queue_size
  internal_queue.size
end

#redis ⇒ Object



# File 'lib/polipus.rb', line 346

def redis
  @redis ||= redis_factory_adapter
end

#redis_factory(&block) ⇒ Object



# File 'lib/polipus.rb', line 333

def redis_factory(&block)
  @redis_factory = block
  self
end

#redis_options ⇒ Object



# File 'lib/polipus.rb', line 321

def redis_options
  @options[:redis_options]
end

#skip_links_like(*patterns) ⇒ Object

A pattern or an array of patterns can be passed as an argument. A URL will be discarded if it matches one of the patterns.



# File 'lib/polipus.rb', line 276

def skip_links_like(*patterns)
  @skip_links_like += patterns.uniq.compact
  self
end
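
An illustrative sketch excluding binary assets and a logout endpoint (the patterns are placeholders):

  crawler.skip_links_like(/\.(?:png|jpe?g|gif|pdf|zip)\z/i, /\/logout/)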

#stats_reset! ⇒ Object



# File 'lib/polipus.rb', line 329

def stats_reset!
  ["polipus:#{@job_name}:errors", "polipus:#{@job_name}:pages"].each { |e| redis.del e }
end

#stop!(cler_queue = false) ⇒ Object

Request Polipus to stop its work (gracefully). Pass cler_queue = true if you want to delete all of the pending URLs to visit.



# File 'lib/polipus.rb', line 367

def stop!(cler_queue = false)
  SignalHandler.terminate
  internal_queue.clear(true) if cler_queue
end
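
A sketch of stopping the crawler gracefully from another thread after a time budget; the 600-second budget is arbitrary and illustrative.

  Thread.new do
    sleep 600
    crawler.stop!         # workers finish their current page, then exit
    # crawler.stop!(true) would additionally clear the pending queue
  end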

#takeover ⇒ Object



# File 'lib/polipus.rb', line 165

def takeover
  @urls.each do |u|
    add_url(u) { |page| page.user_data.p_seeded = true }
  end
  return if internal_queue.empty?

  @on_crawl_start.each { |e| e.call(self) }

  execute_plugin 'on_crawl_start'
  @options[:workers].times do |worker_number|
    @workers_pool << Thread.new do
      @logger.debug { "Start worker #{worker_number}" }
      http  =  HTTP.new(@options)
      queue =  queue_factory
      queue.process(false, @options[:queue_timeout]) do |message|
        next if message.nil?

        execute_plugin 'on_message_received'

        page = Page.from_json message

        unless should_be_visited?(page.url, false)
          @logger.info { "[worker ##{worker_number}] Page (#{page.url}) is no more welcome." }
          queue.commit
          next
        end

        if page_exists? page
          @logger.info { "[worker ##{worker_number}] Page (#{page.url}) already stored." }
          queue.commit
          next
        end

        url = page.url.to_s
        @logger.debug { "[worker ##{worker_number}] Fetching page: [#{page.url}] Referer: #{page.referer} Depth: #{page.depth}" }

        execute_plugin 'on_before_download'

        pages = http.fetch_pages(url, page.referer, page.depth, page.user_data)
        if pages.count > 1
          rurls = pages.map { |e| e.url.to_s }.join(' --> ')
          @logger.info { "Got redirects! #{rurls}" }
          page = pages.pop
          page.aliases = pages.map(&:url)
          if page_exists? page
            @logger.info { "[worker ##{worker_number}] Page (#{page.url}) already stored." }
            queue.commit
            next
          end
        else
          page = pages.last
        end

        execute_plugin 'on_after_download'

        if page.error
          @logger.warn { "Page #{page.url} has error: #{page.error}" }
          incr_error
          @on_page_error.each { |e| e.call(page) }
        end

        # Execute on_before_save blocks
        @on_before_save.each { |e| e.call(page) }

        page.storable? && @storage.add(page)

        @logger.debug { "[worker ##{worker_number}] Fetched page: [#{page.url}] Referrer: [#{page.referer}] Depth: [#{page.depth}] Code: [#{page.code}] Response Time: [#{page.response_time}]" }
        @logger.info  { "[worker ##{worker_number}] Page (#{page.url}) downloaded" }

        incr_pages

        # Execute on_page_downloaded blocks
        @on_page_downloaded.each { |e| e.call(page) }

        if @options[:depth_limit] == false || @options[:depth_limit] > page.depth
          links_for(page).each do |url_to_visit|
            next unless should_be_visited?(url_to_visit)
            enqueue url_to_visit, page
          end
        else
          @logger.info { "[worker ##{worker_number}] Depth limit reached #{page.depth}" }
        end

        @logger.debug { "[worker ##{worker_number}] Queue size: #{queue.size}" }
        @overflow_manager.perform if @overflow_manager && queue.empty?
        execute_plugin 'on_message_processed'

        if SignalHandler.terminated?
          @logger.info { 'About to exit! Thanks for using Polipus' }
          queue.commit
          break
        end
        true
      end
    end
  end

  @workers_pool.each(&:join)
  @on_crawl_end.each { |e| e.call(self) }
  execute_plugin 'on_crawl_end'
end

#url_tracker ⇒ Object



# File 'lib/polipus.rb', line 338

def url_tracker
  @url_tracker ||=
    @options[:url_tracker] ||=
      UrlTracker.bloomfilter(key_name: "polipus_bf_#{job_name}",
                             redis: redis_factory_adapter,
                             driver: 'lua')
end