Class: CrawlHelper

Inherits:
Object
Defined in:
lib/crawl_helper.rb

Class Method Summary

  • .crawl_page(content_request) ⇒ Object
  • .finished(content_request) ⇒ Object
  • .send_to_processing_queue(content, content_request) ⇒ Object

Class Method Details

.crawl_page(content_request) ⇒ Object

Crawls a single page for the given content_request: fetches the URL, records it as crawled, queues any internal links found within the crawl limits, and sends the content to the processing queue.

# File 'lib/crawl_helper.rb', line 8

def self.crawl_page(content_request)
  # change all hash keys to symbols
  content_request = HashUtil.deep_symbolize_keys(content_request)
  @content_request = content_request

  content_request[:redis_options] = {} unless content_request.has_key? :redis_options
  content_request[:crawl_limit_by_page] = false unless content_request.has_key? :crawl_limit_by_page
  content_request[:valid_mime_types] = ["*/*"] unless content_request.has_key? :valid_mime_types
  content_request[:queue_system] = content_request[:queue_system].to_sym

  @redis = NamespacedRedisConnection.new(content_request[:redis_options], "cobweb-#{Cobweb.version}-#{content_request[:crawl_id]}")
  @stats = Stats.new(content_request)

  @debug = content_request[:debug]

  decrement_queue_counter

  # check we haven't crawled this url before
  unless @redis.sismember "crawled", content_request[:url]
    # if there is no limit or we're still under it lets get the url
    if within_crawl_limits?(content_request[:crawl_limit])
      content = Cobweb.new(content_request).get(content_request[:url], content_request)
      if content_request[:url] == @redis.get("original_base_url")
        @redis.set("crawled_base_url", content[:base_url])
      end
      if is_permitted_type(content)
        begin
          # move the url from the queued list to the crawled list - for both the original url, and the content url (to handle redirects)
          @redis.srem "queued", content_request[:url]
          @redis.sadd "crawled", content_request[:url]
          @redis.srem "queued", content[:url]
          @redis.sadd "crawled", content[:url]
          # increment the counter if we are not limiting by page, or we are limiting by page and this is a page
          if content_request[:crawl_limit_by_page]
            if content[:mime_type].match("text/html")
              increment_crawl_started_counter
            end
          else
            increment_crawl_started_counter
          end

          ## update statistics
          @stats.update_status("Crawling #{content_request[:url]}...")
          @stats.update_statistics(content)

          # set the base url if this is the first page
          set_base_url @redis, content, content_request

          @cobweb_links = CobwebLinks.new(content_request)
          if within_queue_limits?(content_request[:crawl_limit])
            internal_links = ContentLinkParser.new(content_request[:url], content[:body], content_request).all_links(:valid_schemes => [:http, :https])

            # select the link if it's internal
            internal_links.select! { |link| @cobweb_links.internal?(link) }

            # if the site has the same content for http and https then normalize to http
            if content_request[:treat_https_as_http]
              internal_links.map!{|link| link.gsub(/^https/, "http")}
            end

            # reject the link if we've crawled it or queued it
            internal_links.reject! { |link| @redis.sismember("crawled", link) }
            internal_links.reject! { |link| @redis.sismember("queued", link) }

            internal_links.each do |link|
              enqueue_content(content_request, link) if within_queue_limits?(content_request[:crawl_limit])
            end
          end

          # enqueue to processing queue
          send_to_processing_queue(content, content_request)

          #if the enqueue counter has been requested update that
          if content_request.has_key? :enqueue_counter_key
            enqueue_redis = NamespacedRedisConnection.new(content_request[:redis_options], content_request[:enqueue_counter_namespace].to_s)
            current_count = enqueue_redis.hget(content_request[:enqueue_counter_key], content_request[:enqueue_counter_field]).to_i
            enqueue_redis.hset(content_request[:enqueue_counter_key], content_request[:enqueue_counter_field], current_count+1)
          end

        ensure
          #update the queued and crawled lists if we are within the crawl limits.

          # update the queue and crawl counts -- doing this very late in the piece so that the following transaction all occurs at once.
          # really we should do this with a lock https://github.com/PatrickTulskie/redis-lock
          if content_request[:crawl_limit_by_page]
            if content[:mime_type].match("text/html")
              increment_crawl_counter
            end
          else
            increment_crawl_counter
          end
          puts "Crawled: #{@crawl_counter} Limit: #{content_request[:crawl_limit]} Queued: #{@queue_counter} In Progress: #{@crawl_started_counter-@crawl_counter}" if @debug
        end
      else
        puts "ignoring #{content_request[:url]} as mime_type is #{content[:mime_type]}" if content_request[:debug]
      end
    else
      puts "ignoring #{content_request[:url]} as outside of crawl limits." if content_request[:debug]
    end

  else
    @redis.srem "queued", content_request[:url]
    puts "Already crawled #{content_request[:url]}" if content_request[:debug]
  end

  # if there's nothing left queued or the crawled limit has been reached
  refresh_counters
  if content_request[:crawl_limit].nil? || content_request[:crawl_limit] == 0
    if @queue_counter + @crawl_started_counter - @crawl_counter == 0
      finished(content_request)
    end
  elsif (@queue_counter + @crawl_started_counter - @crawl_counter) == 0 || @crawl_counter >= content_request[:crawl_limit].to_i
    finished(content_request)
  end

end
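
As a rough usage sketch (not part of the listing above), a worker might call crawl_page with a request hash along the following lines. The keys shown are the ones the method body reads; the values, and the queue class names, are illustrative assumptions rather than documented defaults.

# Hypothetical invocation of CrawlHelper.crawl_page; keys mirror those read
# in the method body, values and queue class names are made up for illustration.
content_request = {
  "url"                  => "http://example.com/",
  "crawl_id"             => "abc123",            # namespaces the Redis keys as cobweb-<version>-abc123
  "queue_system"         => "resque",            # converted to a symbol by crawl_page
  "processing_queue"     => "ContentProcessJob", # hypothetical class name, resolved via const_get
  "crawl_finished_queue" => "CrawlFinishedJob",  # hypothetical class name
  "crawl_limit"          => 100,
  "crawl_limit_by_page"  => true,                # only count text/html pages toward the limit
  "valid_mime_types"     => ["text/html"],
  "redis_options"        => { :host => "localhost" },
  "debug"                => true
}

# String keys are acceptable: crawl_page symbolizes them with HashUtil.deep_symbolize_keys.
CrawlHelper.crawl_page(content_request)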

.finished(content_request) ⇒ Object

Sets the crawl status to 'Crawl Finished' and enqueues the crawl finished job.



# File 'lib/crawl_helper.rb', line 126

def self.finished(content_request)
  # finished
  if @redis.hget("statistics", "current_status") != "Crawl Finished"
    #ap "CRAWL FINISHED  #{content_request[:url]}, #{counters}, #{@redis.get("original_base_url")}, #{@redis.get("crawled_base_url")}" if content_request[:debug]
    @stats.end_crawl(content_request)

    additional_stats = {:crawl_id => content_request[:crawl_id], :crawled_base_url => @redis.get("crawled_base_url")}
    additional_stats[:redis_options] = content_request[:redis_options] unless content_request[:redis_options] == {}
    additional_stats[:source_id] = content_request[:source_id] unless content_request[:source_id].nil?

    if content_request[:queue_system] == :resque
      Resque.enqueue(const_get(content_request[:crawl_finished_queue]), @stats.get_statistics.merge(additional_stats))
    elsif content_request[:queue_system] == :sidekiq
      puts "Queueing Finished on Sidekiq"
      const_get(content_request[:crawl_finished_queue]).perform_async(@stats.get_statistics.merge(additional_stats))
    else
      raise "Unknown queue system: #{content_request[:queue_system]}"
    end
  else
    # nothing to report here, we're skipping the remaining urls as we're outside of the crawl limit
  end
end
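
The class named by :crawl_finished_queue is resolved with const_get and handed the statistics hash merged with the additional fields above. A minimal sketch of such a class for the Resque path, under the hypothetical name CrawlFinishedJob, could look like this:

# Hypothetical crawl-finished job; the name would be supplied in
# content_request[:crawl_finished_queue]. Resque serialises its arguments,
# so the hash arrives with string keys.
class CrawlFinishedJob
  @queue = :crawl_finished

  def self.perform(statistics)
    # statistics is @stats.get_statistics merged with :crawl_id,
    # :crawled_base_url and, when present, :redis_options and :source_id.
    puts "Crawl #{statistics['crawl_id']} finished for #{statistics['crawled_base_url']}"
  end
end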

.send_to_processing_queue(content, content_request) ⇒ Object

Enqueues the content to the processing queue set up in the options.



# File 'lib/crawl_helper.rb', line 150

def self.send_to_processing_queue(content, content_request)
  content_to_send = content.merge({:internal_urls => content_request[:internal_urls], :redis_options => content_request[:redis_options], :source_id => content_request[:source_id], :crawl_id => content_request[:crawl_id]})
  if content_request[:direct_call_process_job]
    clazz = const_get(content_request[:processing_queue])
    clazz.perform(content_to_send)
  elsif content_request[:use_encoding_safe_process_job]
    content_to_send[:body] = Base64.encode64(content[:body])
    content_to_send[:processing_queue] = content_request[:processing_queue]
    if content_request[:queue_system] == :resque
      Resque.enqueue(EncodingSafeProcessJob, content_to_send)
    elsif content_request[:queue_system] == :sidekiq
      const_get(content_request[:processing_queue]).perform_async(content_to_send)
    else
      raise "Unknown queue system: #{content_request[:queue_system]}"
    end
  else
    if content_request[:queue_system] == :resque
      Resque.enqueue(const_get(content_request[:processing_queue]), content_to_send)
    elsif content_request[:queue_system] == :sidekiq
      puts "Queueing on Sidekiq"
      const_get(content_request[:processing_queue]).perform_async(content_to_send)
    else
      raise "Unknown queue system: #{content_request[:queue_system]}"
    end
  end
  puts "#{content_request[:url]} has been sent for processing. use_encoding_safe_process_job: #{content_request[:use_encoding_safe_process_job]}" if content_request[:debug]
end
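
The processing class named in :processing_queue needs a class-level perform for the direct-call and Resque paths above; a Sidekiq worker would instead include Sidekiq::Worker and define an instance-level perform. A minimal sketch under the hypothetical name ContentProcessJob:

# Hypothetical processing job named in content_request[:processing_queue].
class ContentProcessJob
  @queue = :cobweb_process

  def self.perform(content)
    # content is the crawled page merged with :internal_urls, :redis_options,
    # :source_id and :crawl_id. With use_encoding_safe_process_job the body is
    # Base64-encoded (routed through EncodingSafeProcessJob) and must be decoded first.
    url = content['url'] || content[:url]   # string keys via Resque, symbols via the direct call
    puts "Processing #{url}"
  end
end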