Class: PerfectQueue::MonitorThread
- Inherits:
-
Object
- Object
- PerfectQueue::MonitorThread
- Defined in:
- lib/perfectqueue/worker.rb
Instance Method Summary
-
#initialize(engine, conf) ⇒ MonitorThread
constructor
A new instance of MonitorThread.
- #kill! ⇒ Object
- #process ⇒ Object
- #reset(success) ⇒ Object
- #run ⇒ Object
- #set(token, task_id) ⇒ Object
- #set_kill_proc(kill_proc) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #try_extend(now) ⇒ Object
- #try_kill(now) ⇒ Object
Constructor Details
#initialize(engine, conf) ⇒ MonitorThread
Returns a new instance of MonitorThread.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/perfectqueue/worker.rb', line 6

# Builds a monitor bound to +engine+ and loads its timing settings from +conf+.
#
# Every setting falls back to its default when the key is absent or falsy.
#
# @param engine [#log, #backend] supplies the logger and the backend
# @param conf [Hash] recognised keys: :timeout, :heartbeat_interval,
#   :kill_timeout, :kill_interval, :retry_wait, :delete_wait
def initialize(engine, conf)
  # Collaborators taken from the engine.
  @engine = engine
  @log = @engine.log
  @backend = engine.backend

  # Timing configuration. The heartbeat interval and the kill timeout are
  # derived from :timeout unless given explicitly.
  @timeout = conf[:timeout] || 600
  @heartbeat_interval = conf[:heartbeat_interval] || @timeout * 3 / 4
  @kill_timeout = conf[:kill_timeout] || @timeout * 10
  @kill_interval = conf[:kill_interval] || 60
  @retry_wait = conf[:retry_wait] || nil
  @delete_wait = conf[:delete_wait] || 3600

  # Per-task state; nothing is being monitored yet.
  @token = nil
  @heartbeat_time = nil
  @kill_time = nil
  @kill_proc = nil
  @canceled = false
  @finished = false

  # Synchronisation primitives guarding the state above.
  @mutex = Mutex.new
  @cond = ConditionVariable.new
end
Instance Method Details
#kill! ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/perfectqueue/worker.rb', line 85

# Invokes the registered kill proc, if one is set.
#
# The attempt is logged at info level. A StandardError raised by the proc is
# not propagated: its class and message are logged at info level and each
# backtrace frame at debug level.
#
# @return [Object, nil] nil when no kill proc is registered
def kill!
  return unless @kill_proc

  @log.info "killing id=#{@task_id}..."
  begin
    @kill_proc.call
  rescue => e
    @log.info "kill failed id=#{@task_id}: #{e.class}: #{e}"
    e.backtrace.each { |frame| @log.debug " #{frame}" }
  end
end
#process ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/perfectqueue/worker.rb', line 47

# Polls the current task once per second until the monitor is finished or no
# token is set.
#
# Each tick runs under the mutex and passes the same timestamp to try_extend
# and then try_kill.
#
# @return [nil]
def process
  while true
    sleep 1
    @mutex.synchronize do
      # Leave the polling loop once stopped or once the task has been cleared.
      return if @finished || !@token

      current = Time.now.to_i
      try_extend(current)
      try_kill(current)
    end
  end
end
#reset(success) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/perfectqueue/worker.rb', line 127

# Finalises the current task on the backend and stops monitoring it.
#
# On success the task is finished with @delete_wait. Otherwise, if a retry
# wait is configured and the task was not canceled, its timeout is moved to
# now + @retry_wait; any StandardError from that update is ignored.
#
# The token is always cleared, even when the backend call raises, so the
# monitor does not keep heartbeating a task the caller has given up on.
#
# @param success [Boolean] whether the task completed successfully
# @return [nil]
# @raise [StandardError] whatever @backend.finish raises on the success path
def reset(success)
  @mutex.synchronize do
    begin
      if success
        @backend.finish(@token, @delete_wait)
      elsif @retry_wait && !@canceled
        begin
          @backend.update(@token, Time.now.to_i + @retry_wait)
        rescue
          # ignore CanceledError
        end
      end
      nil
    ensure
      @token = nil
    end
  end
end
#run ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/perfectqueue/worker.rb', line 32

# Main loop of the monitor: waits on the condition variable until a token is
# set, then hands over to #process; repeats until the monitor is finished.
#
# Any StandardError escaping the loop is passed to @engine.stop.
#
# @return [Object, nil] nil on a normal exit, otherwise the result of
#   @engine.stop
def run
  until @finished
    @mutex.synchronize do
      # Sleep until there is either a task to watch or a stop request.
      @cond.wait(@mutex) until @finished || @token
      return if @finished
    end
    process
  end
rescue => e
  @engine.stop(e)
end
#set(token, task_id) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/perfectqueue/worker.rb', line 110

# Starts monitoring a task: records its token and id, schedules the first
# heartbeat and kill deadlines relative to now, clears the kill proc and the
# canceled flag, and wakes any thread waiting on the condition variable.
#
# @param token [Object] stored as-is and later passed to the backend
# @param task_id [Object] identifier used in log messages
def set(token, task_id)
  @mutex.synchronize do
    started_at = Time.now.to_i

    @token = token
    @task_id = task_id
    @heartbeat_time = started_at + @heartbeat_interval
    @kill_time = started_at + @kill_timeout
    @kill_proc = nil
    @canceled = false

    @cond.broadcast
  end
end
#set_kill_proc(kill_proc) ⇒ Object
123 124 125 |
# File 'lib/perfectqueue/worker.rb', line 123

# Registers the callable that #kill! invokes for the current task.
#
# NOTE(review): unlike the other state-changing methods this assignment is not
# done under @mutex — confirm against callers whether that is intentional.
#
# @param kill_proc [#call, nil] stored as-is
# @return [#call, nil] the value that was stored
def set_kill_proc(kill_proc)
  @kill_proc = kill_proc
end
#shutdown ⇒ Object
106 107 108 |
# File 'lib/perfectqueue/worker.rb', line 106

# Waits for the monitor thread to terminate.
#
# Does nothing when no thread has been started, instead of failing with a
# NoMethodError on nil.
#
# @return [Thread, nil] the joined thread, or nil when there was none
def shutdown
  @thread.join if @thread
end
#start ⇒ Object
28 29 30 |
# File 'lib/perfectqueue/worker.rb', line 28

# Spawns the monitor thread, which executes #run, and remembers it in @thread.
#
# @return [Thread] the newly created thread
def start
  @thread = Thread.new { run }
end
#stop ⇒ Object
99 100 101 102 103 104 |
# File 'lib/perfectqueue/worker.rb', line 99

# Marks the monitor as finished and wakes every thread waiting on the
# condition variable so it can observe the flag.
def stop
  @mutex.synchronize do
    @finished = true
    @cond.broadcast
  end
end
#try_extend(now) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/perfectqueue/worker.rb', line 60

# Sends a heartbeat for the current task once its heartbeat time has been
# reached and the task has not been canceled.
#
# The heartbeat moves the task's timeout on the backend to now + @timeout.
# If the backend raises CanceledError or any other StandardError, the task is
# marked canceled and its kill time is set to +now+. In every case the next
# heartbeat is scheduled at now + @heartbeat_interval.
#
# @param now [Integer] current time as passed by the caller
# @return [Integer, nil] the next heartbeat time, or nil when nothing was due
def try_extend(now)
  return unless now >= @heartbeat_time && !@canceled

  deadline = now + @timeout
  @log.debug "extending timeout=#{deadline} id=#{@task_id}"
  begin
    @backend.update(@token, deadline)
  rescue CanceledError
    @log.info "task id=#{@task_id} is canceled."
    @canceled = true
    @kill_time = now
  rescue => e
    @log.error "unexpected error id=#{@task_id}: #{e}"
    @canceled = true
    @kill_time = now
  end
  @heartbeat_time = now + @heartbeat_interval
end
#try_kill(now) ⇒ Object
78 79 80 81 82 83 |
# File 'lib/perfectqueue/worker.rb', line 78

# Calls #kill! once the kill time has been reached, then re-arms the kill time
# so the next attempt happens @kill_interval later.
#
# @param now [Integer] current time as passed by the caller
# @return [Integer, nil] the next kill time, or nil when the deadline has not
#   been reached
def try_kill(now)
  return unless now >= @kill_time

  kill!
  @kill_time = now + @kill_interval
end