Class: Jobster::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/jobster/worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(queues = nil) ⇒ Worker

Returns a new instance of Worker.



4
5
6
7
8
9
10
# File 'lib/jobster/worker.rb', line 4

def initialize(queues = nil)
  @initial_queues = queues || self.class.queues || [:main]
  @active_queues = {}
  @running_jobs = []
  @process_name = $0
  set_process_name
end

Instance Method Details

#perform_job(class_name, params = {}, id = nil) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/jobster/worker.rb', line 57

def perform_job(class_name, params = {}, id = nil)
  id ||= SecureRandom.uuid[0,8]
  start_time = Time.now
  exception = nil
  logger.info "[#{id}] Started processing \e[34m#{class_name}\e[0m job"
  begin
    klass = Object.const_get(class_name).new(id, params)
    run_callbacks :before_job, klass
    klass.perform
  rescue Job::Abort => e
    exception = e
    logger.info "[#{id}] Job aborted (#{e.message})"
  rescue => e
    exception = e
    logger.warn "[#{id}] \e[31m#{e.class}: #{e.message}\e[0m"
    e.backtrace.each do |line|
      logger.warn "[#{id}]    " + line
    end
    Jobster.config.worker_error_handlers.each { |handler| handler.call(e, klass) }
  ensure
    run_callbacks :after_job, klass, exception
    logger.info "[#{id}] Finished processing \e[34m#{class_name}\e[0m job in #{Time.now - start_time}s"
  end
end

#set_process_nameObject



12
13
14
15
16
17
18
19
20
# File 'lib/jobster/worker.rb', line 12

def set_process_name
  prefix = @process_name.to_s
  prefix += " [exiting]" if @exit
  if @running_jobs.empty?
    $0 = "#{prefix} (idle)"
  else
    $0 = "#{prefix} (running #{@running_jobs.join(', ')})"
  end
end

#workObject



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/jobster/worker.rb', line 22

def work
  logger.info "Jobster worker started (#{Jobster.config.worker_threads} thread(s))"
  run_callbacks :after_start

  Jobster.delay_queue # Declare it

  Signal.trap("INT")  { @exit = true; set_process_name }
  Signal.trap("TERM") { @exit = true; set_process_name }

  Jobster.channel.prefetch(Jobster.config.worker_threads)
  @initial_queues.uniq.each { |queue | join_queue(queue) }

  exit_checks = 0
  loop do
    if @exit && @running_jobs.empty?
      logger.info "Exiting immediately because no jobs running"
      run_callbacks :before_quit, :immediate
      exit 0
    elsif @exit
      if exit_checks >= 300
        logger.info "Job did not finish in a timely manner. Exiting"
        run_callbacks :before_quit, :timeout
        exit 0
      end
      if exit_checks == 0
        logger.info "Exit requested but job is running. Waiting for job to finish."
      end
      sleep 5
      exit_checks += 1
    else
      sleep 1
    end
  end
end