Class: Jober::Manager
- Inherits:
-
Object
show all
- Includes:
- Logger
- Defined in:
- lib/jober/manager.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Logger
#logger, #logger=, #logger_tag
Constructor Details
#initialize(name, allowed_classes = nil) ⇒ Manager
Returns a new instance of Manager.
8
9
10
11
12
13
14
15
16
17
18
19
20
|
# File 'lib/jober/manager.rb', line 8
def initialize(name, allowed_classes = nil)
@name = name
@stopped = false
@mutex = Mutex.new
@pids = []
@allowed_classes = allowed_classes ? (Jober.classes & allowed_classes) : Jober.classes
$0 = "#{@name} manager"
if @logger_path
self.logger = ::Logger.new(File.join(@logger_path, "manager.log"))
end
info "starting manager #{@name}"
end
|
Instance Attribute Details
#logger_path ⇒ Object
Returns the value of attribute logger_path.
6
7
8
|
# File 'lib/jober/manager.rb', line 6
def logger_path
@logger_path
end
|
#stopped ⇒ Object
Returns the value of attribute stopped.
6
7
8
|
# File 'lib/jober/manager.rb', line 6
def stopped
@stopped
end
|
Instance Method Details
#catch ⇒ Object
67
68
69
70
71
72
73
|
# File 'lib/jober/manager.rb', line 67
def catch
yield
true
rescue Object => ex
Jober.exception(ex)
nil
end
|
#pids ⇒ Object
117
118
119
|
# File 'lib/jober/manager.rb', line 117
def pids
@mutex.synchronize { @pids }
end
|
#run ⇒ Object
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/jober/manager.rb', line 30
def run
run!
trap("TERM") { stop }
loop do
sleep 1
break if @stopped
end
end
|
#run! ⇒ Object
22
23
24
25
26
27
28
|
# File 'lib/jober/manager.rb', line 22
def run!
@allowed_classes.each do |klass|
klass.get_workers.times do |idx|
Thread.new { start_worker(klass, klass.get_interval, idx, klass.get_workers) }
end
end
end
|
#run_task_fork(klass, idx, count) ⇒ Object
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
# File 'lib/jober/manager.rb', line 93
def run_task_fork(klass, idx, count)
info "invoke #{klass}"
fork do
$0 = "#{@name} manager #{klass}"
Jober.call_after_fork
Jober.reset_redis
inst = klass.new(:worker_id => idx, :workers_count => count)
if @logger_path
logger_path = File.join(@logger_path, "#{klass.short_name}.log")
STDOUT.reopen(File.open(logger_path, 'a'))
STDERR.reopen(File.open(logger_path, 'a'))
inst.logger = ::Logger.new(logger_path)
end
catch do
inst.execute
end
end
end
|
#start_worker(klass, interval, idx, count) ⇒ Object
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
# File 'lib/jober/manager.rb', line 75
def start_worker(klass, interval, idx, count)
debug { "start worker for #{klass.to_s}" }
loop do
pid = nil
res = catch do
pid = run_task_fork(klass, idx, count)
add_pid(pid)
Process.wait(pid)
del_pid(pid)
sleep interval.to_f unless stopped
end
del_pid(pid)
break if stopped
sleep 0.5 unless res
break if stopped
end
end
|
#stop(timeout = 2.5) ⇒ Object
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
# File 'lib/jober/manager.rb', line 47
def stop(timeout = 2.5)
stop!
return if @pids.empty?
sum = 0
while true
sleep(0.1)
sum += 0.1
break if sum >= timeout
break if @pids.empty?
end
return if @pids.empty?
info { "still alive pids: #{@pids}, killing" }
@pids.each { |pid| ::Process.kill("KILL", pid) }
@pids = []
end
|
#stop! ⇒ Object
41
42
43
44
45
|
# File 'lib/jober/manager.rb', line 41
def stop!
@stopped = true
@pids.each { |pid| ::Process.kill("QUIT", pid) }
info "stopping manager..."
end
|