Class: Polipus::PolipusCrawler

Inherits:
Object
Defined in:
lib/polipus.rb

Constant Summary

OPTS =
{
  # run 4 threads
  :workers => 4,
  # identify self as Polipus/VERSION
  :user_agent => "Polipus - #{Polipus::VERSION} - #{Polipus::HOMEPAGE}",
  # by default, don't limit the depth of the crawl
  :depth_limit => false,
  # number of times HTTP redirects will be followed
  :redirect_limit => 5,
  # storage engine, defaults to DevNull
  :storage => nil,
  # proxy server hostname
  :proxy_host => nil,
  # proxy server port number
  :proxy_port => false,
  # HTTP read timeout in seconds
  :read_timeout => 30,
  # HTTP open connection timeout in seconds
  :open_timeout => 10,
  # Time to wait for new messages on Redis.
  # After this timeout, the current crawling session is marked as terminated
  :queue_timeout => 30,
  # A URL tracker instance. Default is a Redis-backed Bloom filter
  :url_tracker => nil,
  # A hash of Redis options, passed directly to Redis.new
  :redis_options => {},
  # A logger instance
  :logger => nil,
  # The logger level
  :logger_level => nil,
  # whether the query string should be included in the saved page
  :include_query_string_in_saved_page => true,
  # Max number of items to keep on Redis
  :queue_items_limit => 2_000_000,
  # The adapter used to store items that exceed queue_items_limit
  :queue_overflow_adapter => nil,
  # Every x seconds, the main queue is checked for overflowed items
  :queue_overflow_manager_check_time => 60,
  # If true, each page downloaded will increment a counter on Redis
  :stats_enabled => false,
  # Cookies strategy
  :cookie_jar => nil,
  :accept_cookies => false,
  # A set of hosts that should be considered part of the same domain,
  # e.g. it can be used to follow links with and without the 'www' prefix
  :domain_aliases => [],
  # Mark a connection as stale after connection_max_hits requests
  :connection_max_hits => nil,
  # Page TTL: mark a page as expired after ttl_page seconds
  :ttl_page => nil
}
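
Any subset of these defaults can be overridden by passing an options hash to the constructor; unspecified keys keep the defaults above. A minimal sketch (the values shown are illustrative):

require 'polipus'

options = {
  :workers     => 8,
  :depth_limit => 3,
  :user_agent  => 'MyCrawler/1.0'
}
crawler = Polipus::PolipusCrawler.new('my-job', ['https://example.com/'], options)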

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(job_name = 'polipus', urls = [], options = {}) {|_self| ... } ⇒ PolipusCrawler

Returns a new instance of PolipusCrawler.

Yields:

  • (_self)

Yield Parameters:

  • _self (PolipusCrawler): the object that the method was called on

# File 'lib/polipus.rb', line 91

def initialize(job_name = 'polipus', urls = [], options = {})

  @job_name     = job_name
  @options      = OPTS.merge(options)
  @options[:queue_timeout] = 1 if @options[:queue_timeout] <= 0
  @logger       = @options[:logger]  ||= Logger.new(nil)
  
  unless @logger.class.to_s == "Log4r::Logger"
    @logger.level = @options[:logger_level] ||= Logger::INFO
  end

  @storage      = @options[:storage] ||= Storage.dev_null

  @http_pool    = []
  @workers_pool = []
  @queues_pool  = []

  @follow_links_like  = []
  @skip_links_like    = []
  @on_page_downloaded = []
  @on_before_save     = []
  @focus_crawl_block  = nil
  @on_crawl_end       = []
  @redis_factory      = nil

  @overflow_manager = nil
  @crawler_name = `hostname`.strip + "-#{@job_name}"

  @storage.include_query_string_in_uuid = @options[:include_query_string_in_saved_page]

  @urls = [urls].flatten.map{ |url| URI(url) }
  @urls.each{ |url| url.path = '/' if url.path.empty? }

  @internal_queue = queue_factory

  execute_plugin 'on_initialize'

  yield self if block_given?

end
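
The constructor yields the new instance, so it can be configured inline before the crawl starts. A minimal sketch (job name, seed URL, and pattern are illustrative):

require 'polipus'

crawler = Polipus::PolipusCrawler.new('my-job', ['https://example.com/']) do |c|
  c.skip_links_like(/\.pdf$/i)
end
crawler.takeover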

Instance Attribute Details

#crawler_name ⇒ Object (readonly)

Returns the value of attribute crawler_name.



# File 'lib/polipus.rb', line 79

def crawler_name
  @crawler_name
end

#job_name ⇒ Object (readonly)

Returns the value of attribute job_name.



# File 'lib/polipus.rb', line 76

def job_name
  @job_name
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'lib/polipus.rb', line 77

def logger
  @logger
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/polipus.rb', line 78

def options
  @options
end

#storage ⇒ Object (readonly)

Returns the value of attribute storage.



# File 'lib/polipus.rb', line 75

def storage
  @storage
end

Class Method Details

.crawl(*args, &block) ⇒ Object



# File 'lib/polipus.rb', line 134

def self.crawl(*args, &block)
  new(*args, &block).takeover
end
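
This is the typical entry point: it builds an instance and immediately calls #takeover on it. A minimal sketch (job name, URL, and callback body are illustrative):

Polipus::PolipusCrawler.crawl('my-job', ['https://example.com/']) do |crawler|
  crawler.on_page_downloaded do |page|
    puts "Downloaded: #{page.url}"
  end
end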

Instance Method Details

#add_url(url) ⇒ Object

Enqueues a URL unconditionally



# File 'lib/polipus.rb', line 317

def add_url url
  page = Page.new(url)
  @internal_queue << page.to_json
end
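
For instance (the URL is illustrative), an extra seed can be pushed into the queue at any time, even from inside a callback:

crawler.add_url('https://example.com/sitemap')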

#focus_crawl(&block) ⇒ Object

The given block is executed on every page downloaded and is used to extract the URLs to visit; see the links_for method.



# File 'lib/polipus.rb', line 282

def focus_crawl(&block)
  @focus_crawl_block = block
  self
end
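
A sketch of a focused crawl: the block receives each downloaded page and should return the links worth following (the pattern is illustrative):

crawler.focus_crawl do |page|
  # Follow only links that look like product pages.
  page.links.select { |link| link.to_s =~ /\/products\// }
end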

#follow_links_like(*patterns) ⇒ Object

A pattern or an array of patterns can be passed as argument. A URL will be discarded if it doesn't match any of the patterns.



# File 'lib/polipus.rb', line 248

def follow_links_like(*patterns)
  @follow_links_like += patterns.uniq.compact
  self
end
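
For instance (the patterns are illustrative):

crawler.follow_links_like(/\/blog\//, /\/news\//)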

#on_before_save(&block) ⇒ Object

The given block is executed on every page downloaded, before the page is saved to the registered storage.



# File 'lib/polipus.rb', line 274

def on_before_save(&block)
  @on_before_save << block
  self
end
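
For instance (the logging is illustrative):

crawler.on_before_save do |page|
  puts "About to store #{page.url} (HTTP #{page.code})"
end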

#on_crawl_end(&block) ⇒ Object



# File 'lib/polipus.rb', line 267

def on_crawl_end(&block)
  @on_crawl_end << block
  self
end
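
Each registered block is invoked when the crawl ends and receives the crawler itself (see #takeover). For instance (the reporting is illustrative):

crawler.on_crawl_end do |c|
  puts "Crawl #{c.job_name} finished; queue size: #{c.queue_size}"
end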

#on_page_downloaded(&block) ⇒ Object

The given block is executed on every page downloaded. The block receives the page as its argument.



# File 'lib/polipus.rb', line 262

def on_page_downloaded(&block)
  @on_page_downloaded << block
  self
end
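
For instance (assuming the fetched pages are HTML, so that page.doc yields a parsed Nokogiri document):

crawler.on_page_downloaded do |page|
  puts "#{page.url}: #{page.doc.css('title').text}" if page.doc
end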

#queue_size ⇒ Object



# File 'lib/polipus.rb', line 291

def queue_size
  @internal_queue.size
end

#redis ⇒ Object



# File 'lib/polipus.rb', line 312

def redis
  @redis ||= redis_factory_adapter
end

#redis_factory(&block) ⇒ Object



# File 'lib/polipus.rb', line 299

def redis_factory(&block)
  @redis_factory = block
  self
end
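
A sketch of supplying a custom Redis client; the factory block is expected to return a Redis-compatible connection (that it receives the redis_options hash is an assumption for illustration):

crawler.redis_factory do |options|
  # Build the client yourself, e.g. to use a custom driver or namespace.
  Redis.new(options)
end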

#redis_options ⇒ Object



# File 'lib/polipus.rb', line 287

def redis_options
  @options[:redis_options]
end

#skip_links_like(*patterns) ⇒ Object

A pattern or an array of patterns can be passed as argument. A URL will be discarded if it matches any of the patterns.



# File 'lib/polipus.rb', line 255

def skip_links_like(*patterns)
  @skip_links_like += patterns.uniq.compact
  self
end
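
For instance (the patterns are illustrative):

crawler.skip_links_like(/\.pdf$/i, /\/login/)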

#stats_reset! ⇒ Object



# File 'lib/polipus.rb', line 295

def stats_reset!
  ["polipus:#{@job_name}:errors", "polipus:#{@job_name}:pages"].each {|e| redis.del e}
end

#stop!(cler_queue = false) ⇒ Object

Requests that Polipus stop its work gracefully. Pass cler_queue = true to delete all of the pending URLs to visit.



# File 'lib/polipus.rb', line 324

def stop!(cler_queue = false)
  PolipusSignalHandler.terminate
  @internal_queue.clear(true) if cler_queue
end
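
For instance (the page limit is illustrative), a crawl can be stopped from inside a callback:

pages_seen = 0
crawler.on_page_downloaded do |page|
  pages_seen += 1
  crawler.stop! if pages_seen >= 100
end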

#takeover ⇒ Object



# File 'lib/polipus.rb', line 138

def takeover
  PolipusSignalHandler.enable
  overflow_items_controller if queue_overflow_adapter

  q = queue_factory
  @urls.each do |u|
    page = Page.new(u.to_s, :referer => '')
    page.user_data.p_seeded = true
    q << page.to_json
  end

  return if q.empty?

  execute_plugin 'on_crawl_start'
  @options[:workers].times do |worker_number|
    @workers_pool << Thread.new do
      @logger.debug {"Start worker #{worker_number}"}
      http  = @http_pool[worker_number]   ||= HTTP.new(@options)
      queue = @queues_pool[worker_number] ||= queue_factory
      queue.process(false, @options[:queue_timeout]) do |message|

        next if message.nil?

        execute_plugin 'on_message_received'

        page = Page.from_json message

        unless should_be_visited?(page.url, false)
          @logger.info {"[worker ##{worker_number}] Page (#{page.url.to_s}) is no more welcome."}
          queue.commit
          next
        end

        if page_exists? page
          @logger.info {"[worker ##{worker_number}] Page (#{page.url.to_s}) already stored."}
          queue.commit
          next
        end
        
        url = page.url.to_s
        @logger.debug {"[worker ##{worker_number}] Fetching page: [#{page.url.to_s}] Referer: #{page.referer} Depth: #{page.depth}"}

        execute_plugin 'on_before_download'

        pages = http.fetch_pages(url, page.referer, page.depth)
        if pages.count > 1
          rurls = pages.map { |e| e.url.to_s }.join(' --> ')
          @logger.info {"Got redirects! #{rurls}"}
          page = pages.pop
          page.aliases = pages.collect { |e| e.url }
          if page_exists? page
            @logger.info {"[worker ##{worker_number}] Page (#{page.url.to_s}) already stored."}
            queue.commit
            next
          end
        else
          page = pages.last
        end
        
        # Execute on_before_save blocks
        @on_before_save.each {|e| e.call(page)} unless page.nil?
        execute_plugin 'on_after_download'
        
        @logger.warn {"Page #{page.url} has error: #{page.error}"} if page.error

        incr_error if page.error

        if page && page.storable?
          @storage.add page
        end
        
        if page
          @logger.debug {"[worker ##{worker_number}] Fetched page: [#{page.url.to_s}] Referrer: [#{page.referer}] Depth: [#{page.depth}] Code: [#{page.code}] Response Time: [#{page.response_time}]"}
          @logger.info  {"[worker ##{worker_number}] Page (#{page.url.to_s}) downloaded"}
        end
        
        incr_pages

        # Execute on_page_downloaded blocks
        @on_page_downloaded.each {|e| e.call(page)} unless page.nil?

        if @options[:depth_limit] == false || @options[:depth_limit] > page.depth 
          links_for(page).each do |url_to_visit|
            next unless should_be_visited?(url_to_visit)
            enqueue url_to_visit, page, queue
          end
        else
          @logger.info {"[worker ##{worker_number}] Depth limit reached #{page.depth}"}
        end

        @logger.debug {"[worker ##{worker_number}] Queue size: #{queue.size}"}
        @overflow_manager.perform if @overflow_manager && queue.empty?
        execute_plugin 'on_message_processed'

        if PolipusSignalHandler.terminated?
          @logger.info {"About to exit! Thanks for using Polipus"}
          queue.commit
          break
        end
        true
      end
    end
  end
  @workers_pool.each {|w| w.join}
  @on_crawl_end.each {|e| e.call(self)}
  execute_plugin 'on_crawl_end'
end

#url_tracker ⇒ Object



# File 'lib/polipus.rb', line 304

def url_tracker
  @url_tracker ||=
    @options[:url_tracker] ||=
      UrlTracker.bloomfilter(:key_name => "polipus_bf_#{job_name}",
                             :redis => redis_factory_adapter,
                             :driver => 'lua')
end
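
The default tracker is built lazily from the crawler's Redis connection. A sketch of overriding it at construction time (the key name is illustrative):

options = {
  :url_tracker => Polipus::UrlTracker.bloomfilter(:key_name => 'my_custom_bf',
                                                  :redis => Redis.new,
                                                  :driver => 'lua')
}
crawler = Polipus::PolipusCrawler.new('my-job', ['https://example.com/'], options)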