Class: Polipus::PolipusCrawler

Inherits: Object
Defined in:
lib/polipus.rb

Constant Summary

OPTS =
{
  # run 4 threads
  :workers => 4,
  # identify self as Polipus/VERSION
  :user_agent => "Polipus - #{Polipus::VERSION} - #{Polipus::HOMEPAGE}",
  # by default, don't limit the depth of the crawl
  :depth_limit => false,
  # number of times HTTP redirects will be followed
  :redirect_limit => 5,
  # storage engine defaults to DevNull 
  :storage => nil,
  # proxy server hostname 
  :proxy_host => nil,
  # proxy server port number
  :proxy_port => false,
  # HTTP read timeout in seconds
  :read_timeout => 30,
  # HTTP open connection timeout in seconds
  :open_timeout => 10,
  # A URL tracker instance. The default is a Redis-backed Bloom filter
  :url_tracker => nil,
  # A Redis options hash that will be passed directly to Redis.new
  :redis_options => {},
  # An instance of logger
  :logger => nil,
  # whether the query string should be included in the saved page
  :include_query_string_in_saved_page => true,
  # Max number of items to keep on redis
  :queue_items_limit => 2_000_000,
  # The adapter used to store queue items that exceed queue_items_limit
  :queue_overflow_adapter => nil,
  # Every x seconds, the main queue is checked for overflowed items
  :queue_overflow_manager_check_time => 60,
  # If true, each page downloaded will increment a counter on redis
  :stats_enabled => false,
  # Cookies strategy
  :cookie_jar => nil,
  :accept_cookies => false,
  # A set of hosts that should be considered parts of the same domain
  # Eg It can be used to follow links with and without 'www' domain
  :domain_aliases => [],
  # Mark a connection as stale after connection_max_hits requests
  :connection_max_hits => nil
}
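
Any of these defaults can be overridden by passing an options hash to the constructor (or to .crawl, below). A minimal sketch; the job name, seed URL, and values are illustrative:

require 'polipus'

options = {
  # use 8 worker threads instead of the default 4
  :workers     => 8,
  # stop following links below depth 3
  :depth_limit => 3,
  # announce a custom user agent
  :user_agent  => 'my-crawler/1.0'
}

crawler = Polipus::PolipusCrawler.new('my_job', ['http://example.com/'], options)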

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(job_name = 'polipus', urls = [], options = {}) {|_self| ... } ⇒ PolipusCrawler

Returns a new instance of PolipusCrawler.

Yields:

  • (_self)

Yield Parameters:

  • _self (PolipusCrawler) - the object that the method was called on

# File 'lib/polipus.rb', line 85

def initialize(job_name = 'polipus', urls = [], options = {})
  @job_name     = job_name
  @options      = OPTS.merge(options)
  @logger       = @options[:logger]  ||= Logger.new(nil)

  unless @logger.is_a?(Log4r::Logger)
    @logger.level = @options[:logger_level] ||= Logger::INFO
  end

  @storage      = @options[:storage] ||= Storage.dev_null

  @http_pool    = []
  @workers_pool = []
  @queues_pool  = []

  @follow_links_like  = []
  @skip_links_like    = []
  @on_page_downloaded = []
  @on_before_save     = []
  @focus_crawl_block  = nil
  @on_crawl_end       = []
  @redis_factory      = nil

  @overflow_manager = nil
  @crawler_name = `hostname`.strip + "-#{@job_name}"

  @storage.include_query_string_in_uuid = @options[:include_query_string_in_saved_page]

  @urls = [urls].flatten.map { |url| url.is_a?(URI) ? url : URI(url) }
  @urls.each { |url| url.path = '/' if url.path.empty? }
  execute_plugin 'on_initialize'

  yield self if block_given?
end
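
The constructor yields itself, so configuration usually happens inside the block. A minimal sketch; the job name, URL, and pattern are illustrative:

crawler = Polipus::PolipusCrawler.new('my_job', 'http://example.com/') do |c|
  # discard any link whose URL matches this pattern
  c.skip_links_like(/logout/)
end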

Instance Attribute Details

#crawler_name ⇒ Object (readonly)

Returns the value of attribute crawler_name.



# File 'lib/polipus.rb', line 73

def crawler_name
  @crawler_name
end

#job_name ⇒ Object (readonly)

Returns the value of attribute job_name.



# File 'lib/polipus.rb', line 69

def job_name
  @job_name
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'lib/polipus.rb', line 70

def logger
  @logger
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/polipus.rb', line 72

def options
  @options
end

#overflow_adapter ⇒ Object (readonly)

Returns the value of attribute overflow_adapter.



# File 'lib/polipus.rb', line 71

def overflow_adapter
  @overflow_adapter
end

#storage ⇒ Object (readonly)

Returns the value of attribute storage.



# File 'lib/polipus.rb', line 68

def storage
  @storage
end

Class Method Details

.crawl(job_name, urls, opts = {}) ⇒ Object



# File 'lib/polipus.rb', line 124

def self.crawl(job_name, urls, opts = {})
  self.new(job_name, urls, opts) do |polipus|
    yield polipus if block_given?
    polipus.takeover
  end
end
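
crawl is a convenience wrapper: it builds the crawler, yields it for configuration, then calls takeover. A minimal sketch; the job name, URL, and pattern are illustrative:

Polipus::PolipusCrawler.crawl('my_job', ['http://example.com/']) do |crawler|
  crawler.follow_links_like(/example\.com/)
end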

Instance Method Details

#add_url(url) ⇒ Object



# File 'lib/polipus.rb', line 318

def add_url(url)
  @url_tracker.remove url.to_s
  page = Page.new(url)
  queue_factory << page.to_json
end
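
add_url removes the URL from the tracker and enqueues a fresh Page for it, so the page will be fetched even if it was visited before. A minimal sketch, assuming crawler is a configured PolipusCrawler:

crawler.add_url('http://example.com/fresh/page')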

#focus_crawl(&block) ⇒ Object

A block of code that will be executed on every page downloaded. The block is used to extract the URLs to visit; see the links_for method.



# File 'lib/polipus.rb', line 277

def focus_crawl(&block)
  @focus_crawl_block = block
  self
end
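
A minimal sketch, assuming the block receives the downloaded page and returns the collection of links to enqueue, and that Page exposes the links parsed from the document (both assumptions follow the description above and the links_for reference):

crawler.focus_crawl do |page|
  # follow only links that stay on the same host
  page.links.select { |link| link.host == page.url.host }
end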

#follow_links_like(*patterns) ⇒ Object

A pattern or an array of patterns can be passed as arguments. A URL will be discarded if it does not match any of the patterns.



# File 'lib/polipus.rb', line 243

def follow_links_like(*patterns)
  @follow_links_like += patterns.uniq.compact
  self
end
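
A minimal sketch; the patterns are illustrative:

crawler.follow_links_like(/\/articles\//, /\/news\//)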

#on_before_save(&block) ⇒ Object

A block of code that will be executed on every page downloaded, before the page is saved in the registered storage.



# File 'lib/polipus.rb', line 269

def on_before_save(&block)
  @on_before_save << block
  self
end
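
Blocks receive the page, as shown in the takeover loop below. A minimal sketch:

crawler.on_before_save do |page|
  puts "About to save #{page.url} (HTTP #{page.code})"
end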

#on_crawl_end(&block) ⇒ Object



# File 'lib/polipus.rb', line 262

def on_crawl_end(&block)
  @on_crawl_end << block
  self
end
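
Registered blocks run after all workers have joined at the end of takeover and receive the crawler itself as argument (see the takeover listing below). A minimal sketch:

crawler.on_crawl_end do |c|
  puts "Crawl #{c.job_name} finished"
end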

#on_page_downloaded(&block) ⇒ Object

A block of code that will be executed on every page downloaded. The block takes the page as argument.



# File 'lib/polipus.rb', line 257

def on_page_downloaded(&block)
  @on_page_downloaded << block
  self
end
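
A minimal sketch:

crawler.on_page_downloaded do |page|
  puts "Downloaded #{page.url} (HTTP #{page.code})"
end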

#queue_size ⇒ Object



# File 'lib/polipus.rb', line 290

def queue_size
  @internal_queue ||= queue_factory
  @internal_queue.size
end

#redis ⇒ Object



# File 'lib/polipus.rb', line 311

def redis
  if @redis.nil?
    @redis = redis_factory_adapter
  end
  @redis
end

#redis_factory(&block) ⇒ Object



# File 'lib/polipus.rb', line 299

def redis_factory(&block)
  @redis_factory = block
  self
end
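
Lets you supply your own Redis client construction. A minimal sketch; the exact arguments passed to the block are not shown in this listing, so the assumption here is that it receives the redis_options hash and returns a Redis instance:

crawler.redis_factory do |redis_opts|
  # e.g. point the client at a specific server or wrap it
  Redis.new(redis_opts)
end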

#redis_options ⇒ Object



# File 'lib/polipus.rb', line 282

def redis_options
  @options[:redis_options]
end

#skip_links_like(*patterns) ⇒ Object

A pattern or an array of patterns can be passed as arguments. A URL will be discarded if it matches any of the patterns.



# File 'lib/polipus.rb', line 250

def skip_links_like(*patterns)
  @skip_links_like += patterns.uniq.compact
  self
end
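
A minimal sketch; the patterns are illustrative:

crawler.skip_links_like(/\.pdf$/i, /logout/)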

#stats_reset! ⇒ Object



# File 'lib/polipus.rb', line 295

def stats_reset!
  ["polipus:#{@job_name}:errors", "polipus:#{@job_name}:pages"].each {|e| redis.del e}
end

#stop!(clear_queue = false) ⇒ Object

Requests that Polipus stop its work gracefully. Pass clear_queue = true to delete all of the pending URLs still to be visited.



# File 'lib/polipus.rb', line 326

def stop!(clear_queue = false)
  PolipusSignalHandler.terminate
  queue_factory.clear(true) if clear_queue
end
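
stop! can be called from inside a callback to bound a crawl. A minimal sketch; the threshold is illustrative:

Polipus::PolipusCrawler.new('my_job', 'http://example.com/') do |crawler|
  pages_seen = 0
  crawler.on_page_downloaded do |page|
    pages_seen += 1
    # shut down gracefully once 100 pages have been fetched
    crawler.stop! if pages_seen >= 100
  end
  crawler.takeover
end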

#takeover ⇒ Object



# File 'lib/polipus.rb', line 134

def takeover
  PolipusSignalHandler.enable
  overflow_items_controller if queue_overflow_adapter

  q = queue_factory
  @urls.each do |u|
    next if url_tracker.visited?(u.to_s)
    q << Page.new(u.to_s, :referer => '').to_json
  end

  return if q.empty?

  execute_plugin 'on_crawl_start'
  @options[:workers].times do |worker_number|
    @workers_pool << Thread.new do
      @logger.debug {"Start worker #{worker_number}"}
      http  = @http_pool[worker_number]   ||= HTTP.new(@options)
      queue = @queues_pool[worker_number] ||= queue_factory
      queue.process(false, @options[:read_timeout]) do |message|

        next if message.nil?

        execute_plugin 'on_message_received'

        page = Page.from_json message

        unless should_be_visited?(page.url, false)
          @logger.info {"[worker ##{worker_number}] Page [#{page.url.to_s}] is no more welcome."}
          queue.commit
          next
        end

        if @storage.exists? page
          @logger.info {"[worker ##{worker_number}] Page [#{page.url.to_s}] already stored."}
          queue.commit
          next
        end
        
        url = page.url.to_s
        @logger.debug {"[worker ##{worker_number}] Fetching page: [#{page.url.to_s}] Referer: #{page.referer} Depth: #{page.depth}"}

        execute_plugin 'on_before_download'

        pages = http.fetch_pages(url, page.referer, page.depth)
        if pages.count > 1
          rurls = pages.map { |e| e.url.to_s }.join(' --> ')
          @logger.info {"Got redirects! #{rurls}"}
          page = pages.pop
          page.aliases = pages.collect { |e| e.url }
          if @storage.exists?(page)
            @logger.info {"[worker ##{worker_number}] Page [#{page.url.to_s}] already stored."}
            queue.commit
            next
          end
        else
          page = pages.last
        end
        
        # Execute on_before_save blocks
        @on_before_save.each {|e| e.call(page)} unless page.nil?
        execute_plugin 'on_after_download'
        
        @logger.warn {"Page #{page.url} has error: #{page.error}"} if page.error

        incr_error if page.error

        if page && page.storable?
          @storage.add page
        end
        
        if page
          @logger.debug {"[worker ##{worker_number}] Fetched page: [#{page.url.to_s}] Referer: [#{page.referer}] Depth: [#{page.depth}] Code: [#{page.code}] Response Time: [#{page.response_time}]"}
          @logger.info  {"[worker ##{worker_number}] Page [#{page.url.to_s}] downloaded"}
        end
        
        incr_pages

        # Execute on_page_downloaded blocks
        @on_page_downloaded.each {|e| e.call(page)} unless page.nil?

        if @options[:depth_limit] == false || @options[:depth_limit] > page.depth 
          links_for(page).each do |url_to_visit|
            next unless should_be_visited?(url_to_visit)
            enqueue url_to_visit, page, queue
          end
        else
          @logger.info {"[worker ##{worker_number}] Depth limit reached #{page.depth}"}
        end

        @logger.debug {"[worker ##{worker_number}] Queue size: #{queue.size}"}
        @overflow_manager.perform if @overflow_manager && queue.empty?
        execute_plugin 'on_message_processed'

        if PolipusSignalHandler.terminated?
          @logger.info {"About to exit! Thanks for using Polipus"}
          queue.commit
          break
        end
        true
      end
    end
  end
  @workers_pool.each {|w| w.join}
  @on_crawl_end.each {|e| e.call(self)}
  execute_plugin 'on_crawl_end'
end
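
takeover is the main entry point: it seeds the queue with the initial URLs (skipping any the URL tracker has already seen), spawns @options[:workers] threads, blocks until every worker finishes, then runs the on_crawl_end blocks. Putting the pieces together; a minimal sketch with illustrative values:

Polipus::PolipusCrawler.new('my_job', 'http://example.com/') do |crawler|
  crawler.follow_links_like(/example\.com/)
  crawler.on_page_downloaded { |page| puts "#{page.url} [#{page.code}]" }
  crawler.takeover
end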

#url_tracker ⇒ Object



# File 'lib/polipus.rb', line 304

def url_tracker
  if @url_tracker.nil?
    @url_tracker  = @options[:url_tracker] ||= UrlTracker.bloomfilter(:key_name => "polipus_bf_#{job_name}", :redis => redis_factory_adapter, :driver => 'lua')
  end
  @url_tracker
end