Class: Apollo::Agent::FetcherAgent

Inherits:
BaseAgent
  • Object
show all
Defined in:
lib/apollo_crawler/agent/fetcher_agent.rb

Constant Summary collapse

THREAD_POOL_SIZE =
1

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from BaseAgent

#run

Constructor Details

#initialize(amqp, opts = {}) ⇒ FetcherAgent

Returns a new instance of FetcherAgent.



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 41

# Builds a fetcher agent and subscribes it to the fetcher queue.
#
# Creates a channel on +amqp+, declares the crawler's AMQP entities through
# Apollo::Agent.declare_entities, and subscribes to "fetcher.queue" bound to
# the "fetcher" exchange. Every delivered payload is parsed as JSON and its
# "url" entry is fetched. When the message metadata carries :reply_to, the
# processed document is published to the exchange of that name.
#
# @param amqp [#create_channel] AMQP connection used to open the channel
# @param opts [Hash] options; :verbose enables progress output. The hash is
#   also passed on to Apollo::Agent.declare_entities and process_fetched_doc.
def initialize(amqp, opts={})
	self.fetcher = Apollo::Fetcher::SmartFetcher.new

	puts "Initializing fetcher agent..." if opts[:verbose]

	# Declarations
	channel = amqp.create_channel
	channel.prefetch(THREAD_POOL_SIZE)

	# Binding (Default)
	self.declarations = Apollo::Agent.declare_entities(channel, opts)
	queue = declarations[:queues]["fetcher.queue"]

	# AMQP contexts for threads (currently only announced, nothing is created)
	THREAD_POOL_SIZE.times do |i|
		puts "FetcherAgent::initialize() - Creating context #{i}" if opts[:verbose]
	end

	# AMQP contexts mutex/lock
	self.mutex = Mutex.new

	exchange = declarations[:exchanges]["fetcher"]

	queue.bind(exchange).subscribe(:ack => true) do |delivery_info, metadata, payload|
		# There can be troubles with concurrency, please see https://groups.google.com/forum/?fromgroups=#!topic/ruby-amqp/aO9GPu-jxuE
		queued_url = JSON.parse(payload)
		url = queued_url["url"]

		puts "FetcherAgent: Received - '#{url}', metadata #{metadata.inspect}" if opts[:verbose]

		# The message is acknowledged before the fetch is attempted, so a failed
		# fetch is not redelivered. Channel access is serialized by the mutex.
		# NOTE(review): the second argument looks like the "multiple" flag - confirm.
		mutex.synchronize do
			puts "FetcherAgent: Acking - '#{delivery_info.delivery_tag}'" if opts[:verbose]
			channel.basic_ack(delivery_info.delivery_tag, true)
		end

		begin
			doc = Apollo::Fetcher::SmartFetcher::fetch(url)
			doc = process_fetched_doc(queued_url, doc, metadata, opts)

			if metadata && metadata[:reply_to]
				puts "Replying to '#{metadata[:reply_to]}'"
				send_response_msg(metadata[:reply_to], queued_url, doc)
			end
		rescue StandardError => e
			# StandardError rather than Exception, so that interrupts and exit
			# requests are not swallowed while a fetch is in progress.
			puts "EXCEPTION: FetcherAgent::initialize() - Unable to fetch '#{url}', reason: '#{e}'"
		end

		# Block value: the processed document, or nil when fetching failed.
		doc
	end
end

Instance Attribute Details

#declarations ⇒ Object

Returns the value of attribute declarations.



38
39
40
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 38

# Reader for the value stored in @declarations.
def declarations; @declarations; end

#fetcher ⇒ Object

Returns the value of attribute fetcher.



37
38
39
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 37

# Reader for the value stored in @fetcher.
def fetcher; @fetcher; end

#mutex ⇒ Object

Returns the value of attribute mutex.



39
40
41
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 39

# Reader for the value stored in @mutex.
def mutex; @mutex; end

Instance Method Details

#format_response_msg(queued_url, doc) ⇒ Object



108
109
110
111
112
113
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 108

# Wraps a request and its fetched document in a single reply envelope.
#
# @param queued_url [Object] the original request, stored under :request
# @param doc [Object] the fetch result, stored under :response
# @return [Hash] a hash with the keys :request and :response
def format_response_msg(queued_url, doc)
	envelope = {}
	envelope[:request] = queued_url
	envelope[:response] = doc
	envelope
end

#process_fetched_doc(queued_url, doc, metadata, opts = {}) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 95

# Converts a fetch result into an Apollo::Model::RawDocument.
#
# Copies :headers, :body and :status from +doc+, stores the SHA1 hex digest
# of the body as sha_hash, and takes the URL from queued_url["url"].
#
# @param queued_url [Hash] the queued request; its "url" entry is read
# @param doc [Hash] the fetch result with :headers, :body and :status entries
# @param metadata [Object] message metadata; accepted but not used here
# @param opts [Hash] options; accepted but not used here
# @return [Apollo::Model::RawDocument] the populated document
def process_fetched_doc(queued_url, doc, metadata, opts={})
	url = queued_url["url"]

	res = Apollo::Model::RawDocument.new
	res.headers = doc[:headers]
	res.body = doc[:body]
	res.sha_hash = Digest::SHA1.hexdigest(doc[:body])
	res.status = doc[:status]
	res.url = url

	res
end

#send_response_msg(dest, queued_url, doc) ⇒ Object



115
116
117
118
119
120
121
122
123
124
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 115

# Publishes the reply for a fetched document to the exchange named +dest+.
#
# Nothing happens when +dest+ is nil. Otherwise the reply is built with
# format_response_msg, serialized with to_json and published while holding
# the agent's mutex.
#
# @param dest [String, nil] key of the target exchange in declarations[:exchanges]
# @param queued_url [Object] the original request
# @param doc [Object] the processed document
# @return [Object, nil] the result of the publish call, or nil when dest is nil
def send_response_msg(dest, queued_url, doc)
	return if dest.nil?

	reply = format_response_msg(queued_url, doc)

	self.mutex.synchronize do
		target = self.declarations[:exchanges][dest]
		target.publish(reply.to_json)
	end
end