Class: Jober::Manager

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/jober/manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logger

#logger, #logger=, #logger_tag

Constructor Details

#initialize(name, allowed_classes = nil) ⇒ Manager

Returns a new instance of Manager.



8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/jober/manager.rb', line 8

# Builds a manager and announces it in the process title and the log.
#
# @param name [Object] label interpolated into the process title and log lines
# @param allowed_classes [Array, nil] when given, only the classes that are
#   also present in +Jober.classes+ are managed; when nil, all of
#   +Jober.classes+ are managed
#
# NOTE(review): @logger_path is not assigned anywhere in this method, so the
# file-logger branch below only runs if the variable was populated before
# #initialize is invoked — confirm whether that is intended.
def initialize(name, allowed_classes = nil)
  @name = name
  @stopped = false
  @mutex = Mutex.new
  @pids = []
  @allowed_classes =
    if allowed_classes
      Jober.classes & allowed_classes
    else
      Jober.classes
    end

  # Rename the process so the manager is recognisable in process listings.
  $0 = "#{@name} manager"
  self.logger = ::Logger.new(File.join(@logger_path, "manager.log")) if @logger_path
  info "starting manager #{@name}"
end

Instance Attribute Details

#logger_path ⇒ Object

Returns the value of attribute logger_path.



6
7
8
# File 'lib/jober/manager.rb', line 6

# Reader for @logger_path. Elsewhere in this class the value is joined with
# a file name ("manager.log", "<short_name>.log"), i.e. it is used as the
# directory that holds the log files.
#
# @return [Object, nil] the current value of @logger_path (nil when unset)
def logger_path
  @logger_path
end

#stopped ⇒ Object

Returns the value of attribute stopped.



6
7
8
# File 'lib/jober/manager.rb', line 6

# Reader for the @stopped flag, which #initialize sets to false and #stop!
# sets to true. Worker loops poll it to decide when to exit.
#
# @return [Boolean, nil] the current value of @stopped
def stopped
  @stopped
end

Instance Method Details

#catch ⇒ Object



67
68
69
70
71
72
73
# File 'lib/jober/manager.rb', line 67

# Runs the given block and converts any raised exception into a nil result
# after handing it to +Jober.exception+.
#
# The rescue clause matches +Object+, so every exception class is caught,
# not only StandardError descendants. Note that this method shadows
# Kernel#catch for instances of this class.
#
# @yield the work to protect
# @return [true, nil] true when the block finished without raising,
#   nil when an exception was raised and reported
def catch
  begin
    yield
  rescue Object => error
    Jober.exception(error)
    return nil
  end

  true
end

#pids ⇒ Object



117
118
119
# File 'lib/jober/manager.rb', line 117

# Returns the pids currently tracked in @pids.
#
# A copy is taken while holding @mutex, so the caller receives a stable
# snapshot. Returning the live array would let it be read or mutated
# outside the lock, which defeats the purpose of synchronizing here.
#
# @return [Array] snapshot of the tracked pids; mutating it does not affect
#   the manager's own list
def pids
  @mutex.synchronize { @pids.dup }
end

#run ⇒ Object



30
31
32
33
34
35
36
37
38
39
# File 'lib/jober/manager.rb', line 30

# Starts all workers via #run!, installs a TERM handler that calls #stop,
# then blocks the calling thread, waking once per second, until @stopped
# becomes truthy.
#
# @return [nil]
def run
  run!
  Signal.trap("TERM") { stop }

  # Always sleep before checking, so the flag is polled once per second.
  loop do
    sleep(1)
    return if @stopped
  end
end

#run! ⇒ Object



22
23
24
25
26
27
28
# File 'lib/jober/manager.rb', line 22

# Spawns one background thread per worker slot of every allowed class and
# returns immediately, without joining the threads. Each thread runs
# #start_worker with the class, its interval, the slot index and the
# class's worker count.
#
# @return [Object] @allowed_classes (the receiver of the outer +each+)
def run!
  @allowed_classes.each do |job_class|
    job_class.get_workers.times do |slot|
      # Pass the loop values as thread arguments; the interval and the total
      # worker count are still looked up inside the thread.
      Thread.new(job_class, slot) do |task_class, worker_idx|
        start_worker(task_class, task_class.get_interval, worker_idx, task_class.get_workers)
      end
    end
  end
end

#run_task_fork(klass, idx, count) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/jober/manager.rb', line 93

# Forks a child process that builds one +klass+ instance and executes it.
#
# In the child: the process title is set, +Jober.call_after_fork+ and
# +Jober.reset_redis+ are invoked, and — when @logger_path is set — STDOUT
# and STDERR are redirected (append mode) to "<short_name>.log" inside that
# directory and the instance receives a ::Logger on the same file. The
# instance's #execute runs inside #catch, so an exception is reported
# rather than raised out of the fork block.
#
# @param klass [Class] task class; must respond to +new+ and +short_name+
#   (the original comment suggests a Jober::Task descendant — confirm)
# @param idx [Integer] passed to the instance as :worker_id
# @param count [Integer] passed to the instance as :workers_count
# @return [Integer] pid of the forked child (in the parent process)
def run_task_fork(klass, idx, count)
  info "invoke #{klass}"
  fork do
    $0 = "#{@name} manager #{klass}"
    Jober.call_after_fork
    Jober.reset_redis

    inst = klass.new(:worker_id => idx, :workers_count => count)

    if @logger_path
      log_file = File.join(@logger_path, "#{klass.short_name}.log")

      # Let IO#reopen open the path itself: opening File objects only to
      # hand them to reopen left two extra handles that were never closed.
      STDOUT.reopen(log_file, 'a')
      STDERR.reopen(log_file, 'a')
      inst.logger = ::Logger.new(log_file)
    end

    catch do
      inst.execute
    end
  end
end

#start_worker(klass, interval, idx, count) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/jober/manager.rb', line 75

# Supervision loop for a single worker slot: repeatedly forks a task
# process, waits for it to exit, pauses, and starts over until the manager
# is stopped.
#
# Each cycle runs inside #catch, so an error while forking or waiting is
# reported instead of ending the loop; a failed cycle is followed by a
# 0.5 second back-off.
#
# @param klass [Class] task class handed to #run_task_fork
# @param interval [#to_f] seconds to pause after a child exits
# @param idx [Integer] worker slot index handed to #run_task_fork
# @param count [Integer] total number of slots handed to #run_task_fork
# @return [nil]
def start_worker(klass, interval, idx, count)
  debug { "start worker for #{klass.to_s}" }

  loop do
    child_pid = nil

    cycle_ok = catch do
      child_pid = run_task_fork(klass, idx, count)
      add_pid(child_pid)
      Process.wait(child_pid)
      del_pid(child_pid)
      sleep(interval.to_f) unless stopped
    end

    # Also untrack the pid when the cycle was aborted before its own
    # del_pid call; child_pid is still nil if the fork itself failed.
    del_pid(child_pid)
    break if stopped

    sleep(0.5) unless cycle_ok
    break if stopped
  end
end

#stop(timeout = 2.5) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/jober/manager.rb', line 47

# Stops the manager and makes sure no tracked child process is left behind.
#
# First calls #stop!, then polls @pids every 0.1 seconds until the list is
# empty or +timeout+ seconds have elapsed. Any pid still tracked after that
# is sent KILL and the list is reset.
#
# @pids is read directly rather than through #pids.
# NOTE(review): presumably because this method is invoked from the TERM trap
# installed by #run, where taking a Mutex is not permitted — confirm.
#
# @param timeout [Numeric] maximum number of seconds to wait for children
#   to exit on their own
# @return [nil, Array] nil when all children exited in time, otherwise the
#   fresh empty pid list
def stop(timeout = 2.5)
  stop!
  return if @pids.empty?

  # Use a monotonic deadline instead of summing 0.1 increments: it is immune
  # to float accumulation error and to wall-clock adjustments.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until @pids.empty? || Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep(0.1)
  end

  return if @pids.empty?

  info { "still alive pids: #{@pids}, killing" }
  # Iterate over a snapshot in case the list is modified while we walk it.
  @pids.dup.each do |pid|
    begin
      ::Process.kill("KILL", pid)
    rescue Errno::ESRCH
      # The child exited between the check and the kill; nothing left to do,
      # and the remaining pids must still be processed.
    end
  end

  @pids = []
end

#stop! ⇒ Object



41
42
43
44
45
# File 'lib/jober/manager.rb', line 41

# Marks the manager as stopped and asks every tracked child process to quit
# by sending it the QUIT signal. Does not wait for the children to exit.
#
# A pid whose process has already exited is skipped: without that, the
# resulting Errno::ESRCH would abort the loop, leave the remaining children
# unsignalled and skip the log line.
#
# @return [Object] whatever +info+ returns
def stop!
  @stopped = true
  # Iterate over a snapshot in case the list is modified while we walk it.
  @pids.dup.each do |pid|
    begin
      ::Process.kill("QUIT", pid)
    rescue Errno::ESRCH
      # Process is already gone; keep signalling the others.
    end
  end
  info "stopping manager..."
end