35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
# File 'lib/perfectqueue/engine.rb', line 35
# Main dispatch loop: starts every worker, then keeps pairing an acquired
# worker with a task pulled from the backend until +finished?+ is true.
#
# For each acquired worker the backend is polled with +now + @timeout+
# (presumably the lease deadline for the task -- confirm against the
# backend implementation):
# * no token returned      -> sleep +@poll_interval+ and poll again
# * task.created_at is earlier than +now - @expire+
#                          -> log a warning, cancel the token, poll again
# * otherwise              -> log it and submit the token/task to the worker
#
# A worker that was acquired but never successfully handed a task (the loop
# finished first, or an exception was raised) is given back through
# +release_worker+. +@finished+ is set to true on every exit path.
#
# @return [nil]
def run
  @workers.each(&:start)

  until finished?
    worker = acquire_worker
    next unless worker

    # Flips to true only after the worker has accepted a task; until then
    # the worker still belongs to us and must be released on the way out.
    handed_off = false
    begin
      until finished?
        now = Time.now.to_i
        token, task = @backend.acquire(now + @timeout)

        if !token
          # Nothing available right now; wait before polling again.
          sleep @poll_interval
        elsif task.created_at < now - @expire
          @log.warn "canceling expired task id=#{task.id}"
          @backend.cancel(token)
        else
          @log.info "acquired task id=#{task.id}"
          worker.submit(token, task)
          handed_off = true
          break
        end
      end
    ensure
      release_worker(worker) unless handed_off
    end
  end
ensure
  @finished = true
end
|