Class: PerfectQueue::MonitorThread

Inherits:
Object
  • Object
show all
Defined in:
lib/perfectqueue/worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(engine, conf) ⇒ MonitorThread

Returns a new instance of MonitorThread.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/perfectqueue/worker.rb', line 6

def initialize(engine, conf)
  @engine = engine
  @log = @engine.log
  @backend = engine.backend
  @finished = false

  @timeout = conf[:timeout] || 600
  @heartbeat_interval = conf[:heartbeat_interval] || @timeout*3/4
  @kill_timeout = conf[:kill_timeout] || @timeout*10
  @kill_interval = conf[:kill_interval] || 60
  @retry_wait = conf[:retry_wait] || nil
  @delete_wait = conf[:delete_wait] || 3600

  @token = nil
  @heartbeat_time = nil
  @kill_time = nil
  @kill_proc = nil
  @canceled = false
  @mutex = Mutex.new
  @cond = ConditionVariable.new
end

Instance Method Details

#kill!Object



85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/perfectqueue/worker.rb', line 85

def kill!
  if @kill_proc
    @log.info "killing id=#{@task_id}..."
    begin
      @kill_proc.call
    rescue
      @log.info "kill failed id=#{@task_id}: #{$!.class}: #{$!}"
      $!.backtrace.each {|bt|
        @log.debug "  #{bt}"
      }
    end
  end
end

#processObject



47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/perfectqueue/worker.rb', line 47

def process
  while true
    sleep 1
    @mutex.synchronize {
      return if @finished
      return unless @token
      now = Time.now.to_i
      try_extend(now)
      try_kill(now)
    }
  end
end

#reset(success) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/perfectqueue/worker.rb', line 127

def reset(success)
  @mutex.synchronize {
    if success
      @backend.finish(@token, @delete_wait)
    elsif @retry_wait && !@canceled
      begin
        @backend.update(@token, Time.now.to_i+@retry_wait)
      rescue
        # ignore CanceledError
      end
    end
    @token = nil
  }
end

#runObject



32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/perfectqueue/worker.rb', line 32

def run
  until @finished
    @mutex.synchronize {
      while true
        return if @finished
        break if @token
        @cond.wait(@mutex)
      end
    }
    process
  end
rescue
  @engine.stop($!)
end

#set(token, task_id) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/perfectqueue/worker.rb', line 110

def set(token, task_id)
  @mutex.synchronize {
    now = Time.now.to_i
    @token = token
    @task_id = task_id
    @heartbeat_time = now + @heartbeat_interval
    @kill_time = now + @kill_timeout
    @kill_proc = nil
    @canceled = false
    @cond.broadcast
  }
end

#set_kill_proc(kill_proc) ⇒ Object



123
124
125
# File 'lib/perfectqueue/worker.rb', line 123

def set_kill_proc(kill_proc)
  @kill_proc = kill_proc
end

#shutdownObject



106
107
108
# File 'lib/perfectqueue/worker.rb', line 106

def shutdown
  @thread.join
end

#startObject



28
29
30
# File 'lib/perfectqueue/worker.rb', line 28

def start
  @thread = Thread.new(&method(:run))
end

#stopObject



99
100
101
102
103
104
# File 'lib/perfectqueue/worker.rb', line 99

def stop
  @mutex.synchronize {
    @finished = true
    @cond.broadcast
  }
end

#try_extend(now) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/perfectqueue/worker.rb', line 60

def try_extend(now)
  if now >= @heartbeat_time && !@canceled
    @log.debug "extending timeout=#{now+@timeout} id=#{@task_id}"
    begin
      @backend.update(@token, now+@timeout)
    rescue CanceledError
      @log.info "task id=#{@task_id} is canceled."
      @canceled = true
      @kill_time = now
    rescue
      @log.error "unexpected error id=#{@task_id}: #{$!}"
      @canceled = true
      @kill_time = now
    end
    @heartbeat_time = now + @heartbeat_interval
  end
end

#try_kill(now) ⇒ Object



78
79
80
81
82
83
# File 'lib/perfectqueue/worker.rb', line 78

def try_kill(now)
  if now >= @kill_time
    kill!
    @kill_time = now + @kill_interval
  end
end