Class: Serfx::Utils::AsyncJob

Inherits:
Object
Defined in:
lib/serfx/utils/async_job.rb

Overview

Serf event handler invocations are blocking calls: serf will not process any other event while a handler invocation is in progress. For this reason, long-running tasks should not be invoked directly as serf handlers.

AsyncJob helps build serf handlers that involve long-running commands. It starts the command in the background, allowing the handler code to return immediately. It performs a double fork: the first child process is detached (re-parented to init), and the target long-running task is spawned as a second child process. This allows the first child process to wait for and reap the actual long-running task.
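
The detach-then-spawn pattern described above can be sketched in plain Ruby as follows. This is a minimal illustration under stated assumptions, not AsyncJob's implementation: `run_detached`, `wait_for_state`, and the JSON layout written here are hypothetical. The state path should be absolute, because `Process.daemon` changes the working directory to `/`.

```ruby
require 'json'
require 'tmpdir'

# Hypothetical helper (not part of Serfx): run `command` from a detached
# process and record its exit status as JSON in `state_path`.
def run_detached(command, state_path)
  pid = fork do
    # Detach from the caller: the process is re-parented to init, its stdio
    # is redirected to /dev/null, and its working directory becomes "/".
    Process.daemon
    child_pid = Process.spawn(command, out: File::NULL, err: File::NULL)
    # The detached process is the task's parent, so it can wait for it.
    _, status = Process.wait2(child_pid)
    File.write(state_path,
               JSON.generate(status: 'finished', exitstatus: status.exitstatus))
    exit!(0) # skip at_exit handlers inherited from the caller
  end
  Process.detach(pid) # reap the first fork so it does not linger as a zombie
  pid
end

# Hypothetical helper: poll until the state file holds valid JSON.
def wait_for_state(state_path, timeout = 10)
  deadline = Time.now + timeout
  while Time.now < deadline
    begin
      return JSON.parse(File.read(state_path))
    rescue Errno::ENOENT, JSON::ParserError
      sleep 0.1 # file not written yet, or only partially written
    end
  end
  nil
end
```

The caller returns as soon as `run_detached` has forked. Only the detached process blocks on `Process.wait2`, which is what lets a handler built this way return immediately.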

The first child process updates a state file before spawning the long-running task (state=‘invoking’), while the task is executing (state=‘running’), and after the spawned process returns (state=‘finished’). This state file provides a convenient way to query the current state of an AsyncJob.
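
To make the state transitions concrete, here is a sketch of the sequence of writes the detached process performs. The `write_state` helper is private and its body does not appear on this page, so the version below is an assumption (plain JSON serialization). The keys mirror those used in the #start source, and the pid and exit status are placeholder values.

```ruby
require 'json'
require 'tmpdir'

# Assumed stand-in for AsyncJob's private write_state helper.
def write_state(path, state)
  File.write(path, JSON.generate(state))
end

# Replays the three writes without running any command and returns the
# status read back from the file after each write.
def replay_states(path)
  seen = []
  state = { ppid: Process.pid, status: 'invoking', pid: -1, time: Time.now.to_i }
  write_state(path, state)
  seen << JSON.parse(File.read(path))['status']

  state[:pid] = 12_345 # placeholder for the spawned task's pid
  state[:status] = 'running'
  write_state(path, state)
  seen << JSON.parse(File.read(path))['status']

  state[:exitstatus] = 0 # placeholder exit status
  state[:status] = 'finished'
  write_state(path, state)
  seen << JSON.parse(File.read(path))['status']
  seen
end
```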

AsyncJob provides four methods to manage jobs. AsyncJob#start starts the task. Once started, AsyncJob#state_info can be used to check whether the job is still running or has finished. Once started, a job is in either the ‘running’ state or the ‘finished’ state. AsyncJob#reap deletes the state file once the task is finished. An AsyncJob in the ‘running’ state can be killed using the AsyncJob#kill method. A new AsyncJob cannot be started until the previous AsyncJob with the same name/state file has been reaped.
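
The lifecycle rules above amount to a small state table. The sketch below encodes them. `allowed_actions` is a hypothetical helper, and it assumes the private `running?` check corresponds to the ‘running’ status. The ‘absent’ status is what #state_info reports when no state file exists; #state_info itself is a read-only query and is left out of the table.

```ruby
# Hypothetical lookup: which AsyncJob calls can succeed in each status,
# going by the method sources shown on this page.
def allowed_actions(status)
  table = {
    'absent'   => [:start], # no state file yet
    'invoking' => [],       # daemon started, task not yet spawned
    'running'  => [:kill],  # kill also removes the state file
    'finished' => [:reap]   # reap removes the state file
  }
  table.fetch(status, [])   # any other status: nothing applies
end
```

For example, `allowed_actions('finished')` returns `[:reap]`, which matches the rule that a finished job must be reaped before a new one can start.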

If the state file is nil, no state is persisted for the job. In that case AsyncJob#state_info, AsyncJob#kill, and AsyncJob#reap are no-ops.

The example below shows a serf handler written using AsyncJob. It can be managed via serf as:

serf query bash_test start
serf query bash_test check # check if job is running or finished
serf query bash_test reap # delete a finished job’s state file
serf query bash_test kill

Examples:

require 'serfx/utils/async_job'
require 'serfx/utils/handler'

include Serfx::Utils::Handler

job = Serfx::Utils::AsyncJob.new(
  name: "bash_test",
  command: "bash -c 'for i in `seq 1 300`; do echo $i; sleep 1; done'",
  state: '/opt/serf/states/long_task'
  )

on :query, 'bash_test' do |event|
  case event.payload
  when 'start'
    puts job.start
  when 'kill'
    puts job.kill
  when 'reap'
    puts job.reap
  when 'check'
    puts job.state_info
  else
    puts 'failed'
  end
end

run

Constructor Details

#initialize(opts = {}) ⇒ AsyncJob

Returns a new instance of AsyncJob.

Parameters:

  • opts (Hash) (defaults to: {})

    specify the job details

Options Hash (opts):

  • :state (String)

    file path which will be used to store task state locally

  • :command (String)

    actual command which will be invoked in the background

  • :stdout (String)

    standard output file for the task (defaults to File::NULL)

  • :stderr (String)

    standard error file for the task (defaults to File::NULL)

  • :environment (Hash)

    a hash containing environment variables (defaults to {})

  • :cwd (String)

    directory path to use as the working directory of the command (defaults to Dir.pwd)



# File 'lib/serfx/utils/async_job.rb', line 92

def initialize(opts = {})
  @state_file = opts[:state]
  @command = opts[:command]
  @stdout_file = opts[:stdout] || File::NULL
  @stderr_file = opts[:stderr] || File::NULL
  @environment = opts[:environment] || {}
  @cwd = opts[:cwd] || Dir.pwd
end
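
Judging by the #start source further down this page, these options are handed to `Process.spawn`. The following sketch, independent of Serfx, shows what each one does there. `spawn_with_options` and the file name `task.out` are made up for illustration, and an absolute output path is used to avoid any ambiguity about which directory it resolves against.

```ruby
require 'tmpdir'

# Runs a short shell command with the same kinds of options AsyncJob accepts
# and returns the lines the command wrote to its stdout file.
def spawn_with_options(dir)
  out_path = File.join(dir, 'task.out')
  pid = Process.spawn(
    { 'GREETING' => 'hello' }, # like :environment
    'echo "$GREETING"; pwd',   # like :command
    out: out_path,             # like :stdout
    err: File::NULL,           # like :stderr (its default)
    chdir: dir                 # like :cwd
  )
  Process.wait(pid)
  File.readlines(out_path, chomp: true)
end
```

The first returned line is the value of the injected environment variable and the second is the directory passed as `chdir`.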

Instance Attribute Details

#command ⇒ Object (readonly)

Returns the value of attribute command.



# File 'lib/serfx/utils/async_job.rb', line 76

def command
  @command
end

#cwd ⇒ Object (readonly)

Returns the value of attribute cwd.



# File 'lib/serfx/utils/async_job.rb', line 81

def cwd
  @cwd
end

#environment ⇒ Object (readonly)

Returns the value of attribute environment.



# File 'lib/serfx/utils/async_job.rb', line 80

def environment
  @environment
end

#state_file ⇒ Object (readonly)

Returns the value of attribute state_file.



# File 'lib/serfx/utils/async_job.rb', line 77

def state_file
  @state_file
end

#stderr_file ⇒ Object (readonly)

Returns the value of attribute stderr_file.



# File 'lib/serfx/utils/async_job.rb', line 79

def stderr_file
  @stderr_file
end

#stdout_file ⇒ Object (readonly)

Returns the value of attribute stdout_file.



# File 'lib/serfx/utils/async_job.rb', line 78

def stdout_file
  @stdout_file
end

Instance Method Details

#kill(sig = 'KILL') ⇒ String

Kills an already running task and removes its state file.

Parameters:

  • sig (String) (defaults to: 'KILL')

    signal that will be sent to the background process

Returns:

  • (String)

    ‘success’ if the signal was sent, ‘failed’ otherwise



# File 'lib/serfx/utils/async_job.rb', line 105

def kill(sig = 'KILL')
  if running?
    begin
      Process.kill(sig, stateinfo['pid'].to_i)
      File.unlink(state_file) if File.exist?(state_file)
      'success'
    rescue Exception
      'failed'
    end
  else
    'failed'
  end
end
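
As a rough illustration of what `Process.kill` does to the target, the sketch below signals a freshly spawned `sleep` and inspects how it ended. `kill_and_inspect` is a hypothetical helper. Unlike AsyncJob#kill, the caller here is the task's parent and can therefore wait on it directly; in AsyncJob that waiting is done by the detached process.

```ruby
# Sends `sig` to a freshly spawned `sleep` and reports how it ended.
def kill_and_inspect(sig = 'KILL')
  pid = Process.spawn('sleep', '30')
  Process.kill(sig, pid)
  _, status = Process.wait2(pid)
  {
    signaled: status.signaled?,    # true when ended by a signal
    termsig: status.termsig,       # number of the terminating signal
    exitstatus: status.exitstatus  # nil when the process did not exit normally
  }
end
```

A process ended by a signal has no exit status, so `exitstatus` is nil in that case.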

#reap ⇒ String

delete the state file of a finished task

Returns:

  • (String)

    ‘success’ if the task is reaped, ‘failed’ otherwise



# File 'lib/serfx/utils/async_job.rb', line 140

def reap
  if stateinfo['status'] == 'finished'
    File.unlink(state_file)
    'success'
  else
    'failed'
  end
end

#start ⇒ String

Starts a background daemon and spawns another process to run the specified command. Writes state information to the state file after spawning the daemon process (state=invoking), after spawning the child process (state=running), and after reaping the child process (state=finished).

Returns:

  • (String)

    ‘success’ if the task is started, ‘failed’ if the job already exists or no command was given



# File 'lib/serfx/utils/async_job.rb', line 156

def start
  if exists? || command.nil?
    return 'failed'
  end
  pid = fork do
    Process.daemon
    state = {
      ppid: Process.pid,
      status: 'invoking',
      pid: -1,
      time: Time.now.to_i
    }
    write_state(state)
    begin
      child_pid = Process.spawn(
        environment,
        command,
        out: stdout_file,
        err: stderr_file,
        chdir: cwd
      )
      state[:pid] = child_pid
      state[:status] = 'running'
      write_state(state)
      _, status = Process.wait2(child_pid)
      state[:exitstatus] = status.exitstatus
      state[:status] = 'finished'
    rescue Errno::ENOENT => e
      state[:error] = e.class.name
      state[:status] = 'failed'
    end
    write_state(state)
    exit 0
  end
  Process.detach(pid)
  'success'
end
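
The rescue of `Errno::ENOENT` above covers the case where the executable cannot be found. The sketch below reproduces that begin/rescue shape on a small scale. `try_spawn` is a hypothetical helper and the command names are deliberately nonexistent. Ruby runs a single-string command directly only when it contains no shell metacharacters; otherwise it goes through the shell, which reports a missing command through its exit status instead of an exception.

```ruby
# Returns the status that would be recorded, plus either the exit status
# or the name of the error class.
def try_spawn(command)
  pid = Process.spawn(command, out: File::NULL, err: File::NULL)
  _, status = Process.wait2(pid)
  ['finished', status.exitstatus]
rescue Errno::ENOENT => e
  ['failed', e.class.name]
end

# Direct exec of a missing program raises Errno::ENOENT.
direct = try_spawn('no_such_command_abc123')
# The ";" forces a shell, which exits with 127 for a missing command.
via_shell = try_spawn('no_such_command_abc123; exit $?')
```

Here `direct` is `['failed', 'Errno::ENOENT']` while `via_shell` is `['finished', 127]`, so a command string that needs a shell may end up ‘finished’ with a non-zero exit status rather than ‘failed’.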

#state_info ⇒ String

obtain current state information about the task as JSON

Returns:

  • (String)

    JSON string representing current state of the task



# File 'lib/serfx/utils/async_job.rb', line 122

def state_info
  if exists?
    File.read(state_file)
  else
    JSON.generate(status: 'absent')
  end
end

#stateinfo ⇒ Hash

obtain current state information about the task as hash

Returns:

  • (Hash)

    hash representing the current state of the task



# File 'lib/serfx/utils/async_job.rb', line 133

def stateinfo
  JSON.parse(state_info)
end
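
One detail worth illustrating: the state hash in #start is built with symbol keys, while #kill and #reap read `stateinfo['pid']` and `stateinfo['status']`. That works because `JSON.parse` returns string keys by default. The sketch below shows the round trip. `round_trip` is a hypothetical helper, and it assumes the state file is written with plain JSON serialization.

```ruby
require 'json'

# Symbol keys on the way in become string keys on the way out.
def round_trip(state)
  JSON.parse(JSON.generate(state))
end

parsed = round_trip(pid: 42, status: 'running')
```

After the round trip, `parsed['pid']` is 42 and `parsed[:pid]` is nil.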