Module: Postjob::Runner
- Extended by:
- Forwardable
- Defined in:
- lib/postjob/runner.rb
Overview
Base implementations for Runners
This module contains methods for runners.
Constant Summary collapse
- Job =
Postjob::Job
- STATUSES =
[ :sleep, :ok, :err, :failed ]
Class Method Summary collapse
-
.async(workflow, *args, timeout: nil, max_attempts: nil, queue: nil) ⇒ Object
returns a subjob within the current job, for a
runnerdescription andargs. -
.await(job, *args, timeout: nil, max_attempts: nil, queue: nil) ⇒ Object
tries to resolve a job.
-
.current_job ⇒ Object
returns the job that is currently running.
-
.process_job(job) ⇒ Object
runs a specific job.
Class Method Details
.async(workflow, *args, timeout: nil, max_attempts: nil, queue: nil) ⇒ Object
returns a subjob within the current job, for a runner description and args.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/postjob/runner.rb', line 39 def async(workflow, *args, timeout: nil, max_attempts: nil, queue: nil) worker_session_id = Postjob.current_session_id queue = current_job.queue if queue.nil? # if the workflow is a symbol, then we change it into "__manual__" # - there should never be a workflow with that name - or into # "CurrentWorkshop.#{workflow}", denoting the \a workflow method of the # current workflow. case workflow when :manual then workflow = "__manual__" when Symbol then workflow = "#{current_job.workflow}.#{workflow}" when Module then workflow = workflow.name when String then :nop else raise ArgumentError, "Unsupported workflow spec #{workflow.inspect}. Did you run await(fun(a, b)) instead of await(:fun, a, b)" end ::Postjob::Queue.find_or_create_childjob(worker_session_id, self.current_job, workflow, args, queue: queue, timeout: timeout, max_attempts: max_attempts) end |
.await(job, *args, timeout: nil, max_attempts: nil, queue: nil) ⇒ Object
tries to resolve a job.
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/postjob/runner.rb', line 64 def await(job, *args, timeout: nil, max_attempts: nil, queue: nil) case job when :all expect! args == [] expect! timeout => nil, max_attempts => nil, queue => nil unresolved_childjobs = Postjob::Queue.unresolved_childjobs(current_job) if unresolved_childjobs > 0 Postjob.logger.debug "await :all: Found #{unresolved_childjobs} unresolved childjobs" throw :pending, :pending else childjobs = Postjob::Queue.childjobs(current_job) childjobs.each do |childjob| r = childjob.resolve throw :pending, :pending if r == :pending end childjobs.count end when Job expect! args == [] expect! timeout => nil, max_attempts => nil, queue => nil r = job.resolve throw :pending, :pending if r == :pending r else job = async(job, *args, timeout: timeout, max_attempts: max_attempts, queue: queue) await(job) end end |
.current_job ⇒ Object
returns the job that is currently running.
This value is set by process_job (via with_current_job), and currently only used from Postjob::Runner.async
21 22 23 |
# File 'lib/postjob/runner.rb', line 21 def current_job Thread.current[:current_job] end |
.process_job(job) ⇒ Object
runs a specific job
returns a tuple [status, value], which follows the following pattern:
-
[ <runner-version>, :ok, value, nil ]: job completed successfully -
[ <runner-version>, :sleep, nil, nil ]: job has to wait on a child job -
[ <runner-version>, :err, <err>, nil ]: job errored with a recoverable error -
[ <runner-version>, :failed, <err>, <shutdown> ]: job failed with a non-recoverable error
<err> is a tuple [ error-class-name, error-message, stacktrace ]. <shutdown> is either nil or :shutdown
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/postjob/runner.rb', line 110 def process_job(job) expect! job => Job spec = Postjob::Registry.lookup!(name: job.workflow, version: job.workflow_version) expect! spec.runnable? with_current_job(job) do status, value, shutdown = invoke_workflow spec.workflow, job log_result! job, status, value # If the status is ok the job finished processing. In that case # we'll wait for all child jobs to finish. # if status == :ok # await :all # end [ spec..version, status, value, shutdown ] end end |