Class: Polipus::PolipusCrawler

Inherits:
Object

Defined in:
lib/polipus.rb

Constant Summary

OPTS =
{
  # run 4 threads
  :workers => 4,
  # identify self as Polipus/VERSION
  :user_agent => "Polipus - #{Polipus::VERSION} - #{Polipus::HOMEPAGE}",
  # by default, don't limit the depth of the crawl
  :depth_limit => false,
  # number of times HTTP redirects will be followed
  :redirect_limit => 5,
  # storage engine, defaults to DevNull
  :storage => nil,
  # proxy server hostname
  :proxy_host => nil,
  # proxy server port number
  :proxy_port => false,
  # HTTP read timeout in seconds
  :read_timeout => 30,
  # HTTP connection open timeout in seconds
  :open_timeout => 10,
  # time to wait for new messages on Redis;
  # after this timeout, the current crawling session is marked as terminated
  :queue_timeout => 30,
  # a URL tracker instance; the default is a Redis-backed Bloom filter
  :url_tracker => nil,
  # a hash of Redis options, passed directly to Redis.new
  :redis_options => {},
  # an instance of a logger
  :logger => nil,
  # the logger level
  :logger_level => nil,
  # whether the query string should be included in the saved page
  :include_query_string_in_saved_page => true,
  # max number of items to keep on Redis
  :queue_items_limit => 2_000_000,
  # the adapter used to store items that exceed queue_items_limit
  :queue_overflow_adapter => nil,
  # every x seconds, the main queue is checked for overflowed items
  :queue_overflow_manager_check_time => 60,
  # if true, each page downloaded will increment a counter on Redis
  :stats_enabled => false,
  # cookies strategy
  :cookie_jar => nil,
  # whether or not to accept cookies
  :accept_cookies => false,
  # a set of hosts that should be considered part of the same domain,
  # e.g. it can be used to follow links with and without the 'www' prefix
  :domain_aliases => [],
  # mark a connection as stale after connection_max_hits requests
  :connection_max_hits => nil,
  # page TTL: mark a page as expired after ttl_page seconds
  :ttl_page => nil,
  # whether to obey the robots exclusion protocol (disabled by default)
  :obey_robots_txt => false
}
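
Any of these defaults can be overridden through the options hash passed to the constructor; unspecified keys keep the values above. A minimal sketch (the job name, seed URL and option values are placeholders):

require 'polipus'

Polipus::PolipusCrawler.new('my_job', ['https://example.com/'],
                            :workers       => 8,
                            :depth_limit   => 3,
                            :stats_enabled => true)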

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(job_name = 'polipus', urls = [], options = {}) {|_self| ... } ⇒ PolipusCrawler

Returns a new instance of PolipusCrawler.

Yields:

  • (_self)

Yield Parameters:

  • _self (PolipusCrawler) - the object that the method was called on

# File 'lib/polipus.rb', line 95

def initialize(job_name = 'polipus', urls = [], options = {})

  @job_name     = job_name
  @options      = OPTS.merge(options)
  @options[:queue_timeout] = 1 if @options[:queue_timeout] <= 0
  @logger       = @options[:logger]  ||= Logger.new(nil)
  
  unless @logger.class.to_s == "Log4r::Logger"
    @logger.level = @options[:logger_level] ||= Logger::INFO
  end

  @storage      = @options[:storage] ||= Storage.dev_null

  @http_pool    = []
  @workers_pool = []
  @queues_pool  = []
  
  
  @follow_links_like  = []
  @skip_links_like    = []
  @on_page_downloaded = []
  @on_before_save     = []
  @on_page_error      = []
  @focus_crawl_block  = nil
  @on_crawl_end       = []
  @redis_factory      = nil

  
  @overflow_manager = nil
  @crawler_name = `hostname`.strip + "-#{@job_name}"

  @storage.include_query_string_in_uuid = @options[:include_query_string_in_saved_page]

  @urls = [urls].flatten.map{ |url| URI(url) }
  @urls.each{ |url| url.path = '/' if url.path.empty? }
  @internal_queue = queue_factory
  @robots = Polipus::Robotex.new(@options[:user_agent]) if @options[:obey_robots_txt]

  execute_plugin 'on_initialize'

  yield self if block_given?

end
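
Since the constructor yields the new instance, callbacks are usually registered inside the block. A sketch (job name and seed URL are placeholders):

require 'polipus'

crawler = Polipus::PolipusCrawler.new('my_job', ['https://example.com/']) do |c|
  c.on_page_downloaded { |page| puts page.url }
end
crawler.takeover # blocks until the crawl session is over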

Instance Attribute Details

#crawler_name ⇒ Object (readonly)

Returns the value of attribute crawler_name.



# File 'lib/polipus.rb', line 83

def crawler_name
  @crawler_name
end

#job_name ⇒ Object (readonly)

Returns the value of attribute job_name.



# File 'lib/polipus.rb', line 80

def job_name
  @job_name
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'lib/polipus.rb', line 81

def logger
  @logger
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/polipus.rb', line 82

def options
  @options
end

#storage ⇒ Object (readonly)

Returns the value of attribute storage.



# File 'lib/polipus.rb', line 79

def storage
  @storage
end

Class Method Details

.crawl(*args, &block) ⇒ Object



# File 'lib/polipus.rb', line 139

def self.crawl(*args, &block)
  new(*args, &block).takeover
end
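
As the body shows, crawl is shorthand for new(*args, &block).takeover. A minimal sketch (job name and URL are placeholders):

Polipus::PolipusCrawler.crawl('my_job', ['https://example.com/']) do |crawler|
  crawler.on_page_downloaded { |page| puts "Downloaded #{page.url}" }
end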

Instance Method Details

#add_to_queue(page) ⇒ Object



# File 'lib/polipus.rb', line 325

def add_to_queue(page)
  if [:url, :referer, :depth].all? { |method| page.respond_to?(method) }
    add_url(page.url, referer: page.referer, depth: page.depth)
  else
    add_url(page)
  end
end
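
The argument may be either a Page-like object (anything responding to :url, :referer and :depth, whose metadata are preserved) or a bare URL. A sketch (the URL is a placeholder):

crawler.add_to_queue(page)                        # a Page: keeps referer and depth
crawler.add_to_queue('https://example.com/retry') # a bare URL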

#add_url(url, params = {}) {|page| ... } ⇒ Object

Enqueue a URL unconditionally, bypassing the usual filters

Yields:

  • (page)


# File 'lib/polipus.rb', line 334

def add_url(url, params = {})
  page = Page.new(url, params)
  yield(page) if block_given?
  @internal_queue << page.to_json
end
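
The yielded Page can be annotated before it is serialized onto the queue; user_data accepts arbitrary attributes, just as takeover does with p_seeded. A sketch (URL and attribute name are illustrative):

crawler.add_url('https://example.com/special') do |page|
  page.user_data.priority = true
end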

#focus_crawl(&block) ⇒ Object

A block of code that will be executed on every page downloaded. The block is used to extract the URLs to visit; see the links_for method.



# File 'lib/polipus.rb', line 291

def focus_crawl(&block)
  @focus_crawl_block = block
  self
end
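
A sketch, assuming Page#links returns the links parsed from the page (the pattern is illustrative):

crawler.focus_crawl do |page|
  page.links.select { |link| link.to_s =~ /\/articles\// }
end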

#follow_links_like(*patterns) ⇒ Object

A pattern or an array of patterns can be passed as argument. A URL will be discarded if it doesn't match any of the patterns.



# File 'lib/polipus.rb', line 250

def follow_links_like(*patterns)
  @follow_links_like += patterns.uniq.compact
  self
end
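
A sketch with illustrative patterns; only URLs matching at least one of them will be followed:

crawler.follow_links_like(/\/posts\//, /\/tags\//)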

#on_before_save(&block) ⇒ Object

A block of code that will be executed on every page downloaded, before the page is saved in the registered storage.



# File 'lib/polipus.rb', line 277

def on_before_save(&block)
  @on_before_save << block
  self
end
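
A sketch that just inspects the page before storage (page.body is assumed to hold the raw response body):

crawler.on_before_save do |page|
  puts "About to store #{page.url} (#{page.body.to_s.bytesize} bytes)"
end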

#on_crawl_end(&block) ⇒ Object

A block of code that will be executed when the crawl session is over.



# File 'lib/polipus.rb', line 270

def on_crawl_end(&block)
  @on_crawl_end << block
  self
end
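
The crawler instance itself is passed to the block (see takeover below). A sketch:

crawler.on_crawl_end { |c| c.logger.info { "Job #{c.job_name} is over" } }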

#on_page_downloaded(&block) ⇒ Object

A block of code that will be executed on every page downloaded. The block takes the page as its argument.



# File 'lib/polipus.rb', line 264

def on_page_downloaded(&block)
  @on_page_downloaded << block
  self
end
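
A sketch; Page#code carries the HTTP status code, as seen in takeover's logging below:

crawler.on_page_downloaded do |page|
  puts "#{page.url} -> HTTP #{page.code}"
end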

#on_page_error(&block) ⇒ Object

A block of code that will be executed whenever a page contains an error.



# File 'lib/polipus.rb', line 283

def on_page_error(&block)
  @on_page_error << block
  self
end
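
A sketch; Page#error holds the failure, and takeover invokes these blocks right after logging it:

crawler.on_page_error do |page|
  warn "Failed to fetch #{page.url}: #{page.error}"
end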

#queue_size ⇒ Object



# File 'lib/polipus.rb', line 300

def queue_size
  @internal_queue.size
end

#redis ⇒ Object



# File 'lib/polipus.rb', line 321

def redis
  @redis ||= redis_factory_adapter
end

#redis_factory(&block) ⇒ Object



# File 'lib/polipus.rb', line 308

def redis_factory(&block)
  @redis_factory = block
  self
end
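
A sketch for supplying a custom Redis client, assuming the factory block is invoked with the configured redis_options hash when the connection is built:

crawler.redis_factory do |redis_options|
  Redis.new(redis_options.merge(:db => 2))
end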

#redis_options ⇒ Object



# File 'lib/polipus.rb', line 296

def redis_options
  @options[:redis_options]
end

#skip_links_like(*patterns) ⇒ Object

A pattern or an array of patterns can be passed as argument. A URL will be discarded if it matches any of the patterns.



# File 'lib/polipus.rb', line 257

def skip_links_like(*patterns)
  @skip_links_like += patterns.uniq.compact
  self
end
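
A sketch with illustrative patterns; binary assets and logout links are discarded:

crawler.skip_links_like(/\.(pdf|zip|jpe?g)$/i, /logout/)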

#stats_reset! ⇒ Object



# File 'lib/polipus.rb', line 304

def stats_reset!
  ["polipus:#{@job_name}:errors", "polipus:#{@job_name}:pages"].each {|e| redis.del e}
end

#stop!(clear_queue = false) ⇒ Object

Request Polipus to stop its work gracefully. Pass clear_queue = true to delete all of the pending URLs still to visit.



# File 'lib/polipus.rb', line 342

def stop!(clear_queue = false)
  PolipusSignalHandler.terminate
  @internal_queue.clear(true) if clear_queue
end
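
A sketch: stop the crawl from inside a callback once some condition is met (enough_pages? is a hypothetical predicate):

crawler.on_page_downloaded do |page|
  crawler.stop!(true) if enough_pages? # hypothetical condition; true also clears the queue
end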

#takeover ⇒ Object



# File 'lib/polipus.rb', line 143

def takeover
  PolipusSignalHandler.enable
  overflow_items_controller if queue_overflow_adapter

  @urls.each do |u|
    add_url(u) { |page| page.user_data.p_seeded = true }
  end
  return if @internal_queue.empty?

  execute_plugin 'on_crawl_start'
  @options[:workers].times do |worker_number|
    @workers_pool << Thread.new do
      @logger.debug {"Start worker #{worker_number}"}
      http  = @http_pool[worker_number]   ||= HTTP.new(@options)
      queue = @queues_pool[worker_number] ||= queue_factory
      queue.process(false, @options[:queue_timeout]) do |message|

        next if message.nil?

        execute_plugin 'on_message_received'

        page = Page.from_json message

        unless should_be_visited?(page.url, false)
          @logger.info {"[worker ##{worker_number}] Page (#{page.url.to_s}) is no more welcome."}
          queue.commit
          next
        end

        if page_exists? page
          @logger.info {"[worker ##{worker_number}] Page (#{page.url.to_s}) already stored."}
          queue.commit
          next
        end
        
        url = page.url.to_s
        @logger.debug {"[worker ##{worker_number}] Fetching page: [#{page.url.to_s}] Referer: #{page.referer} Depth: #{page.depth}"}

        execute_plugin 'on_before_download'

        pages = http.fetch_pages(url, page.referer, page.depth)
        if pages.count > 1
          rurls = pages.map { |e| e.url.to_s }.join(' --> ')
          @logger.info {"Got redirects! #{rurls}"}
          page = pages.pop
          page.aliases = pages.collect { |e| e.url }
          if page_exists? page
            @logger.info {"[worker ##{worker_number}] Page (#{page.url.to_s}) already stored."}
            queue.commit
            next
          end
        else
          page = pages.last
        end
        
        execute_plugin 'on_after_download'
        
        if page.error
          @logger.warn {"Page #{page.url} has error: #{page.error}"}
          incr_error
          @on_page_error.each {|e| e.call(page)}
        end

        # Execute on_before_save blocks
        @on_before_save.each {|e| e.call(page)}

        if page.storable?
          @storage.add page
        end
        
        @logger.debug {"[worker ##{worker_number}] Fetched page: [#{page.url.to_s}] Referrer: [#{page.referer}] Depth: [#{page.depth}] Code: [#{page.code}] Response Time: [#{page.response_time}]"}
        @logger.info  {"[worker ##{worker_number}] Page (#{page.url.to_s}) downloaded"}

        incr_pages

        # Execute on_page_downloaded blocks
        @on_page_downloaded.each {|e| e.call(page)}

        if @options[:depth_limit] == false || @options[:depth_limit] > page.depth 
          links_for(page).each do |url_to_visit|
            next unless should_be_visited?(url_to_visit)
            enqueue url_to_visit, page, queue
          end
        else
          @logger.info {"[worker ##{worker_number}] Depth limit reached #{page.depth}"}
        end

        @logger.debug {"[worker ##{worker_number}] Queue size: #{queue.size}"}
        @overflow_manager.perform if @overflow_manager && queue.empty?
        execute_plugin 'on_message_processed'

        if PolipusSignalHandler.terminated?
          @logger.info {"About to exit! Thanks for using Polipus"}
          queue.commit
          break
        end
        true
      end
    end
  end
  @workers_pool.each {|w| w.join}
  @on_crawl_end.each {|e| e.call(self)}
  execute_plugin 'on_crawl_end'
end

#url_tracker ⇒ Object



# File 'lib/polipus.rb', line 313

def url_tracker
  @url_tracker ||=
    @options[:url_tracker] ||=
      UrlTracker.bloomfilter(:key_name => "polipus_bf_#{job_name}",
                             :redis => redis_factory_adapter,
                             :driver => 'lua')
end