Class: PerfectQueue::MonitorThread

Inherits:
Object
  • Object
show all
Defined in:
lib/perfectqueue/worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(engine, conf) ⇒ MonitorThread

Returns a new instance of MonitorThread.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/perfectqueue/worker.rb', line 6

# Builds a monitor bound to +engine+ and configured from +conf+.
#
# The logger and backend are taken from the engine. Timing settings are read
# from +conf+ with Symbol keys; a missing (or nil/false) entry falls back to:
#
#   :timeout            => 600
#   :heartbeat_interval => timeout * 3 / 4 (integer arithmetic)
#   :kill_timeout       => timeout * 10
#   :kill_interval      => 60
#   :retry_wait         => nil
#
# @param engine [#log, #backend] owner of this monitor
# @param conf [Hash] settings looked up with Symbol keys
def initialize(engine, conf)
  @engine = engine
  @log = engine.log
  @backend = engine.backend

  # Timing settings; the derived defaults are all based on the timeout.
  timeout = conf[:timeout] || 600
  @timeout = timeout
  @heartbeat_interval = conf[:heartbeat_interval] || (timeout * 3) / 4
  @kill_timeout = conf[:kill_timeout] || timeout * 10
  @kill_interval = conf[:kill_interval] || 60
  @retry_wait = conf[:retry_wait] || nil

  # State describing the token currently being monitored (none yet).
  @token = @heartbeat_time = @kill_time = @kill_proc = nil
  @canceled = false

  # Primitives used to guard and signal changes to the state above.
  @mutex = Mutex.new
  @cond = ConditionVariable.new
end

Instance Method Details

#kill! ⇒ Object



76
77
78
79
80
81
# File 'lib/perfectqueue/worker.rb', line 76

# Invokes the registered kill proc, if there is one, and logs the attempt.
#
# Does nothing when no kill proc is registered. The call is best-effort: a
# StandardError raised by the proc is swallowed and nil is returned.
#
# @return [Object, nil] the proc's result; nil when there is no proc or the
#   proc raised
def kill!
  kill_proc = @kill_proc
  return unless kill_proc

  # None of the methods visible for this class assign @task, so calling
  # @task.id unconditionally raised NoMethodError here and the kill proc was
  # never reached. Identify the work by its token when no task object is set.
  label = @task ? @task.id : @token
  @log.info "killing #{label}..."
  begin
    kill_proc.call
  rescue
    # Deliberately ignored: a failing kill must not take the caller down.
    nil
  end
end

#reset(success) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/perfectqueue/worker.rb', line 109

# Reports the outcome of the current token to the backend and clears it.
#
# Runs entirely under the mutex:
# * on success the token is finished in the backend;
# * otherwise, if a retry wait is configured and the task was not canceled,
#   the token's time is moved to now + retry wait;
# * in both cases @token is then set to nil.
#
# @param success [Boolean] whether the work for the current token succeeded
# @return [nil]
def reset(success)
  @mutex.synchronize do
    if success
      @backend.finish(@token)
    else
      reschedule = @retry_wait && !@canceled
      if reschedule
        begin
          @backend.update(@token, Time.now.to_i + @retry_wait)
        rescue
          # ignore CanceledError (the bare rescue drops any StandardError)
        end
      end
    end
    @token = nil
  end
end

#run ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/perfectqueue/worker.rb', line 31

# Main loop of the monitor thread.
#
# Until the engine reports that it is finished, alternates between two phases:
#
# 1. Idle: wait on the condition variable until a token is present.
# 2. Monitoring: once per second, under the mutex, call #try_extend and
#    #try_kill with the current time, until the token has been cleared.
#
# Returns as soon as @engine.finished? is observed true. Any StandardError
# raised inside the loop is passed to @engine.stop.
def run
  until @engine.finished?
    # Idle phase: block until a token is set or the engine finishes.
    @mutex.synchronize {
      while true
        return if @engine.finished?
        break if @token
        @cond.wait(@mutex)
      end
    }

    # Monitoring phase.
    while true
      sleep 1
      monitoring = @mutex.synchronize {
        return if @engine.finished?
        if @token
          now = Time.now.to_i
          try_extend(now)
          try_kill(now)
          true
        else
          false
        end
      }
      # The loop must be left from outside the block: a `break` written
      # directly inside it only terminates the synchronize call, which kept
      # this loop polling every second forever instead of returning to the
      # condition-variable wait above.
      break unless monitoring
    end
  end
rescue => e
  @engine.stop(e)
end

#set(token) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
# File 'lib/perfectqueue/worker.rb', line 93

# Registers +token+ as the one to monitor and wakes any waiting thread.
#
# Under the mutex this stores the token, schedules the next heartbeat and the
# kill deadline relative to the current time, drops any previously registered
# kill proc, clears the canceled flag and broadcasts on the condition variable.
#
# @param token [Object] stored as-is and later passed to the backend
# @return [Object] whatever the condition variable's broadcast returns
def set(token)
  @mutex.synchronize do
    started_at = Time.now.to_i

    @token = token
    @heartbeat_time = started_at + @heartbeat_interval
    @kill_time = started_at + @kill_timeout

    # Leftovers from a previously monitored token must not carry over.
    @kill_proc = nil
    @canceled = false

    @cond.broadcast
  end
end

#set_kill_proc(kill_proc) ⇒ Object



105
106
107
# File 'lib/perfectqueue/worker.rb', line 105

# Registers the callable that #kill! invokes.
#
# The assignment is made without taking the mutex. #set resets the stored
# proc to nil, so a proc registered here lasts only until the next #set.
#
# @param kill_proc [#call, nil] callable to run on kill; nil unregisters it
# @return [#call, nil] the value just stored
def set_kill_proc(kill_proc)
  @kill_proc = kill_proc
end

#shutdown ⇒ Object



89
90
91
# File 'lib/perfectqueue/worker.rb', line 89

# Waits for the monitor thread created by #start to terminate.
#
# @thread is only assigned by #start, so calling this before #start used to
# raise NoMethodError on nil; it is now a no-op in that case.
#
# @return [Thread, nil] the joined thread, or nil if none was started
def shutdown
  thread = @thread
  thread.join if thread
end

#start ⇒ Object



26
27
28
29
# File 'lib/perfectqueue/worker.rb', line 26

# Launches the monitor loop (#run) on a new thread and remembers that thread
# in @thread so that #shutdown can join it.
#
# @return [Thread] the newly created thread
def start
  @log.debug "running worker."
  @thread = Thread.new { run }
end

#stop ⇒ Object



83
84
85
86
87
# File 'lib/perfectqueue/worker.rb', line 83

# Wakes every thread waiting on the condition variable, holding the mutex
# while doing so. A thread blocked in #run's wait then re-checks
# @engine.finished? and can return.
#
# @return [Object] whatever the condition variable's broadcast returns
def stop
  @mutex.synchronize do
    @cond.broadcast
  end
end

#try_extend(now) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/perfectqueue/worker.rb', line 55

# Extends the current token's timeout in the backend once the heartbeat time
# has been reached, unless the task is already known to be canceled.
#
# On CanceledError from the backend the task is flagged as canceled and the
# kill time is pulled forward to +now+. In either case the next heartbeat is
# scheduled @heartbeat_interval after +now+.
#
# @param now [Integer] current time, compared against @heartbeat_time
# @return [Integer, nil] the new heartbeat time, or nil when nothing was due
def try_extend(now)
  return unless now >= @heartbeat_time && !@canceled

  # None of the methods visible for this class assign @task, so calling
  # @task.id unconditionally raised NoMethodError here before the backend was
  # updated. Identify the work by its token when no task object is set.
  task_id = @task ? @task.id : @token
  deadline = now + @timeout

  @log.debug "extending timeout=#{deadline} id=#{task_id}"
  begin
    @backend.update(@token, deadline)
  rescue CanceledError
    @log.info "task id=#{task_id} is canceled."
    @canceled = true
    @kill_time = now
  end
  @heartbeat_time = now + @heartbeat_interval
end

#try_kill(now) ⇒ Object



69
70
71
72
73
74
# File 'lib/perfectqueue/worker.rb', line 69

# Fires #kill! once the kill time has been reached, then schedules the next
# attempt @kill_interval after +now+.
#
# @param now [Integer] current time, compared against @kill_time
# @return [Integer, nil] the new kill time, or nil when the deadline has not
#   been reached
def try_kill(now)
  return unless now >= @kill_time

  kill!
  @kill_time = now + @kill_interval
end