Module: Stalker
Defined Under Namespace
Classes: BadURL, JobTimeout, NoJobsDefined, NoSuchJob
Instance Method Summary
collapse
Instance Method Details
#all_jobs ⇒ Object
154
155
156
|
# File 'lib/stalker.rb', line 154
# Lists the names of every job that currently has a handler registered.
#
# Reads the keys of the @@handlers table that #job populates. If that table
# is nil (which is what #clear! sets it to), the call to `keys` raises
# NoMethodError.
#
# @return [Array] the keys of @@handlers
def all_jobs
@@handlers.keys
end
|
#beanstalk ⇒ Object
126
127
128
|
# File 'lib/stalker.rb', line 126
# Returns the memoized Beanstalk::Pool, building it on first use.
#
# The pool is created with a single address obtained from
# #beanstalk_host_and_port and is cached in @@beanstalk, so later calls
# return the same object.
#
# @return [Beanstalk::Pool] the cached pool
def beanstalk
  @@beanstalk ||= begin
    address = beanstalk_host_and_port
    Beanstalk::Pool.new([address])
  end
end
|
#beanstalk_host_and_port ⇒ Object
137
138
139
140
141
|
# File 'lib/stalker.rb', line 137
# Converts the configured beanstalk URL into a "host:port" address string.
#
# The URL must use the `beanstalk` scheme and name a host. When the URL
# carries no explicit port, 11300 is used.
#
# @return [String] address in the form "host:port"
# @raise [BadURL] if the URL cannot be parsed, does not use the `beanstalk`
#   scheme, or has no host component
def beanstalk_host_and_port
  url = beanstalk_url

  uri = begin
    URI.parse(url)
  rescue URI::InvalidURIError
    # Report unparsable input with the same error type as any other bad URL.
    raise(BadURL, url)
  end

  raise(BadURL, url) if uri.scheme != 'beanstalk'
  # An opaque URL such as "beanstalk:foo" parses fine but has no host; without
  # this check the result would be the unusable address ":11300".
  raise(BadURL, url) if uri.host.nil? || uri.host.empty?

  "#{uri.host}:#{uri.port || 11300}"
end
|
#beanstalk_url ⇒ Object
130
131
132
133
|
# File 'lib/stalker.rb', line 130
# Determines which beanstalk URL is in effect.
#
# Precedence: a truthy @@url (as stored by #connect), then the BEANSTALK_URL
# environment variable, then the literal default 'beanstalk://localhost/'.
#
# @return [String] the URL in effect
def beanstalk_url
  if defined?(@@url) && @@url
    @@url
  else
    ENV['BEANSTALK_URL'] || 'beanstalk://localhost/'
  end
end
|
#clear! ⇒ Object
162
163
164
165
|
# File 'lib/stalker.rb', line 162
# Forgets every registered job handler and the error handler by setting
# both @@handlers and @@error_handler to nil.
#
# @return [nil]
def clear!
  @@handlers = @@error_handler = nil
end
|
#connect(url) ⇒ Object
9
10
11
12
|
# File 'lib/stalker.rb', line 9
# Sets the beanstalk URL to use and returns a pool for it.
#
# @param url [String] beanstalk URL, stored in @@url
# @return [Beanstalk::Pool] the pool returned by #beanstalk
def connect(url)
  # #beanstalk memoizes its pool in @@beanstalk. If a pool was already built
  # for a different URL, drop it here; otherwise the new URL would be stored
  # but never used. The old pool is only dereferenced, not explicitly closed.
  if defined?(@@beanstalk) && @@beanstalk && beanstalk_url != url
    @@beanstalk = nil
  end

  @@url = url
  beanstalk
end
|
#enqueue(job, args = {}, opts = {}) ⇒ Object
14
15
16
17
18
19
20
21
22
|
# File 'lib/stalker.rb', line 14
# Puts a job on the tube named after it.
#
# The body sent to beanstalk is the JSON encoding of `[job, args]`.
#
# @param job [String] job name; also used as the tube name
# @param args [Hash] arguments serialized with the job
# @param opts [Hash] optional :pri (default 65536), :delay (default 0) and
#   :ttr (default 120); a nil or false value falls back to the default
# @return [Object] whatever the pool's `put` returns, or the result of
#   #failed_connection when Beanstalk::NotConnected is raised
def enqueue(job, args={}, opts={})
  fallbacks = { :pri => 65536, :delay => 0, :ttr => 120 }
  pri, delay, ttr = fallbacks.map { |key, default| opts[key] || default }

  beanstalk.use(job)
  payload = [job, args].to_json
  beanstalk.put(payload, pri, delay, ttr)
rescue Beanstalk::NotConnected => e
  failed_connection(e)
end
|
#error(&blk) ⇒ Object
29
30
31
|
# File 'lib/stalker.rb', line 29
# Registers the block to be used as the error handler, replacing any
# previously registered one. #work_one_job calls the stored block with the
# exception when a job fails.
#
# @yieldparam e [Exception] the exception raised while working a job
# @return [Proc, nil] the stored block (nil when no block was given)
def error(&blk)
@@error_handler = blk
end
|
#error_handler ⇒ Object
158
159
160
|
# File 'lib/stalker.rb', line 158
# Returns the block registered through #error.
#
# The value is nil after #clear!, and #prep sets it to nil when it has not
# been defined yet.
#
# @return [Proc, nil] the current value of @@error_handler
def error_handler
@@error_handler
end
|
#exception_message(e) ⇒ Object
143
144
145
146
147
148
149
150
151
152
|
# File 'lib/stalker.rb', line 143
# Formats an exception and its backtrace as one multi-line string.
#
# The first line is "Exception <class> -> <message>". Each backtrace entry
# follows on its own line, prefixed with a single space, with the current
# working directory removed so paths read relative to it.
#
# @param e [Exception] the exception to describe
# @return [String] the formatted message
def exception_message(e)
  base = File.expand_path(Dir.pwd) + '/'

  # An exception that was never raised has a nil backtrace.
  frames = (e.backtrace || []).map do |frame|
    # Use a String pattern, not a Regexp built from the path: characters such
    # as "+", "(" or "." in the directory name must match literally.
    " #{File.expand_path(frame).gsub(base, '')}"
  end

  ["Exception #{e.class} -> #{e.message}", *frames].join("\n")
end
|
#failed_connection(e) ⇒ Object
92
93
94
95
96
97
|
# File 'lib/stalker.rb', line 92
# Reports a failed beanstalkd connection and terminates the process.
#
# Logs the formatted exception followed by two hint lines through
# #log_error, then exits with status 1.
#
# @param e [Exception] the connection error
# @return [void] does not return; raises SystemExit via `exit 1`
def failed_connection(e)
  report = [
    exception_message(e),
    "*** Failed connection to #{beanstalk_url}",
    "*** Check that beanstalkd is running (or set a different BEANSTALK_URL)"
  ]
  report.each { |line| log_error line }
  exit 1
end
|
#job(j, &block) ⇒ Object
24
25
26
27
|
# File 'lib/stalker.rb', line 24
# Registers the block as the handler for job `j`.
#
# The handler table @@handlers is created on first use (or re-created when
# it is nil); registering the same name again replaces the previous handler.
#
# @param j [String] job name
# @return [Proc, nil] the stored block
def job(j, &block)
  @@handlers = {} unless defined?(@@handlers) && @@handlers
  @@handlers.store(j, block)
end
|
#log(msg) ⇒ Object
118
119
120
|
# File 'lib/stalker.rb', line 118
# Prints a message to standard output, prefixed with the current time in
# square brackets.
#
# @param msg [#to_s] text to print
# @return [nil]
def log(msg)
  stamp = Time.now
  puts format('[%s] %s', stamp, msg)
end
|
#log_error(msg) ⇒ Object
122
123
124
|
# File 'lib/stalker.rb', line 122
# Writes a message to the STDERR constant using `puts`. Unlike #log, no
# timestamp is added.
#
# @param msg [#to_s] text to print
# @return [nil]
def log_error(msg)
STDERR.puts msg
end
|
#log_job_begin(name, args) ⇒ Object
99
100
101
102
103
104
105
106
107
108
109
110
|
# File 'lib/stalker.rb', line 99
# Logs the start of a job and records the start time in @job_begun.
#
# The logged line is "-> <name> (<k>=<v> ...)". When `args` is empty the
# parenthesized part is an empty string, so the line ends with a trailing
# space.
#
# @param name [String] job name
# @param args [Hash] job arguments; must respond to `empty?`
# @return [Time] the recorded start time
def log_job_begin(name, args)
  summary =
    if args.empty?
      ''
    else
      pairs = args.map { |key, value| "#{key}=#{value}" }
      '(' + pairs.join(' ') + ')'
    end

  log ['->', name, summary].join(' ')
  @job_begun = Time.now
end
|
#log_job_end(name, type = "finished") ⇒ Object
112
113
114
115
116
|
# File 'lib/stalker.rb', line 112
# Logs the end of a job together with the time elapsed since #log_job_begin.
#
# The logged line is "-> <name> <type> in <ms>ms". When no start time is
# recorded in @job_begun (the job failed before #log_job_begin ran), the
# duration is omitted instead of raising TypeError on `Time - nil`.
#
# @param name [String] job name
# @param type [String] outcome label, e.g. "finished" or "failed"
# @return [Object] whatever #log returns
def log_job_end(name, type="finished")
  started = @job_begun
  # Consume the start time so a later job that fails before it begins cannot
  # report a duration measured from this job's start.
  @job_begun = nil

  if started
    elapsed_ms = ((Time.now - started).to_f * 1000).to_i
    log "-> #{name} #{type} in #{elapsed_ms}ms"
  else
    log "-> #{name} #{type}"
  end
end
|
#prep(jobs = nil) ⇒ Object
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/stalker.rb', line 36
# Prepares the beanstalk connection for working the given jobs.
#
# Validates that every requested job has a handler, watches one tube per job,
# and ignores every watched tube that is not in the job list.
#
# @param jobs [Array<String>, nil] job names to work; nil means every
#   registered job (see #all_jobs)
# @return [Object] result of the final `each`, or of #failed_connection when
#   Beanstalk::NotConnected is raised
# @raise [NoJobsDefined] if no handler table exists, or it was reset by #clear!
# @raise [NoSuchJob] if a requested job has no handler
def prep(jobs=nil)
  # #clear! leaves @@handlers defined but nil; treat that as "no jobs" too
  # instead of failing later with NoMethodError on nil.
  raise NoJobsDefined unless defined?(@@handlers) && @@handlers
  @@error_handler = nil unless defined?(@@error_handler)

  jobs ||= all_jobs
  jobs.each do |job|
    raise(NoSuchJob, job) unless @@handlers[job]
  end

  log "Working #{jobs.size} jobs: [ #{jobs.join(' ')} ]"

  jobs.each { |job| beanstalk.watch(job) }

  beanstalk.list_tubes_watched.each do |_server, tubes|
    tubes.each { |tube| beanstalk.ignore(tube) unless jobs.include?(tube) }
  end
rescue Beanstalk::NotConnected => e
  failed_connection(e)
end
|
#work(jobs = nil) ⇒ Object
57
58
59
60
|
# File 'lib/stalker.rb', line 57
# Runs the worker: prepares the connection via #prep, then processes jobs
# one after another with #work_one_job in an endless loop.
#
# @param jobs [Array<String>, nil] job names to work, passed on to #prep
# @return [Object] only returns if the loop is broken out of
def work(jobs=nil)
  prep(jobs)
  loop do
    work_one_job
  end
end
|
#work_one_job ⇒ Object
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/stalker.rb', line 64
# Reserves one job from beanstalk and runs its handler.
#
# The job body is parsed as JSON into `[name, args]`. The handler registered
# for `name` is called with `args` under a timeout of `job.ttr - 1` seconds.
# On success the job is deleted; on any StandardError the error is logged,
# the job is buried (best effort) and the registered error handler, if any,
# is called with the exception.
#
# @return [Object] result of the last step executed
def work_one_job
  # Tracks whether #log_job_begin ran, so the failure path only logs a
  # matching end line when there is a start time to measure from.
  begun = false

  job = beanstalk.reserve
  name, args = JSON.parse job.body
  log_job_begin(name, args)
  begun = true

  handler = @@handlers[name]
  raise(NoSuchJob, name) unless handler

  begin
    # NOTE(review): Timeout.timeout treats 0 as "no timeout", so a job with
    # ttr == 1 runs without a limit — confirm whether that is intended.
    Timeout::timeout(job.ttr - 1) do
      handler.call(args)
    end
  rescue Timeout::Error
    raise JobTimeout, "#{name} hit #{job.ttr-1}s timeout"
  end

  job.delete
  log_job_end(name)
rescue Beanstalk::NotConnected => e
  failed_connection(e)
rescue SystemExit
  raise
rescue => e
  log_error exception_message(e)

  begin
    # Best effort: `job` is nil when reserve itself failed, and burying may
    # fail too; neither should mask the original error.
    job.bury if job
  rescue StandardError
    nil
  end

  # If the failure happened before the job began (e.g. a malformed JSON body),
  # #log_job_end would have no start time and could raise from inside this
  # handler, which would escape and stop the worker loop.
  log_job_end(name, 'failed') if begun
  error_handler.call(e) if error_handler
end
|