Class: RQ::JobRunner

Inherits:
Object
  • Object
show all
Includes:
DRbUndumped
Defined in:
lib/rq-3.0.0/jobrunner.rb

Overview

the JobRunner class is responsible for pre-forking a process/shell in which to run a job. this class is utilized by the JobRunnerDaemon so processes can be forked via a drb proxy to avoid actual forking during an sqlite transaction - which has undefined behaviour

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(q, job) ⇒ JobRunner

Returns a new instance of JobRunner.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/rq-3.0.0/jobrunner.rb', line 32

def initialize q, job
#--{{{
  @q = q
  @job = job
  @jid = job['jid']
  @command = job['command']
  @shell = job['shell'] || 'bash'
  @sh_like = File::basename(@shell) == 'bash' || File::basename(@shell) == 'sh' 
  @r,@w = IO::pipe

  @env = {}
  @env["PATH"] = [@q.bin, ENV["PATH"]].join(":")
  @job.fields.each do |field|
    key = "RQ_#{ field }".upcase.gsub(%r/\s+/,'_')
    val = @job[field]
    @env[key] = "#{ val }"
  end
  @env['RQ_JOB'] = @job.to_hash.to_yaml 

  @stdin = @job['stdin']
  @stdout = @job['stdout']
  @stderr = @job['stderr']

  @stdin &&= File::join @q.path, @stdin # assume path relative to queue 
  @stdout &&= File::join @q.path, @stdout # assume path relative to queue 
  @stderr &&= File::join @q.path, @stderr # assume path relative to queue

  @cid = 
    Util::fork do
      @env.each{|k,v| ENV[k] = v}
      ENV['RQ_PID'] = "#{ $$ }"
      @w.close
      STDIN.reopen @r
      argv =
        if @sh_like 
          [ [@shell, "__rq_job__#{ @jid }__#{ File::basename(@shell) }__"], '--login' ]
        else
          [ [@shell, "__rq_job__#{ @jid }__#{ File::basename(@shell) }__"], '-l' ]
        end
      exec *argv
    end

  @r.close
#--}}}
end

Instance Attribute Details

#cidObject (readonly) Also known as: pid

Returns the value of attribute cid.



25
26
27
# File 'lib/rq-3.0.0/jobrunner.rb', line 25

def cid
  @cid
end

#commandObject (readonly)

Returns the value of attribute command.



27
28
29
# File 'lib/rq-3.0.0/jobrunner.rb', line 27

def command
  @command
end

#jidObject (readonly)

Returns the value of attribute jid.



24
25
26
# File 'lib/rq-3.0.0/jobrunner.rb', line 24

def jid
  @jid
end

#jobObject (readonly)

Returns the value of attribute job.



23
24
25
# File 'lib/rq-3.0.0/jobrunner.rb', line 23

def job
  @job
end

#qObject (readonly)

Returns the value of attribute q.



22
23
24
# File 'lib/rq-3.0.0/jobrunner.rb', line 22

def q
  @q
end

#shellObject (readonly)

Returns the value of attribute shell.



26
27
28
# File 'lib/rq-3.0.0/jobrunner.rb', line 26

def shell
  @shell
end

#stderrObject (readonly)

Returns the value of attribute stderr.



30
31
32
# File 'lib/rq-3.0.0/jobrunner.rb', line 30

def stderr
  @stderr
end

#stdinObject (readonly)

Returns the value of attribute stdin.



28
29
30
# File 'lib/rq-3.0.0/jobrunner.rb', line 28

def stdin
  @stdin
end

#stdoutObject (readonly)

Returns the value of attribute stdout.



29
30
31
# File 'lib/rq-3.0.0/jobrunner.rb', line 29

def stdout
  @stdout
end

Instance Method Details

#runObject

–}}}



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/rq-3.0.0/jobrunner.rb', line 77

def run
#--{{{
  command =
    if @sh_like 
      sin = "0<#{ @stdin }" if @stdin
      sout = "1>#{ @stdout }" if @stdout
      serr = "2>#{ @stderr }" if @stderr
      #"{ #{ @command } ;} #{ sin } #{ sout } #{ serr }"   # this maskes exit_status for bad input!
      "( #{ @command } ;) #{ sin } #{ sout } #{ serr }"
    else
      sin = "<#{ @stdin }" if @stdin
      sout = ">#{ @stdout }" if @stdout
      serr = ">&#{ @stderr }" if @stderr
      "( ( #{ @command } ;) #{ sin } #{ sout } ) #{ serr }"
    end

#STDERR.puts command 

  @w.puts command
  @w.close
#--}}}
end