Module: Postjob::Runner

Extended by:
Forwardable
Defined in:
lib/postjob/runner.rb

Overview

Base implementations for Runners

This module contains methods for runners.

Constant Summary collapse

Job =
Postjob::Job
STATUSES =
[ :sleep, :ok, :err, :failed ]

Class Method Summary collapse

Class Method Details

.async(workflow, *args, timeout: nil, max_attempts: nil, queue: nil) ⇒ Object

returns a subjob within the current job, for a runner description and args.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/postjob/runner.rb', line 39

def async(workflow, *args, timeout: nil, max_attempts: nil, queue: nil)
  worker_session_id = Postjob.current_session_id

  queue = current_job.queue if queue.nil?

  # if the workflow is a symbol, then we change it into "__manual__"
  # - there should never be a workflow with that name - or into
  # "CurrentWorkshop.#{workflow}", denoting the \a workflow method of the
  # current workflow.
  case workflow
  when :manual  then workflow = "__manual__"
  when Symbol   then workflow = "#{current_job.workflow}.#{workflow}"
  when Module   then workflow = workflow.name
  when String   then :nop
  else
    raise ArgumentError, "Unsupported workflow spec #{workflow.inspect}. Did you run await(fun(a, b)) instead of await(:fun, a, b)"
  end

  ::Postjob::Queue.find_or_create_childjob(worker_session_id, self.current_job, workflow, args,
                                           queue: queue,
                                           timeout: timeout,
                                           max_attempts: max_attempts)
end

.await(job, *args, timeout: nil, max_attempts: nil, queue: nil) ⇒ Object

tries to resolve a job.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/postjob/runner.rb', line 64

def await(job, *args, timeout: nil, max_attempts: nil, queue: nil)
  case job
  when :all
    expect! args == []
    expect! timeout => nil, max_attempts => nil, queue => nil

    unresolved_childjobs = Postjob::Queue.unresolved_childjobs(current_job)
    if unresolved_childjobs > 0
      Postjob.logger.debug "await :all: Found #{unresolved_childjobs} unresolved childjobs"
      throw :pending, :pending
    else
      childjobs = Postjob::Queue.childjobs(current_job)
      childjobs.each do |childjob|
        r = childjob.resolve
        throw :pending, :pending if r == :pending
      end
      childjobs.count
    end
  when Job
    expect! args == []
    expect! timeout => nil, max_attempts => nil, queue => nil

    r = job.resolve
    throw :pending, :pending if r == :pending
    r
  else
    job = async(job, *args, timeout: timeout, max_attempts: max_attempts, queue: queue)
    await(job)
  end
end

.current_jobObject

returns the job that is currently running.

This value is set by process_job (via with_current_job), and currently only used from Postjob::Runner.async



21
22
23
# File 'lib/postjob/runner.rb', line 21

def current_job
  Thread.current[:current_job]
end

.process_job(job) ⇒ Object

runs a specific job

returns a tuple [status, value], which follows the following pattern:

  • [ <runner-version>, :ok, value, nil ]: job completed successfully

  • [ <runner-version>, :sleep, nil, nil ]: job has to wait on a child job

  • [ <runner-version>, :err, <err>, nil ]: job errored with a recoverable error

  • [ <runner-version>, :failed, <err>, <shutdown> ]: job failed with a non-recoverable error

<err> is a tuple [ error-class-name, error-message, stacktrace ]. <shutdown> is either nil or :shutdown



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/postjob/runner.rb', line 110

def process_job(job)
  expect! job => Job

  spec = Postjob::Registry.lookup!(name: job.workflow, version: job.workflow_version)
  expect! spec.runnable?

  with_current_job(job) do
    status, value, shutdown = invoke_workflow spec.workflow, job
    log_result! job, status, value
    # If the status is ok the job finished processing. In that case
    # we'll wait for all child jobs to finish.
    # if status == :ok
    #   await :all
    # end
    [ spec.options.version, status, value, shutdown ]
  end
end