Class: JobQueue::BeanstalkAdapter
- Inherits:
-
Object
- Object
- JobQueue::BeanstalkAdapter
show all
- Defined in:
- lib/job_queue/adapters/beanstalk_adapter.rb
Defined Under Namespace
Classes: BeanstalkPoolFix
Instance Method Summary
collapse
Constructor Details
Returns a new instance of BeanstalkAdapter.
5
6
7
|
# File 'lib/job_queue/adapters/beanstalk_adapter.rb', line 5
# Builds an adapter and remembers which beanstalkd host(s) to talk to.
#
# @param options [Hash] adapter settings
# @option options [Object] :hosts a single address or a list of addresses;
#   when missing (or nil/false) 'localhost:11300' is used instead
def initialize(options = {})
  configured_hosts = options[:hosts]
  @hosts = configured_hosts ? configured_hosts : 'localhost:11300'
end
|
Instance Method Details
#beanstalk_pool(queue = 'default') ⇒ Object
75
76
77
78
79
80
|
# File 'lib/job_queue/adapters/beanstalk_adapter.rb', line 75
# Returns the connection pool for +queue+, creating it on first use and
# reusing the same instance on later calls with the same queue name.
#
# @param queue [String] queue (tube) name the pool is built for
# @return [BeanstalkPoolFix] the cached pool for that queue
def beanstalk_pool(queue = 'default')
  pools = (@beanstalk_pools ||= {})
  cached = pools[queue]
  return cached if cached

  # @hosts may be a single address or a list; normalise to a flat array.
  pools[queue] = BeanstalkPoolFix.new([@hosts].flatten, queue)
end
|
#job_stats(job_id) ⇒ Object
62
63
64
65
66
67
|
# File 'lib/job_queue/adapters/beanstalk_adapter.rb', line 62
# Fetches the statistics beanstalkd holds for a previously enqueued job.
#
# @param job_id [String] identifier in the "<host>_<id>" form returned by #put
# @return [Object, nil] the stats entry reported for the job's host, or nil
#   when the job is not found or no entry exists for that host
def job_stats(job_id)
  # Split on the LAST underscore so a host containing '_' stays intact;
  # the numeric job id is always the final component.
  host, _separator, id = job_id.rpartition('_')

  # The pool result is treated as a collection of [host, stats] pairs
  # (a Hash or an array of pairs both work with #find). The previous
  # `select { ... }[0][1]` relied on Hash#select returning an Array, which
  # is only true on Ruby 1.8.
  entry = beanstalk_pool.job_stats(id).find { |address, _stats| address == host }
  entry && entry[1]
rescue Beanstalk::NotFoundError
  nil
end
|
#put(string, queue, priority, ttr) ⇒ Object
9
10
11
12
13
14
15
16
17
18
19
|
# File 'lib/job_queue/adapters/beanstalk_adapter.rb', line 9
# Enqueues +string+ on +queue+ and returns an identifier for the new job.
#
# @param string [String] job payload
# @param queue [String] queue (tube) name
# @param priority [Integer] priority handed to the pool unchanged
# @param ttr [Numeric] time-to-run; rounded down to a whole number
# @return [String] job identifier in "<host>_<id>" form
# @raise [JobQueue::ArgumentError] if the floored TTR is below 2
# @raise [JobQueue::NoConnectionAvailable] if the pool reports
#   Beanstalk::NotConnected
def put(string, queue, priority, ttr)
  # Floor first, then validate: these must be two separate statements so a
  # valid fractional TTR is actually truncated before being sent.
  ttr = ttr.floor
  raise JobQueue::ArgumentError, "TTR must be greater than 1" if ttr < 2

  delay = 0
  job_info = beanstalk_pool(queue).put_and_report_conn(string, priority, delay, ttr)
  "#{job_info[:host]}_#{job_info[:id]}"
rescue Beanstalk::NotConnected
  raise JobQueue::NoConnectionAvailable
end
|
#queue_length(queue) ⇒ Object
69
70
71
72
73
|
# File 'lib/job_queue/adapters/beanstalk_adapter.rb', line 69
# Reports how many jobs are currently ready in +queue+.
#
# The tube statistics are requested through the pool returned by
# #beanstalk_pool with its default argument.
#
# @param queue [String] queue (tube) name to inspect
# @return [Object] the "current-jobs-ready" entry of the tube stats, or 0
#   when Beanstalk::NotFoundError is raised
def queue_length(queue)
  begin
    tube_stats = beanstalk_pool.stats_tube(queue)
    tube_stats["current-jobs-ready"]
  rescue Beanstalk::NotFoundError
    0
  end
end
|
#subscribe(error_report, cleanup_task, queue, &block) ⇒ Object
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/job_queue/adapters/beanstalk_adapter.rb', line 21
# Consumes jobs from +queue+ forever, yielding each job body to the block.
#
# A dedicated BeanstalkPoolFix is created for the subscription instead of
# reusing the cached pool from #beanstalk_pool. The loop has no exit
# condition; it only ends if an exception escapes one of the handlers below
# (exceptions raised inside a rescue clause, e.g. by +cleanup_task+ or
# +error_report+, are not caught by the sibling clauses).
#
# @param error_report [#call] called with (job body, exception) when
#   handling a reserved job raises a StandardError
# @param cleanup_task [#call] called with the job body when the handler
#   exceeds its time budget
# @param queue [String] queue (tube) name to consume from
# @yield [body] the body of each reserved job
def subscribe(error_report, cleanup_task, queue, &block)
pool = BeanstalkPoolFix.new([@hosts].flatten, queue)
loop do
begin
# NOTE(review): the argument 1 is presumably a reserve timeout in seconds
# (see the Beanstalk::TimedOut clause below) -- confirm against the pool.
job = pool.reserve(1)
time_left = job.stats["time-left"]
JobQueue.logger.debug "Beanstalk received #{job.body}"
# Give the handler one unit less than the job's reported time-left, but
# never less than 1.
Timeout::timeout([time_left - 1, 1].max) do
yield job.body
end
# Handler finished in time: remove the job from the queue.
job.delete
rescue Timeout::Error
# Handler overran its budget: let the caller clean up, then drop the job.
cleanup_task.call(job.body)
JobQueue.logger.warn "Job timed out"
begin
job.delete
rescue Beanstalk::NotFoundError
JobQueue.logger.error "Job timed out and could not be deleted"
end
rescue Beanstalk::TimedOut
# Deliberately ignored; the loop simply tries again.
rescue Beanstalk::NotConnected
JobQueue.logger.fatal "Could not connect any beanstalk hosts. " \
"Retrying in 1s."
sleep 1
rescue => e
# Any other StandardError. +job+ is only set if reserve succeeded, so a
# nil job means the failure happened before a job was obtained.
if job
error_report.call(job.body, e)
begin
job.delete
rescue Beanstalk::NotFoundError
JobQueue.logger.error "Job failed but could not be deleted"
end
else
JobQueue.logger.error "Unhandled exception: #{e.message}\n" \
"#{e.backtrace.join("\n")}\n"
end
end
end
end
|