Class: RQ::JobRunner

Inherits:
Object
  • Object
show all
Includes:
DRbUndumped
Defined in:
lib/rq/jobrunner.rb

Overview

The JobRunner class is responsible for pre-forking a process/shell in which to run a job. It is used by the JobRunnerDaemon so that processes can be forked via a DRb proxy, which avoids forking during an SQLite transaction - something that has undefined behaviour.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(q, job) ⇒ JobRunner

Returns a new instance of JobRunner.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/rq/jobrunner.rb', line 33

# Builds the job's environment and pre-forks the shell that will later run it.
#
# Reads 'jid', 'command', 'shell', 'stdin', 'stdout', 'stderr' and 'data' from
# +job+, records every job field as an RQ_* environment variable, then forks a
# child (via Util::fork) that exports that environment and execs the shell
# with its login flag ('--login' for bash/sh, '-l' otherwise) and with STDIN
# attached to the read end of a pipe.  The parent keeps the write end in @w so
# that #run can feed the command line to the waiting shell.
#
# q   - queue object; must respond to #bin and #path
# job - job record; must respond to #[], #fields and #to_hash
def initialize q, job
#--{{{
  @q = q
  @job = job
  @jid = job['jid']
  @command = job['command']
  @shell = job['shell'] || 'bash'
  shell_name = File::basename(@shell)
  @sh_like = %w( bash sh ).include?(shell_name)
  @r, @w = IO::pipe

  # fields whose values are paths relative to the queue directory
  path_fields = %w( stdin stdout stderr data )

  @env = {}
  @env["PATH"] = [@q.bin, ENV["PATH"]].join(":")
  @job.fields.each do |field|
    key = "RQ_#{ field }".upcase.gsub(%r/\s+/,'_')
    val = @job[field]
    # a missing path field is exported as an empty string rather than blowing
    # up in File.join - the same fields are treated as optional further down
    val = File.expand_path(File.join(@q.path, val)) if val && path_fields.include?(field.to_s)
    @env[key] = "#{ val }"
  end
  @env['RQ'] = File.expand_path @q.path
  @env['RQ_JOB'] = @job.to_hash.to_yaml

  @stdin = @job['stdin']
  @stdout = @job['stdout']
  @stderr = @job['stderr']
  @data = @job['data']

  @stdin &&= File::join @q.path, @stdin # assume path relative to queue
  @stdout &&= File::join @q.path, @stdout # assume path relative to queue
  @stderr &&= File::join @q.path, @stderr # assume path relative to queue
  @data &&= File::join @q.path, @data # assume path relative to queue

  @cid =
    Util::fork do
      # child: export the job environment, then become the shell
      @env.each{|k,v| ENV[k] = v}
      ENV['RQ_PID'] = "#{ $$ }"
      @w.close
      STDIN.reopen @r
      login_flag = @sh_like ? '--login' : '-l'
      # [path, argv0] form: run @shell but give it a recognisable process name
      exec([@shell, "__rq_job__#{ @jid }__#{ shell_name }__"], login_flag)
    end
  # parent: only the write end is needed from here on
  @r.close
#--}}}
end

Instance Attribute Details

#cid ⇒ Object (readonly) Also known as: pid

Returns the value of attribute cid.



25
26
27
# File 'lib/rq/jobrunner.rb', line 25

# Value returned by Util::fork when the constructor spawned the shell
# (presumably the child's process id; this reader is also documented as +pid+).
def cid
  @cid
end

#command ⇒ Object (readonly)

Returns the value of attribute command.



27
28
29
# File 'lib/rq/jobrunner.rb', line 27

# The job's 'command' field as captured by the constructor; #run sends it to
# the shell.
def command
  @command
end

#data ⇒ Object (readonly)

Returns the value of attribute data.



31
32
33
# File 'lib/rq/jobrunner.rb', line 31

# The job's 'data' field joined onto the queue path by the constructor, or
# nil/false when the job carries no such value.
def data
  @data
end

#jid ⇒ Object (readonly)

Returns the value of attribute jid.



24
25
26
# File 'lib/rq/jobrunner.rb', line 24

# The job's 'jid' field as captured by the constructor.
def jid
  @jid
end

#job ⇒ Object (readonly)

Returns the value of attribute job.



23
24
25
# File 'lib/rq/jobrunner.rb', line 23

# The job object that was passed to the constructor.
def job
  @job
end

#q ⇒ Object (readonly)

Returns the value of attribute q.



22
23
24
# File 'lib/rq/jobrunner.rb', line 22

# The queue object that was passed to the constructor.
def q
  @q
end

#shell ⇒ Object (readonly)

Returns the value of attribute shell.



26
27
28
# File 'lib/rq/jobrunner.rb', line 26

# The shell used to run the job: the job's 'shell' field, or 'bash' when the
# job does not specify one.
def shell
  @shell
end

#stderr ⇒ Object (readonly)

Returns the value of attribute stderr.



30
31
32
# File 'lib/rq/jobrunner.rb', line 30

# The job's 'stderr' field joined onto the queue path by the constructor, or
# nil/false when the job carries no such value.
def stderr
  @stderr
end

#stdin ⇒ Object (readonly)

Returns the value of attribute stdin.



28
29
30
# File 'lib/rq/jobrunner.rb', line 28

# The job's 'stdin' field joined onto the queue path by the constructor, or
# nil/false when the job carries no such value.
def stdin
  @stdin
end

#stdout ⇒ Object (readonly)

Returns the value of attribute stdout.



29
30
31
# File 'lib/rq/jobrunner.rb', line 29

# The job's 'stdout' field joined onto the queue path by the constructor, or
# nil/false when the job carries no such value.
def stdout
  @stdout
end

Instance Method Details

#run ⇒ Object

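Writes the job's command, wrapped with its stdin/stdout/stderr redirections, to the pre-forked shell's pipe and then closes the pipe.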



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/rq/jobrunner.rb', line 81

# Sends the job's command line to the pre-forked shell and closes the pipe.
#
# Everything from a '#' to the end of a line is stripped from @command; the
# remainder is wrapped in a subshell together with the stdin/stdout/stderr
# redirections that suit the shell family (@sh_like), written to @w as a
# single line, and @w is then closed so the shell sees end-of-input.
#
# When a stdin path is configured but the file is missing, the file is
# created.  A job without a stdin path is allowed and gets no stdin handling.
#
# Returns the result of @w.close.
def run
#--{{{
  script = @command.gsub %r/#.*/, '' # kill comments
  path = @q.bin

  wrapped =
    if @sh_like
      # NOTE(review): the stdin redirect is only emitted when the file already
      # exists; the touch below runs afterwards, so a stdin file created by
      # this call is not redirected - confirm that this is intended.
      sin = "0<#{ @stdin }" if @stdin && File.exist?(@stdin)
      sout = "1>#{ @stdout }" if @stdout
      serr = "2>#{ @stderr }" if @stderr
      "( PATH=#{ path }:$PATH #{ script } ;) #{ sin } #{ sout } #{ serr }"
    else
      # presumably csh-style shells: stdout is redirected inside the outer
      # subshell and '>&' on the outside then captures what is left (stderr)
      sin = "<#{ @stdin }" if @stdin
      sout = ">#{ @stdout }" if @stdout
      serr = ">&#{ @stderr }" if @stderr
      "( ( #{ script } ;) #{ sin } #{ sout } ) #{ serr }"
    end

  # guard on @stdin: File.exist?(nil) raises TypeError
  FileUtils::touch(@stdin) if @stdin && !File.exist?(@stdin)

  @w.puts wrapped
  @w.close
#--}}}
end