Class: Jobs::Worker
Instance Method Summary collapse
-
#initialize(config, runpath, config_path, logger, env) ⇒ Worker
constructor
A new instance of Worker.
- #listen ⇒ Object
Methods included from Runnable
Constructor Details
#initialize(config, runpath, config_path, logger, env) ⇒ Worker
Returns a new instance of Worker.
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/jobs/worker.rb', line 34 def initialize(config, runpath, config_path, logger, env) @config = config @runpath = runpath @config_path = config_path @logger = logger @queue = Queue.new @lock = Mutex.new @sig_lock = Mutex.new @thread = nil @env = env preload unless defined?(Jobs::Initializer) and Jobs::Initializer.ready? Jobs::Initializer.run! File.join(@runpath,@config['jobpath']), @config_path, @env end establish_connection @threads = (@config['threads'] or 1).to_i @pid = Process.pid end |
Instance Method Details
#listen ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/jobs/worker.rb', line 55 def listen @alive = true @thread = Thread.new{ worker } @count = 0 trap("USR1") { @sig_lock.synchronize {@count += 1} } # queue up some work trap("USR2") { @alive = false; trap("USR2",'SIG_DFL') } # sent to stop normally trap('TERM') { @alive = false; trap("TERM",'SIG_DFL') } trap('INT') { @alive = false; trap("INT",'SIG_DFL') } while @alive do sleep 1 # 1 second resolution count = 0 @sig_lock.synchronize{ count = @count; @count = 0 } if count > 0 @logger.debug("[job worker #{@pid}]: processing #{count} jobs") if count > @threads # we've queued up more then we can handle chunk it out and start chugging away (count/@threads).times do a = count - (count-@threads) #@logger.debug("[job worker #{@pid}]: queueing #{a}") @queue << a count = (count-@threads) end if count > 0 #@logger.debug("[job worker #{@pid}]: queueing #{count}") @queue << count end else #@logger.debug("[job worker #{@pid}]: queueing #{count}") @queue << count end end end @queue << nil @logger.info "[job worker #{@pid}]: Joining with main thread" @thread.join # wait for the worker end |