Class: Jobs::Worker

Inherits:
Object
  • Object
show all
Includes:
Runnable
Defined in:
lib/jobs/worker.rb

Instance Method Summary collapse

Methods included from Runnable

#sql_runnable_conditions

Constructor Details

#initialize(config, runpath, config_path, logger, env) ⇒ Worker

Returns a new instance of Worker.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/jobs/worker.rb', line 34

# Builds a worker: stores its collaborators, creates the work queue and locks,
# runs +preload+, boots Jobs::Initializer if it is not already ready, opens the
# connection, and records the configured thread count and the current pid.
#
# @param config [#[]] settings; 'jobpath' and 'threads' are read here
# @param runpath [String] base directory that 'jobpath' is joined onto
# @param config_path [String] passed through to Jobs::Initializer.run!
# @param logger [Object] stored as @logger
# @param env [Object] passed through to Jobs::Initializer.run!
def initialize(config, runpath, config_path, logger, env)
  # Collaborators handed in by the caller.
  @config      = config
  @runpath     = runpath
  @config_path = config_path
  @logger      = logger

  # Internal coordination state.
  @queue    = Queue.new
  @lock     = Mutex.new
  @sig_lock = Mutex.new
  @thread   = nil
  @env      = env

  preload

  # Boot the job environment only when it has not been initialized yet.
  initializer_ready = Jobs::Initializer.ready? if defined?(Jobs::Initializer)
  unless initializer_ready
    job_dir = File.join(@runpath, @config['jobpath'])
    Jobs::Initializer.run!(job_dir, @config_path, @env)
  end

  establish_connection

  # A missing 'threads' setting falls back to a single thread.
  @threads = (@config['threads'] || 1).to_i
  @pid     = Process.pid
end

Instance Method Details

#listen ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/jobs/worker.rb', line 55

# Runs the signal-driven dispatch loop until a stop signal arrives.
#
# Starts the worker thread, then wakes roughly once per second and hands the
# number of USR1 signals received since the previous tick to the worker through
# @queue, split into chunks no larger than @threads. USR2, TERM or INT end the
# loop; a nil is then pushed onto @queue (presumably the worker's stop
# sentinel -- confirm against #worker) and the worker thread is joined.
#
# @return [Object] whatever Thread#join returns for the worker thread
def listen
  @alive = true
  @thread = Thread.new { worker }

  # Mutex#synchronize raises ThreadError when called from a trap handler
  # (Ruby 2.0+), so the USR1 handler only pushes a token onto a queue and the
  # loop below does the counting outside of trap context.
  wakeups = Queue.new
  trap("USR1") { wakeups << true } # queue up some work

  # USR2 is sent to stop normally; TERM and INT stop the loop the same way.
  # Each handler restores the default action so a repeated signal is not
  # swallowed while the worker is being joined.
  %w[USR2 TERM INT].each do |sig|
    trap(sig) { @alive = false; trap(sig, 'SIG_DFL') }
  end

  while @alive
    sleep 1 # 1 second resolution

    # Take exactly the tokens present right now; a signal that lands while we
    # are popping stays in the queue for the next tick instead of being lost.
    count = wakeups.size
    count.times { wakeups.pop }
    next if count.zero?

    @logger.debug("[job worker #{@pid}]: processing #{count} jobs")

    # More signals than threads: hand the work over in @threads-sized chunks
    # plus one final chunk for the remainder. A thread count below 1 would
    # divide by zero, so treat it as 1.
    chunk = [@threads, 1].max
    full_chunks, remainder = count.divmod(chunk)
    full_chunks.times { @queue << chunk }
    @queue << remainder if remainder > 0
  end

  @queue << nil
  @logger.info "[job worker #{@pid}]: Joining with main thread"
  @thread.join # wait for the worker
end