Class: Apollo::Planner::SmartPlanner
- Inherits: BasePlanner
- Inheritance chain: Object → BasePlanner → Apollo::Planner::SmartPlanner
- Defined in:
- lib/apollo_crawler/planner/smart_planner.rb
Instance Attribute Summary
-
#amqp ⇒ Object
Returns the value of attribute amqp.
-
#declarations ⇒ Object
Returns the value of attribute declarations.
-
#mongo ⇒ Object
Returns the value of attribute mongo.
Instance Method Summary
- #fetch_queued_urls(opts = {}) ⇒ Object
- #fetch_url(url, opts = {}) ⇒ Object
- #get_next_url(opts = {}) ⇒ Object
-
#initialize(amqp = nil, mongo = nil, opts = {}) ⇒ SmartPlanner
constructor
A new instance of SmartPlanner.
- #run(opts = {}) ⇒ Object
Constructor Details
#initialize(amqp = nil, mongo = nil, opts = {}) ⇒ SmartPlanner
Returns a new instance of SmartPlanner.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 38
#
# Stores the AMQP and Mongo handles, declares the exchanges/queues through
# Apollo::Agent.declare_exchanges on a newly created channel, and subscribes
# to the three planner queues ("planner.fetched.queue",
# "planner.domained.queue", "planner.crawled.queue").
#
# @param amqp [Object] connection-like object; it must respond to
#   +create_channel+. The default of nil is accepted by the signature but
#   fails at +amqp.create_channel+ with NoMethodError.
# @param mongo [Object] stored as-is; not otherwise used by this method.
# @param opts [Hash] forwarded to Apollo::Agent.declare_exchanges; the
#   +:verbose+ key enables the cache log lines of the fetched handler.
# @return [SmartPlanner]
def initialize(amqp=nil, mongo=nil, opts={})
  self.amqp = amqp
  self.mongo = mongo

  # Declarations
  channel = amqp.create_channel
  self.declarations = Apollo::Agent.declare_exchanges(channel, opts)

  queues = declarations[:queues]
  exchanges = declarations[:exchanges]

  # Bindings
  # NOTE(review): the name of the middle block parameter was lost in the
  # extracted listing; it is unused, so it is given an underscore name here.
  queues["planner.fetched.queue"].bind(exchanges["planner.fetched"]).subscribe do |_delivery_info, _properties, payload|
    process_fetched_message(payload, opts)
  end

  # The domained handler only logs; the payload is currently not inspected.
  queues["planner.domained.queue"].bind(exchanges["planner.domained"]).subscribe do |_delivery_info, _properties, _payload|
    puts "DOMAINED !!!"
  end

  queues["planner.crawled.queue"].bind(exchanges["planner.crawled"]).subscribe do |_delivery_info, _properties, payload|
    process_crawled_message(payload)
  end
end

# Handles one "planner.fetched" message: marks the queued URL as fetched,
# refreshes the stored raw document and forwards the message to the
# "crawler" exchange.
#
# @param payload [String] JSON text with "request" and "response" keys
# @param opts [Hash] constructor options (+:verbose+ enables log lines)
def process_fetched_message(payload, opts)
  msg = JSON.parse(payload)
  request = msg['request']
  response = msg['response']

  queued = Apollo::Model::QueuedUrl.find(request["_id"])
  queued.update_attributes(request)
  queued.state = :fetched
  queued.save

  cached = Apollo::Model::RawDocument.where(:url => request['url']).first
  if cached.nil?
    # No document for this URL: fall back to a lookup by content hash.
    cached = Apollo::Model::RawDocument.where(:sha_hash => response['sha_hash']).first
  elsif cached.sha_hash != response['sha_hash']
    puts "Removing old cached version of '#{request['url']}'" if opts[:verbose]
    cached.destroy
    cached = nil
  else
    puts "Using cached version of '#{request['url']}'" if opts[:verbose]
  end

  # Persist the response only when no usable document was found above.
  Apollo::Model::RawDocument.new(response).save if cached.nil?

  # Publish
  declarations[:exchanges]["crawler"].publish(msg.to_json, :reply_to => "planner.crawled")
end

# Handles one "planner.crawled" message: schedules every entry of "links"
# (a missing list is treated as empty) for the crawler named in the request.
#
# @param payload [String] JSON text with "request" and optional "links" keys
def process_crawled_message(payload)
  msg = JSON.parse(payload)
  request = msg['request']

  (msg['links'] || []).each do |entry|
    Apollo::Scheduler::BaseScheduler.schedule(entry['link'], request['crawler_name'])
  end
end

private :process_fetched_message, :process_crawled_message
Instance Attribute Details
#amqp ⇒ Object
Returns the value of attribute amqp.
34 35 36 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 34
#
# Reader for the +amqp+ attribute.
#
# @return [Object] the current value of @amqp (nil when never assigned)
def amqp
  @amqp
end
#declarations ⇒ Object
Returns the value of attribute declarations.
36 37 38 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 36
#
# Reader for the +declarations+ attribute.
#
# @return [Object] the current value of @declarations (nil when never assigned)
def declarations
  @declarations
end
#mongo ⇒ Object
Returns the value of attribute mongo.
35 36 37 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 35
#
# Reader for the +mongo+ attribute.
#
# @return [Object] the current value of @mongo (nil when never assigned)
def mongo
  @mongo
end
Instance Method Details
#fetch_queued_urls(opts = {}) ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 119
#
# Drains the queue: repeatedly asks #get_next_url for a URL and hands each
# one to #fetch_url until #get_next_url returns nil/false.
#
# @param opts [Hash] passed unchanged to every #get_next_url and #fetch_url call
# @return [nil]
def fetch_queued_urls(opts={})
  # A single call site keeps the options identical for the first and all
  # following look-ups.
  while (url = get_next_url(opts))
    puts url.inspect
    # puts "Count of URLs in Queue: #{url.count}" if opts[:verbose]
    fetch_url(url, opts)
  end
end
#fetch_url(url, opts = {}) ⇒ Object
106 107 108 109 110 111 112 113 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 106
#
# Publishes +url+, serialized with +to_json+, to the "fetcher" exchange and
# asks for the reply on "planner.fetched".
#
# @param url [Object] anything responding to +to_json+ and +inspect+
# @param opts [Hash] +:verbose+ prints a log line before publishing
# @return [Object] whatever the exchange's +publish+ returns
def fetch_url(url, opts={})
  puts "AMQP fetching '#{url.inspect}'" if opts[:verbose]

  # Publish
  declarations[:exchanges]["fetcher"].publish(url.to_json, :reply_to => "planner.fetched")
end
#get_next_url(opts = {}) ⇒ Object
115 116 117 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 115
#
# Picks a QueuedUrl whose state is :queued and sets its state to :fetching
# in the same find_and_modify call.
#
# @param opts [Hash] accepted for signature parity with the other planner
#   methods; not read here
# @return [Object, nil] the result of find_and_modify — presumably the
#   updated document (because of +new: true+) or nil when nothing is queued;
#   confirm against the ODM in use
def get_next_url(opts={})
  Apollo::Model::QueuedUrl
    .where({ :state => :queued })
    .find_and_modify({ "$set" => { state: :fetching } }, new: true)
end
#run(opts = {}) ⇒ Object
132 133 134 135 136 137 138 139 140 141 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 132
#
# Main loop: drains the URL queue via #fetch_queued_urls, sleeps one second
# and repeats.
#
# @param opts [Hash] passed unchanged to #fetch_queued_urls
# @return [Integer] 0 once the loop ends. Nothing in this method sets
#   +request_exit+, so as written the loop only ends through an exception.
def run(opts={})
  request_exit = false

  until request_exit
    fetch_queued_urls(opts)
    sleep 1
  end

  0
end