Class: Droid

Inherits:
Object
  • Object
show all
Includes:
BackwardsCompatibleMethods, EMTimerUtils, QueueMethods
Defined in:
lib/droid.rb,
lib/droid/em.rb,
lib/droid/sync.rb,
lib/droid/queue.rb,
lib/droid/utils.rb,
lib/droid/heroku.rb,
lib/droid/publish.rb,
lib/droid/request.rb,
lib/droid/json_server.rb,
lib/droid/utilization.rb

Direct Known Subclasses

HerokuDroid

Defined Under Namespace

Modules: BackwardsCompatibleMethods, EMTimerUtils, QueueMethods, Utilization, Utils Classes: BackwardsCompatibleQueue, BadPayload, BaseQueue, ExpiredMessage, JSONServer, ListenQueue, ReplyQueue, Request, SyncException, UnknownReplyTo, WorkerQueue

Constant Summary collapse

DEFAULT_TTL =
300

Class Method Summary collapse

Instance Method Summary collapse

Methods included from EMTimerUtils

included, #periodic_timer, #timer

Methods included from BackwardsCompatibleMethods

#listen4

Methods included from QueueMethods

#listener, #worker

Constructor Details

#initialize(name, opts = {}) ⇒ Droid

Returns a new instance of Droid.



120
121
122
123
124
125
126
127
# File 'lib/droid.rb', line 120

# Boots a droid: logs the start-up banner, stores +name+ as the class-level
# droid name, then runs the class-level +start+ with a block that hands this
# instance to the caller's block (when one was given).
#
# name - the droid name, stored via the class-level name= writer.
# opts - accepted but not read by this method.
#        NOTE(review): opts is never forwarded to start (e.g. :config) --
#        confirm whether that is intentional.
def initialize(name, opts={})
	log.info "=== #{name} droid initializing"

	klass = self.class
	klass.name = name
	klass.start { yield self if block_given? }
end

Class Method Details

.bunny ⇒ Object



11
12
13
14
15
16
17
# File 'lib/droid/sync.rb', line 11

# Returns the shared Bunny connection. On first use it is built from
# default_config, started, and cached in @@bunny; later calls return the
# cached object until reset_bunny clears it.
def self.bunny
	@@bunny ||= Bunny.new(default_config).tap { |connection| connection.start }
end

.call(queue_name, data, opts = {}, popts = {}) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/droid/sync.rb', line 53

# Synchronous request/reply: publishes +data+ to the exchange named
# +queue_name+ and polls a reply queue until a message arrives, returning
# the JSON-decoded reply.
#
# queue_name - exchange to publish to; also seeds the generated reply queue name.
# data       - payload handed to publish_to_ex.
# opts       - publish options; :reply_to names the reply queue and is
#              generated when absent. The caller's hash is not modified.
# popts      - passed through to publish_to_ex untouched.
#
# Returns whatever pop returns (the parsed JSON reply).
def self.call(queue_name, data, opts={}, popts={})
	# Work on a copy: writing the generated :reply_to into the caller's hash
	# would make a reused opts hash share one reply queue across calls.
	opts = opts.dup
	opts[:reply_to] ||= Droid::Utils.generate_reply_to(queue_name)
	q = nil
	begin
		reconnect_on_error do
			q = bunny.queue(opts[:reply_to], :auto_delete => true)
			publish_to_ex(queue_name, data, opts, popts)
			pop(q)
		end
	ensure
		# Best-effort cleanup of the reply queue; a failure here must not mask
		# the result or the original exception.
		if q
			q.delete rescue nil
		end
	end
end

.closing? ⇒ Boolean

Returns:

  • (Boolean)


91
92
93
# File 'lib/droid.rb', line 91

# Delegates to ::AMQP.closing? and returns its result.
def self.closing?
	::AMQP.closing?
end

.default_config ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/droid.rb', line 54

# Builds the broker connection settings from the AMQP_URL environment
# variable, falling back to amqp://guest:guest@localhost:5672/ when unset.
#
# Returns a Hash with :vhost (the URL path), :host, :user, :port (5672 when
# the URL carries none) and :pass.
# Raises RuntimeError describing the offending URL when it cannot be parsed.
def self.default_config
	# Keep the raw string around so the error message can show what was
	# actually supplied (the parsed URI does not exist if parsing fails).
	url = ENV["AMQP_URL"] || 'amqp://guest:guest@localhost:5672/'
	uri = URI.parse(url)
	{
		:vhost => uri.path,
		:host => uri.host,
		:user => uri.user,
		:port => uri.port || 5672,
		:pass => uri.password
	}
rescue StandardError => e
	# StandardError only: signals and exit requests must not be turned into
	# a configuration error.
	raise "invalid AMQP_URL: (#{url.inspect}) #{e.class} -> #{e.message}"
end

.gen_queue(droid, name) ⇒ Object

Need this to be backwards compatible



106
107
108
109
110
111
112
# File 'lib/droid/utils.rb', line 106

# Backwards-compatible wrapper around Droid::Utils.generate_queue.
#
# droid - a droid name, or an object responding to #name.
# name  - passed as the first argument to generate_queue.
#
# NOTE(review): the normalised label below (name with spaces stripped,
# "d" when missing) is computed but never used -- the untouched +droid+ is
# what gets passed on. Its only observable effect is raising NoMethodError
# when the value has no #gsub. Confirm intent before changing; altering it
# would change generated queue names.
def self.gen_queue(droid, name)
	label = droid.respond_to?(:name) ? droid.name : droid
	label = (label || "d").gsub(" ", "")
	Droid::Utils.generate_queue(name, droid)
end

.handle_error(err) ⇒ Object



95
96
97
# File 'lib/droid.rb', line 95

# Logs +err+ at error level as "<class>: <message>", passing the exception
# itself along under the :exception option.
def self.handle_error(err)
	summary = "#{err.class}: #{err.message}"
	log.error(summary, :exception => err)
end

.log ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/droid.rb', line 35

# Returns the shared logger, creating a Logger on $stderr the first time.
#
# Creating the default logger also extends Logger itself:
#   * #notice becomes an alias for #info
#   * #error accepts an options hash; when opts[:exception] carries a
#     backtrace it is appended to the message (one frame per line).
#     The original implementation stays reachable as #error_og.
def self.log
	@@log ||= begin
		require 'logger'
		# Patch only once. Re-aliasing after the patch is in place would make
		# error_og point at the patched method and recurse forever (this can
		# happen when the logger is reset to nil and rebuilt).
		unless Logger.method_defined?(:error_og)
			Logger.class_eval do
				alias_method :notice, :info

				alias_method :error_og, :error
				def error(err = nil, opts={}, &block)
					e = opts[:exception]
					# An exception that was never raised has a nil backtrace.
					trace = e.backtrace if e.respond_to?(:backtrace)
					err = "#{err}\n#{trace.join("\n  ")}" if trace
					# Forward the block so Logger's lazy-message form keeps working.
					error_og(err, &block)
				end
			end
		end
		Logger.new($stderr)
	end
end

.log=(log) ⇒ Object



31
32
33
# File 'lib/droid.rb', line 31

# Replaces the shared logger stored in @@log with +log+.
def self.log=(log)
	@@log = log
end

.name ⇒ Object



23
24
25
# File 'lib/droid.rb', line 23

# Returns the droid name stored in @@name.
# Being a class-level method called +name+, this replaces the built-in
# class-name reader for this class. Reading it before @@name has been
# assigned raises NameError (uninitialized class variable).
def self.name
	@@name
end

.name=(name) ⇒ Object



27
28
29
# File 'lib/droid.rb', line 27

# Stores +name+ in @@name as the droid name.
def self.name=(name)
	@@name = name
end

.pop(q) ⇒ Object



42
43
44
45
46
47
48
49
50
51
# File 'lib/droid/sync.rb', line 42

# Polls +q+ until a message is available and returns it JSON-decoded.
#
# Each poll calls q.pop; a Hash result is unwrapped to its :payload entry.
# While the (unwrapped) result is :queue_empty the loop sleeps 0.1s and
# polls again. There is no upper bound on the number of polls here.
def self.pop(q)
	loop do
		message = q.pop
		body = message.is_a?(Hash) ? message[:payload] : message
		if body == :queue_empty
			sleep 0.1
		else
			return JSON.parse(body)
		end
	end
end

.publish(ex_name, data, opts = {}, popts = {}) ⇒ Object

Publishes to an exchange by default (delegates to publish_to_ex).



28
29
30
# File 'lib/droid/publish.rb', line 28

# Default publish entry point: forwards all arguments unchanged to
# publish_to_ex and returns its result.
def self.publish(ex_name, data, opts={}, popts={})
	publish_to_ex(ex_name, data, opts, popts)
end

.publish_to_ex(ex_name, data, opts = {}, popts = {}) ⇒ Object

publish to exchange directly



20
21
22
23
24
25
26
27
# File 'lib/droid/publish.rb', line 20

# Publishes +data+ to the exchange named +ex_name+ over the Bunny connection.
#
# The payload and publish options come from Droid::Utils.format_publish; the
# publish itself runs inside reconnect_on_error. Afterwards a summary line is
# logged at info level unless opts[:log] is exactly false.
def self.publish_to_ex(ex_name, data, opts={}, popts={})
	reconnect_on_error do
		exchange = bunny.exchange(ex_name)
		# Assigns the method-level popts, so the log line below sees the
		# formatted options (including :headers).
		payload, popts = Droid::Utils.format_publish(data, opts, popts)
		exchange.publish(payload, popts)
	end
	return if opts[:log] == false

	log.info "amqp_publish exchange=#{ex_name} #{Droid::Utils.format_data_summary(data, popts[:headers])}"
end

.publish_to_q(queue_name, data, opts = {}, popts = {}) ⇒ Object

publish to queue directly



70
71
72
73
74
75
76
77
# File 'lib/droid/sync.rb', line 70

# Publishes +data+ straight to the queue named +queue_name+ over the Bunny
# connection.
#
# The payload and publish options come from Droid::Utils.format_publish; the
# publish itself runs inside reconnect_on_error. Afterwards a summary line is
# logged at info level unless opts[:log] is exactly false.
def self.publish_to_q(queue_name, data, opts={}, popts={})
	reconnect_on_error do
		queue = bunny.queue(queue_name)
		# Assigns the method-level popts, so the log line below sees the
		# formatted options (including :headers).
		payload, popts = Droid::Utils.format_publish(data, opts, popts)
		queue.publish(payload, popts)
	end
	return if opts[:log] == false

	log.info "amqp_publish queue=#{queue_name} #{Droid::Utils.format_data_summary(data, popts[:headers])}"
end

.reconnect_on_error ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/droid/sync.rb', line 23

# Runs the given block under a 20 second SystemTimer timeout, retrying it
# when Bunny reports a failure:
#
#   * Bunny::ProtocolError  - wait 0.5s, then retry on the same connection.
#   * Bunny::ConnectionError / Bunny::ServerDownError - wait 0.5s, drop the
#     cached connection via reset_bunny, then retry.
#
# Retries are not counted; only the surrounding timeout bounds them.
# Returns the block's value, or nil when no block is given.
def self.reconnect_on_error
	SystemTimer.timeout(20) do
		begin
			yield if block_given?
		rescue Bunny::ProtocolError
			sleep 0.5
			retry
		rescue Bunny::ConnectionError, Bunny::ServerDownError
			sleep 0.5
			reset_bunny
			retry
		end
	end
end

.reply_to_q(queue_name, data, opts = {}, popts = {}) ⇒ Object



12
13
14
15
16
17
# File 'lib/droid/publish.rb', line 12

# Publishes +data+ to the auto-delete queue named +queue_name+ obtained from
# ::MQ (not the Bunny connection used by the other publish helpers).
#
# The payload and publish options come from Droid::Utils.format_publish. A
# summary line is logged at info level unless opts[:log] is exactly false.
def self.reply_to_q(queue_name, data, opts={}, popts={})
	reply_queue = ::MQ.queue(queue_name, :auto_delete => true)
	payload, popts = Droid::Utils.format_publish(data, opts, popts)
	reply_queue.publish(payload, popts)
	return if opts[:log] == false

	log.info "amqp_reply queue=#{queue_name} #{Droid::Utils.format_data_summary(data, popts[:headers])}"
end

.reset_bunny ⇒ Object



7
8
9
# File 'lib/droid/sync.rb', line 7

# Clears the cached connection by setting @@bunny to nil. The old connection
# object is not closed here.
def self.reset_bunny
	@@bunny = nil
end

.start(opts = {}) ⇒ Object

Raises:



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/droid.rb', line 67

# Connects to the broker and runs the AMQP loop.
#
# opts - :config overrides the connection settings; default_config is used
#        when it is absent.
#
# Steps: wait for the broker's TCP port, install INT/TERM handlers that stop
# AMQP and then EventMachine, and call ::AMQP.start with the config, yielding
# to the caller's block (if any) from inside it.
#
# Raises ::AMQP::Error (re-raised after a 5 second pause and two debug log
# lines) when AMQP reports an error.
def self.start(opts={})
	config = opts[:config] || self.default_config

	wait_for_tcp_port(config[:host], config[:port])

	begin
		%w[INT TERM].each do |sig|
			::Signal.trap(sig) { ::AMQP.stop { ::EM.stop } }
		end

		::AMQP.start(config) { yield if block_given? }
	rescue ::AMQP::Error => e
		log.debug "Caught #{e.class}, sleeping to avoid inittab thrashing"
		sleep 5
		log.debug "Done."
		raise
	end
end

.stop_safe ⇒ Object



87
88
89
# File 'lib/droid.rb', line 87

# Schedules shutdown on an EventMachine timer: after 0.2 seconds ::AMQP.stop
# is called, and ::EM.stop runs from its block.
def self.stop_safe
	::EM.add_timer(0.2) { ::AMQP.stop { ::EM.stop } }
end

.version ⇒ Object



19
20
21
# File 'lib/droid.rb', line 19

# Returns the contents of the VERSION file located one directory above this
# source file, with surrounding whitespace stripped. The value is read once
# and cached in @@version.
def self.version
	@@version ||= begin
		version_file = File.join(File.dirname(__FILE__), '..', 'VERSION')
		File.read(version_file).strip
	end
end

.wait_for_tcp_port(host, port, opts = {}) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/droid.rb', line 99

# Blocks until a TCP connection to host:port succeeds.
#
# host - host name or address (converted with to_s).
# port - TCP port.
# opts - :retries  number of connection attempts (default 6)
#        :timeout  seconds allowed per attempt (default 5)
#        The caller's hash is only read, never modified.
#
# Each failed attempt is logged at info level and followed by a 1 second
# pause. Returns nil as soon as one attempt connects.
# Raises RuntimeError when every attempt fails.
def self.wait_for_tcp_port(host, port, opts={})
	require 'socket'
	# The bare top-level `timeout` helper is deprecated and missing from
	# current Ruby releases; use the Timeout module and load it explicitly.
	require 'timeout'

	retries = opts[:retries] || 6
	limit   = opts[:timeout] || 5

	retries.times do
		begin
			Timeout.timeout(limit) do
				TCPSocket.new(host.to_s, port).close
			end
			return
		rescue StandardError, Timeout::Error => e
			# Not `rescue Object`: interrupts and exit requests must get through
			# instead of being logged as "port not available".
			log.info "#{host}:#{port} not available, waiting... #{e.class}: #{e.message}"
			sleep 1
		end
	end

	raise "#{host}:#{port} did not come up after #{retries} retries"
end

Instance Method Details

#log ⇒ Object



133
134
135
# File 'lib/droid.rb', line 133

# Instance-level shortcut for the class-level logger (self.class.log).
def log
	self.class.log
end

#publish(*args) ⇒ Object



129
130
131
# File 'lib/droid.rb', line 129

# Instance-level shortcut that forwards every argument to Droid.publish.
# It always targets Droid itself, not self.class.
def publish(*args)
	Droid.publish(*args)
end