Class: Apollo::Planner::SmartPlanner

Inherits:
BasePlanner
Defined in:
lib/apollo_crawler/planner/smart_planner.rb

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(amqp = nil, mongo = nil, opts = {}) ⇒ SmartPlanner

Returns a new instance of SmartPlanner.



(source lines 38–117)
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 38

# Wires the planner to the message bus.
#
# Opens a channel on +amqp+, declares the planner's exchanges/queues via
# Apollo::Agent.declare_entities and subscribes to three queues:
# * "planner.fetched.queue"  - fetcher replies: marks the QueuedUrl as :fetched, refreshes the
#                              RawDocument cache and forwards new/changed content to "crawler".
# * "planner.domained.queue" - currently only acknowledges arrival (verbose log).
# * "planner.crawled.queue"  - crawler replies: schedules every extracted link.
#
# @param amqp [Object] AMQP connection; must respond to #create_channel (nil raises NoMethodError).
# @param mongo [Object] database handle; stored via self.mongo, not otherwise used here.
# @param opts [Hash] forwarded to Apollo::Agent.declare_entities; +:verbose+ enables debug output.
def initialize(amqp=nil, mongo=nil, opts={})
  self.amqp = amqp
  self.mongo = mongo

  # Declarations
  channel = amqp.create_channel
  # channel.prefetch(1)
  self.declarations = Apollo::Agent.declare_entities(channel, opts)

  # Bindings
  declarations[:queues]["planner.fetched.queue"].bind(declarations[:exchanges]["planner.fetched"]).subscribe do |_delivery_info, _metadata, payload|
    msg = JSON.parse(payload)
    puts "#{msg.inspect}" if opts[:verbose]

    puts "REQ: #{msg['request']}" if opts[:verbose]
    puts "RESP: #{msg['response']}" if opts[:verbose]

    request = msg['request']
    response = msg['response']

    # Record the fetch on the originating queue entry. NOTE(review): find may return nil
    # when Mongoid's raise_not_found_error is disabled; skip instead of crashing.
    queued = Apollo::Model::QueuedUrl.find(request["_id"])
    if queued
      queued.update_attributes(request)
      queued.state = :fetched
      queued.save
    end

    doc = Apollo::Model::RawDocument.where(:url => request['url']).first
    if doc
      if doc.sha_hash != response['sha_hash']
        puts "Removing old cached version of '#{request['url']}'" if opts[:verbose]

        # Content changed: drop the stale copy so it is stored and re-crawled below.
        doc.destroy
        doc = nil
      else
        puts "Using cached version of '#{request['url']}'" if opts[:verbose]
      end
    else
      # Identical content may already be stored under a different URL.
      doc = Apollo::Model::RawDocument.where(:sha_hash => response['sha_hash']).first
      puts "Same as #{doc.inspect}" if doc && opts[:verbose]
    end

    # Only unseen or changed content is stored and forwarded to the crawler.
    if doc.nil?
      Apollo::Model::RawDocument.new(response).save

      # Publish
      declarations[:exchanges]["crawler"].publish(msg.to_json, :reply_to => "planner.crawled")
    end
  end

  declarations[:queues]["planner.domained.queue"].bind(declarations[:exchanges]["planner.domained"]).subscribe do |_delivery_info, _metadata, payload|
    # Kept so malformed payloads still raise here as before; the parsed message is not used yet.
    JSON.parse(payload)

    puts "DOMAINED !!!" if opts[:verbose]
  end

  declarations[:queues]["planner.crawled.queue"].bind(declarations[:exchanges]["planner.crawled"]).subscribe do |_delivery_info, _metadata, payload|
    msg = JSON.parse(payload)

    request = msg['request']
    data = msg['data']
    links = msg['links'] || []

    if opts[:verbose]
      # After JSON.parse, data may be a Hash/Array/nil; Digest only accepts Strings.
      serialized = data.is_a?(String) ? data : data.to_json
      puts Digest::SHA256.hexdigest(serialized)
    end

    links.each do |entry|
      # NOTE(review): assumes each entry is a Hash carrying a 'link' key — confirm with the crawler.
      link = entry['link']
      next if link.nil?

      Apollo::Scheduler::BaseScheduler::schedule(link, request['crawler_name'])
    end
  end
end

Instance Attribute Details

#amqp ⇒ Object

Returns the value of attribute amqp.



(source lines 34–36)
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 34

# Reader: the AMQP connection object stored by the constructor.
def amqp; @amqp; end

#declarations ⇒ Object

Returns the value of attribute declarations.



(source lines 36–38)
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 36

# Reader: the exchanges/queues hash produced by Apollo::Agent.declare_entities.
def declarations; @declarations; end

#mongo ⇒ Object

Returns the value of attribute mongo.



(source lines 35–37)
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 35

# Reader: the database handle stored by the constructor.
def mongo; @mongo; end

Instance Method Details

#fetch_queued_urls(opts = {}) ⇒ Object



(source lines 138–151)
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 138

# Claims queued URLs and hands them to the fetcher until the number of in-flight
# (:fetching) URLs reaches the limit or the queue is empty.
#
# @param opts [Hash] forwarded to get_next_url/fetch_url; +:max_fetching+ sets the
#   concurrency limit (default 4).
# @return [nil]
def fetch_queued_urls(opts={})
  limit = opts.fetch(:max_fetching, 4)

  if get_url_count(:fetching) > limit
    puts "Fetching too many URLs. Slowing down for a while ..."
    return
  end

  while get_url_count(:fetching) < limit
    url = get_next_url(opts)
    # Queue drained: stop here. Otherwise the fetching count never grows and the
    # loop would spin forever publishing "null" to the fetcher.
    break if url.nil?

    puts "SmartPlanner::fetch_queued_urls() - Queueing: #{url.inspect}"
    fetch_url(url, opts)
  end
end

#fetch_url(url, opts = {}) ⇒ Object



(source lines 123–130)
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 123

# Publishes +url+ (serialized as JSON) to the "fetcher" exchange and asks for the
# reply on "planner.fetched". Logs the URL first when +opts[:verbose]+ is set.
def fetch_url(url, opts={})
  puts "AMQP fetching '#{url.inspect}'" if opts[:verbose]

  fetcher = declarations[:exchanges]["fetcher"]
  fetcher.publish(url.to_json, :reply_to => "planner.fetched")
end

#get_next_url(opts = {}) ⇒ Object



(source lines 132–136)
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 132

# Claims the oldest :queued URL (by created_at) by switching its state to :fetching
# in a single find_and_modify call, returning the updated record (nil if none).
def get_next_url(opts={})
  oldest_first = Apollo::Model::QueuedUrl.where({:state => :queued}).order_by(:created_at.asc)
  oldest_first.find_and_modify({ "$set" => { state: :fetching }}, new: true)
end

#get_url_count(state, opts = {}) ⇒ Object



(source lines 119–121)
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 119

# Counts QueuedUrl records currently in +state+ (e.g. :queued, :fetching).
# +opts+ is accepted for signature symmetry and is not used.
def get_url_count(state, opts={})
  matching = Apollo::Model::QueuedUrl.where(:state => state)
  matching.count
end

#run(opts = {}) ⇒ Object



(source lines 153–162)
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 153

# Planner main loop: calls fetch_queued_urls(opts) once per tick, sleeping between ticks.
#
# The original exit flag was never set, so the loop could not end and `return 0`
# was unreachable. The loop is now optionally bounded.
#
# @param opts [Hash] forwarded to fetch_queued_urls on every tick.
#   +:max_iterations+ - stop after this many ticks (default nil: run forever, as before).
#   +:poll_interval+  - seconds to sleep between ticks (default 1, as before).
# @return [Integer] 0 once the loop finishes.
def run(opts={})
  max_iterations = opts[:max_iterations]
  interval = opts.fetch(:poll_interval, 1)

  iterations = 0
  until max_iterations && iterations >= max_iterations
    fetch_queued_urls(opts)
    iterations += 1
    sleep interval
  end

  0
end