Module: Stalker

Extended by:
Stalker
Included in:
Stalker
Defined in:
lib/stalker.rb

Defined Under Namespace

Classes: BadURL, JobTimeout, NoJobsDefined, NoSuchJob

Instance Method Summary collapse

Instance Method Details

#all_jobs ⇒ Object



154
155
156
# File 'lib/stalker.rb', line 154

# Lists the names of all jobs that currently have a handler registered.
#
# The names are the keys of the @@handlers table that `job` fills in.
# Fails if no handler table exists (no `job` call yet) or if it was reset
# to nil by `clear!`.
def all_jobs
	registered = @@handlers
	registered.keys
end

#beanstalk ⇒ Object



126
127
128
# File 'lib/stalker.rb', line 126

# Returns the shared Beanstalk::Pool, creating it on first use.
#
# The pool is built for the single address produced by
# beanstalk_host_and_port and is cached in @@beanstalk, so later calls
# return the same object.
def beanstalk
	return @@beanstalk if defined?(@@beanstalk) && @@beanstalk

	@@beanstalk = Beanstalk::Pool.new([beanstalk_host_and_port])
end

#beanstalk_host_and_port ⇒ Object

Raises:

(BadURL)



137
138
139
140
141
# File 'lib/stalker.rb', line 137

# Converts the configured beanstalk URL into a "host:port" string.
#
# The URL must use the `beanstalk` scheme; when it carries no explicit port,
# 11300 is used.
#
# @return [String] "host:port"
# @raise [BadURL] when the URL scheme is not `beanstalk` (message is the URL)
def beanstalk_host_and_port
	parsed = URI.parse(beanstalk_url)
	if parsed.scheme != 'beanstalk'
		raise BadURL, beanstalk_url
	end

	port = parsed.port || 11300
	"#{parsed.host}:#{port}"
end

#beanstalk_url ⇒ Object



130
131
132
133
# File 'lib/stalker.rb', line 130

# Resolves the beanstalk URL to use, in order of precedence:
#   1. the URL stored in @@url (assigned by `connect`), when set and truthy
#   2. the BEANSTALK_URL environment variable
#   3. 'beanstalk://localhost/'
#
# @return [String]
def beanstalk_url
	if defined?(@@url)
		configured = @@url
		return configured if configured
	end

	ENV['BEANSTALK_URL'] || 'beanstalk://localhost/'
end

#clear! ⇒ Object



162
163
164
165
# File 'lib/stalker.rb', line 162

# Forgets every registered job handler and the error handler by resetting
# both class variables to nil.
#
# @return [nil]
def clear!
	@@handlers = @@error_handler = nil
end

#connect(url) ⇒ Object



9
10
11
12
# File 'lib/stalker.rb', line 9

# Points Stalker at the beanstalk server given by `url` and returns the
# connection pool for it.
#
# `beanstalk` memoizes its pool, so simply storing a new URL would keep
# returning a pool built for whatever URL was in effect earlier. When the
# URL actually changes, the memoized pool is dropped so that the call to
# `beanstalk` below rebuilds it for `url`.
#
# NOTE(review): the previous pool is only dereferenced here, not explicitly
# closed — confirm whether the pool class offers a close call worth using.
#
# @param url [String] beanstalk URL, e.g. 'beanstalk://localhost/'
# @return [Object] the pool returned by `beanstalk`
def connect(url)
	previous = defined?(@@url) ? @@url : nil
	@@beanstalk = nil if previous != url

	@@url = url
	beanstalk
end

#enqueue(job, args = {}, opts = {}) ⇒ Object



14
15
16
17
18
19
20
21
22
# File 'lib/stalker.rb', line 14

# Puts a job on the beanstalk tube named after the job.
#
# The payload is the JSON encoding of the two-element array [job, args].
#
# @param job  [String] job name; also used as the tube name
# @param args [Hash]   arguments serialized with the job
# @param opts [Hash]   optional :pri (default 65536), :delay (default 0)
#                      and :ttr (default 120), passed on to `put`
# @return [Object] whatever the pool's `put` returns
#
# A Beanstalk::NotConnected error is handed to failed_connection.
def enqueue(job, args={}, opts={})
	priority    = opts[:pri]   || 65536
	wait        = opts[:delay] || 0
	time_to_run = opts[:ttr]   || 120

	beanstalk.use(job)
	beanstalk.put([job, args].to_json, priority, wait, time_to_run)
rescue Beanstalk::NotConnected => e
	failed_connection(e)
end

#error(&blk) ⇒ Object



29
30
31
# File 'lib/stalker.rb', line 29

# Registers the block to be invoked when a job fails.
#
# The block is stored in @@error_handler, replacing any earlier one;
# calling without a block stores nil.
#
# @return [Proc, nil] the stored block
def error(&blk)
	handler = blk
	@@error_handler = handler
end

#error_handler ⇒ Object



158
159
160
# File 'lib/stalker.rb', line 158

# Returns the block registered through `error`, or nil when there is none.
#
# @@error_handler only exists once `error`, `prep` or `clear!` has run, so
# reading it unconditionally raises NameError on a fresh process. Callers
# test the result for truthiness, so an unset handler is reported as nil.
#
# @return [Proc, nil]
def error_handler
	defined?(@@error_handler) ? @@error_handler : nil
end

#exception_message(e) ⇒ Object



143
144
145
146
147
148
149
150
151
152
# File 'lib/stalker.rb', line 143

# Formats an exception as a multi-line string: a header line with the class
# and message, followed by one indented line per backtrace entry.
#
# Backtrace entries are expanded to absolute paths and the current working
# directory prefix is removed so that project files appear as relative paths.
#
# @param e [Exception]
# @return [String]
def exception_message(e)
	msg = [ "Exception #{e.class} -> #{e.message}" ]

	base = File.expand_path(Dir.pwd) + '/'
	# backtrace is nil for an exception that was built but never raised
	(e.backtrace || []).each do |t|
		path = File.expand_path(t)
		# Literal prefix comparison: the directory name may contain characters
		# that are special in a regular expression (+, (, [, ...).
		path = path[base.length..-1] if path.start_with?(base)
		msg << "   #{path}"
	end

	msg.join("\n")
end

#failed_connection(e) ⇒ Object



92
93
94
95
96
97
# File 'lib/stalker.rb', line 92

# Reports a failed beanstalk connection and terminates the process.
#
# Writes the formatted exception, the URL that was tried and a hint to the
# error log, then exits with status 1.
#
# @param e [Exception] the connection error
def failed_connection(e)
	log_error(exception_message(e))
	log_error("*** Failed connection to #{beanstalk_url}")
	log_error("*** Check that beanstalkd is running (or set a different BEANSTALK_URL)")
	exit(1)
end

#job(j, &block) ⇒ Object



24
25
26
27
# File 'lib/stalker.rb', line 24

# Registers `block` as the handler for the job named `j`.
#
# The handler table (@@handlers) is created on first use; registering the
# same name again replaces the earlier handler.
#
# @param j [String] job name
# @return [Proc, nil] the stored block
def job(j, &block)
	table = (@@handlers ||= {})
	table[j] = block
end

#log(msg) ⇒ Object



118
119
120
# File 'lib/stalker.rb', line 118

# Prints `msg` to standard output, prefixed with the current time in
# square brackets.
#
# @param msg [Object] text to log
# @return [nil]
def log(msg)
	stamp = Time.now
	puts "[#{stamp}] #{msg}"
end

#log_error(msg) ⇒ Object



122
123
124
# File 'lib/stalker.rb', line 122

# Writes `msg` as a line on the process's standard error stream (STDERR).
#
# @param msg [Object] text to write
# @return [nil]
def log_error(msg)
	STDERR.puts(msg)
end

#log_job_begin(name, args) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/stalker.rb', line 99

# Logs the start of a job and remembers when it started.
#
# The log line is "-> <name> <args>", where <args> is "(k1=v1 k2=v2 ...)"
# for non-empty args and an empty string otherwise (so the line then ends
# with a space). The start time is kept in @job_begun for log_job_end.
#
# @param name [String] job name
# @param args [Hash]   job arguments
# @return [Time] the recorded start time
def log_job_begin(name, args)
	args_flat =
		if args.empty?
			''
		else
			pairs = args.map { |key, value| "#{key}=#{value}" }
			'(' + pairs.join(' ') + ')'
		end

	log ['->', name, args_flat].join(' ')
	@job_begun = Time.now
end

#log_job_end(name, type = "finished") ⇒ Object



112
113
114
115
116
# File 'lib/stalker.rb', line 112

# Logs the end of a job, including how long it ran when a start time is known.
#
# The start time is the @job_begun value recorded by log_job_begin. A job
# can fail before log_job_begin ever ran (e.g. its body could not be
# parsed); in that case there is no start time, and the line is logged
# without a duration instead of raising from `Time.now - nil`.
#
# The start time is cleared once used so that it cannot be attributed to a
# later job that fails before its own start is recorded.
#
# @param name [String] job name
# @param type [String] outcome word placed in the log line
# @return [nil]
def log_job_end(name, type="finished")
	started = @job_begun
	@job_begun = nil

	if started
		elapsed = Time.now - started
		ms = (elapsed.to_f * 1000).to_i
		log "-> #{name} #{type} in #{ms}ms"
	else
		log "-> #{name} #{type}"
	end
end

#prep(jobs = nil) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/stalker.rb', line 36

# Prepares the worker: validates the requested jobs and adjusts the set of
# watched beanstalk tubes to match them.
#
# @param jobs [Array<String>, nil] job names to work; nil means every
#   registered job
# @raise [NoJobsDefined] when no handler table is available
# @raise [NoSuchJob]     when a requested job has no handler
#
# A Beanstalk::NotConnected error is handed to failed_connection.
def prep(jobs=nil)
	# `clear!` leaves @@handlers defined but nil, so the value has to be
	# checked too; otherwise the lookup below fails with NoMethodError
	# instead of the intended NoJobsDefined.
	handlers = defined?(@@handlers) ? @@handlers : nil
	raise NoJobsDefined unless handlers
	@@error_handler = nil unless defined?(@@error_handler)

	jobs ||= all_jobs

	jobs.each do |job|
		raise(NoSuchJob, job) unless handlers[job]
	end

	log "Working #{jobs.size} jobs: [ #{jobs.join(' ')} ]"

	# Watch one tube per job ...
	jobs.each { |job| beanstalk.watch(job) }

	# ... and stop watching every tube that is not one of the requested jobs.
	beanstalk.list_tubes_watched.each do |server, tubes|
		tubes.each { |tube| beanstalk.ignore(tube) unless jobs.include?(tube) }
	end
rescue Beanstalk::NotConnected => e
	failed_connection(e)
end

#work(jobs = nil) ⇒ Object



57
58
59
60
# File 'lib/stalker.rb', line 57

# Runs the worker: prepares the requested jobs, then processes jobs one
# after another in an endless loop.
#
# @param jobs [Array<String>, nil] job names to work; passed to prep as-is
def work(jobs=nil)
	prep(jobs)
	loop do
		work_one_job
	end
end

#work_one_job ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/stalker.rb', line 64

# Reserves a single job from beanstalk and runs its handler.
#
# The job body is parsed as JSON into a [name, args] pair. The handler
# registered for `name` is called with `args` under a timeout of one second
# less than the job's ttr. On success the job is deleted and its end is
# logged.
#
# Failure handling:
# * Beanstalk::NotConnected is handed to failed_connection.
# * SystemExit is re-raised untouched.
# * Any other StandardError (including NoSuchJob and JobTimeout raised
#   here) is logged, the job is buried on a best-effort basis, the failure
#   is logged, and the registered error handler, if any, is called with the
#   exception.
def work_one_job
	reserved = beanstalk.reserve
	job_name, job_args = JSON.parse(reserved.body)
	log_job_begin(job_name, job_args)

	callback = @@handlers[job_name]
	raise NoSuchJob, job_name unless callback

	begin
		Timeout.timeout(reserved.ttr - 1) { callback.call(job_args) }
	rescue Timeout::Error
		raise JobTimeout, "#{job_name} hit #{reserved.ttr-1}s timeout"
	end

	reserved.delete
	log_job_end(job_name)
rescue Beanstalk::NotConnected => e
	failed_connection(e)
rescue SystemExit
	raise
rescue StandardError => failure
	log_error(exception_message(failure))
	# Best effort: `reserved` may be nil (reserve itself failed) or the bury
	# call may fail; neither should hide the original error.
	begin
		reserved.bury
	rescue StandardError
		nil
	end
	log_job_end(job_name, 'failed')
	error_handler.call(failure) if error_handler
end