Class: Jobs::Server
Instance Method Summary collapse
-
#initialize(runpath, config_path, env) ⇒ Server
constructor
A new instance of Server.
- #kill ⇒ Object
- #migrate ⇒ Object
- #run(daemonize = true) ⇒ Object
Methods included from Runnable
Constructor Details
#initialize(runpath, config_path, env) ⇒ Server
Returns a new instance of Server.
16 17 18 19 20 21 22 23 |
# File 'lib/jobs/server.rb', line 16
#
# Builds a server for the given run directory, configuration file and
# environment, then loads the configuration.
#
# runpath     - directory the server runs from; stored as given.
# config_path - path to the configuration file; stored as an absolute path.
# env         - environment name; stored as given.
def initialize(runpath, config_path, env)
  @runpath = runpath
  # Store an absolute path so that later lookups relative to the config file
  # do not depend on the process's current working directory.
  @config_path = File.expand_path(config_path)
  @env = env
  @workers = []
  @next_worker = 0
  # load_config is not defined in this listing (presumably provided by the
  # included Runnable module -- confirm).
  load_config
end
Instance Method Details
#kill ⇒ Object
52 53 54 55 56 57 58 59 |
# File 'lib/jobs/server.rb', line 52
#
# Asks the process recorded in the pid file to stop by running `kill <pid>`.
#
# Prints a hint to stdout and returns nil when the pid file does not exist.
# Returns false (after reporting on stderr) when the pid file does not hold
# a plain integer. Otherwise returns the result of Kernel#system.
def kill
  # load_config is defined outside this listing; @pid_file is not assigned
  # anywhere in this file, so it is presumably set there -- confirm.
  load_config

  unless File.exist?(@pid_file)
    puts "Job Queue Pid File not found! Try ps -ef | grep rjqueue"
    return
  end

  # Accept only a bare integer. The file contents used to be interpolated
  # straight into a shell command line, so a corrupt or tampered pid file
  # could execute arbitrary shell text.
  pid = File.read(@pid_file)[/\A\s*(\d+)\s*\z/, 1]
  if pid
    # Multi-argument form runs the command directly, without a shell.
    system('kill', pid)
  else
    STDERR.puts "Job Queue Pid File does not contain a valid pid: #{@pid_file}"
    false
  end
end
#migrate ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/jobs/server.rb', line 25
#
# Connects ActiveRecord to the database configured for the current
# environment and runs the CreateJobs migration.
#
# The database settings are read from a `database.yml` located in the same
# directory as the server's configuration file, keyed by @env.
def migrate
  enable_logger

  # The 'preload' config entry may be a single path or a list of paths.
  preload = @config['preload']
  if preload
    files = preload.is_a?(Array) ? preload : [preload]
    files.each { |file| require file }
  end

  require 'rubygems'
  require 'active_record'

  @logger.info("[job worker #{@pid}]: establish connection environment with #{@config_path.inspect} and env: #{@env.inspect}")

  database_yml = File.join(File.dirname(@config_path), 'database.yml')
  @db = YAML.load_file(database_yml)[@env]
  ActiveRecord::Base.establish_connection(@db)
  # Database activity is logged through the server's own logger.
  ActiveRecord::Base.logger = @logger

  # The job model and the migration are required only after the connection
  # has been established.
  require 'jobs/job'
  require 'jobs/migrate'
  CreateJobs.up
end
#run(daemonize = true) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/jobs/server.rb', line 61
#
# Starts the job server.
#
# daemonize - when true (the default) the server is started in a detached
#             grandchild process and this process reports the outcome;
#             when false, startup and run_loop execute in this process.
#
# In daemon mode the calling process exits with status 1 if the pid file
# already exists or if the child reported an error over the status pipe.
# In foreground mode the return value is that of run_loop.
def run(daemonize=true)
  @daemonize = daemonize

  unless @daemonize
    startup
    return run_loop
  end

  # Set by the USR1 handler; used to learn when all workers are started and
  # the child process is ready to listen for events.
  @child_up = false
  trap("USR1") { @child_up = true }

  if File.exist?(@pid_file)
    STDERR.puts "Pid file for job already exists: #{@pid_file}"
    exit 1
  end

  # Pipe used by the daemon to send status to this process once it has
  # successfully started or failed.
  rd, wr = IO.pipe
  @parent_pid = Process.pid

  # Double fork: the intermediate child exits right after spawning the
  # grandchild, which becomes the daemon.
  fork do
    rd.close
    Process.setsid
    fork do
      begin
        Process.setsid
        File.open(@pid_file, 'wb') { |f| f << Process.pid }
        Dir.chdir('/')
        File.umask 0000
        STDIN.reopen "/dev/null"
        STDOUT.reopen "/dev/null", "a"
        STDERR.reopen STDOUT

        # XXX: change back to the runpath... this means if the runpath is
        # removed, say by a cap deploy, the daemon will likely die.
        Dir.chdir(@runpath)

        startup
        @logger.info "Job process active"

        # Closing the write end tells the parent we are up and running,
        # which lets its blocking read return.
        wr.write "Listening on udp://#{@host}:#{@port}\n"
        wr.flush
        wr.close

        run_loop
      rescue => e
        if wr.closed?
          # The parent has already been told we started; only the log is left.
          @logger.error "#{e.message} #{e.backtrace.join("\n")}"
        else
          # Startup failed: report to the parent, which looks for "ERROR".
          wr.write e.message
          wr.write e.backtrace.join("\n")
          wr.write "\n"
          wr.write "ERROR!"
          wr.flush
          wr.close
        end
      ensure
        cleanup
      end
    end
    wr.close
  end

  # Every copy of the write end must be closed for rd.read to see EOF.
  wr.close
  output = rd.read
  puts output
  rd.close

  # Poll for the USR1 notification for at most max_polls * poll_interval
  # seconds.
  max_polls = 20
  poll_interval = 5
  polls = 0
  puts "Waiting for child workers to start..."
  while !@child_up && polls < max_polls
    sleep poll_interval
    polls += 1
  end

  if @child_up
    puts "Workers alive"
  else
    puts "Check error log, workers may not be started, or you may be starting so many workers it has taken longer than #{max_polls * poll_interval} seconds to start them all. In either event checking the log for details would be the first place to look."
    @parent_pid = nil
  end

  exit(1) if output.match(/ERROR/i)
end