Module: Postjob

Extended by:
Postjob
Included in:
Postjob
Defined in:
lib/postjob/migrations.rb,
lib/postjob.rb,
lib/postjob.rb,
lib/postjob/error.rb,
lib/postjob/version.rb

Overview

rubocop:disable Security/Eval

Defined Under Namespace

Modules: CLI, GemHelper, Migrations, Queue, Runner, Workflow Classes: Error, Host, Job, Record, Registry, WorkerSession

Constant Summary collapse

DEFAULT_QUEUE =
"default"
CRON_INTERVAL_MIN =
60
VERSION =
GemHelper.version "postjob"

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#fast_modeObject

In fast mode Postjob.run doesn’t wait that long between retrying failed jobs. This mode is enabled by default during tests; and it can be enabled via “postjob run –fast”

Note that fast mode should only be used during development and tests.



44
45
46
# File 'lib/postjob.rb', line 44

def fast_mode
  @fast_mode
end

#loggerObject

Returns the value of attribute logger.



28
29
30
# File 'lib/postjob.rb', line 28

def logger
  @logger
end

Instance Method Details

#current_session?Boolean

Returns:

  • (Boolean)


263
264
265
# File 'lib/postjob.rb', line 263

def current_session?
  @worker_session != @nil
end

#current_session_idObject

This method connects to the queue. This means it registers as a new worker_session, if there was no worker_session yet.



269
270
271
272
273
# File 'lib/postjob.rb', line 269

def current_session_id
  raise("worker_session hasn't been started yet.") unless @worker_session

  @worker_session.id
end

#enqueue!(workflow, *args, queue: nil, parent_id: nil, max_attempts: nil, timeout: nil, version: nil, tags: nil, cron_interval: nil, sticky: nil) ⇒ Object

Enqueues a workflow.

Options include

  • version

  • max_attempts

  • timeout

  • sticky

  • cron_interval

  • queue

Returns a job id



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/postjob.rb', line 62

def enqueue!(workflow, *args, queue: nil,
             parent_id: nil,
             max_attempts: nil,
             timeout: nil,
             version: nil,
             tags: nil,
             cron_interval: nil,
             sticky: nil)
  expect! queue => [nil, String]
  expect! workflow => String
  expect! parent_id => [nil, Integer]
  expect! max_attempts => [nil, Integer]
  expect! timeout => [nil, Numeric]
  expect! tags => [nil, Hash]
  expect! sticky => [nil, false, true]

  # -- prepare arguments ----------------------------------------------------

  if workflow == "__manual__" && max_attempts != 1
    if parent_id
      Postjob.logger.info "Job ##{parent_id} adjusting max_attempts of '__manual__' child job"
    else
      Postjob.logger.info "Adjusting max_attempts of '__manual__' root job"
    end

    max_attempts = 1
  end

  # -- fetch defaults from registry -----------------------------------------

  spec = Postjob::Registry[workflow]
  if spec
    max_attempts    = spec.options.max_attempts if max_attempts.nil?
    timeout         = spec.options.timeout if timeout.nil?
    sticky          = spec.options.sticky if sticky.nil?
    cron_interval   = spec.options.cron_interval if cron_interval.nil?
    queue           = spec.options.queue if queue.nil?
    greedy          = spec.options.greedy
  end

  if cron_interval && cron_interval < CRON_INTERVAL_MIN
    raise "cron interval must be at least #{CRON_INTERVAL_MIN} seconds"
  end

  # -- disable existing cron jobs -------------------------------------------

  if cron_interval
    Queue.disable_cron_jobs(workflow, args)
  end

  # -- enqueue jobs ---------------------------------------------------------

  tags = stringify_hash(tags) if tags

  session_id = current_session_id if current_session?
  session_id ||= "00000000-0000-0000-0000-000000000000"

  job = Queue.enqueue_job session_id, workflow, *args, queue: queue,
                                                       parent_id: parent_id,
                                                       max_attempts: max_attempts,
                                                       timeout: timeout,
                                                       tags: tags,
                                                       version: version,
                                                       cron_interval: cron_interval,
                                                       sticky: sticky,
                                                       greedy: greedy

  logger.info "Generated process #{job}"
  job.id
end

#envObject



31
32
33
# File 'lib/postjob.rb', line 31

def env
  ENV["POSTJOB_ENV"] || ENV["RAILS_ENV"] || ENV["RACK_ENV"] || "development"
end

#host_idObject



35
36
37
# File 'lib/postjob.rb', line 35

def host_id
  Host.host_id
end

#process_allObject

process all waiting jobs.

This method starts processing jobs, as long as there are some. It returns once no runnable jobs can be found anymore.

Note that this method is not limited to the set of runnable jobs present when calling it; if running a job results in newly created runnable jobs these jobs will be processed as well.

This method returns the number of processed jobs.



154
155
156
157
158
# File 'lib/postjob.rb', line 154

def process_all
  run do |job|
    !job.nil?
  end
end

#register_workflow(workflow, options = {}) ⇒ Object

Registers a workflow.

This call registers a workflow with a set of options.

This is usually used via

module X
  Postjob.register self, max_attempts: 2, timeout: 86400
end

The workflow parameter is either a module which implements a workflow, or a String with the name of a workflow, and a set of options.

Options include

  • version

  • max_attempts

  • timeout

  • sticky

  • cron_interval

  • queue



307
308
309
310
311
312
# File 'lib/postjob.rb', line 307

def register_workflow(workflow, options = {})
  expect! workflow => [ Module, String ]

  workflow.extend Postjob::Workflow if workflow.is_a?(Module)
  Registry.register workflow, options
end

#resolve(token:, result:) ⇒ Object

Explicitely resolve a workflow.



276
277
278
279
280
281
# File 'lib/postjob.rb', line 276

def resolve(token:, result:)
  job = Queue.find_job_by_token(token)
  raise "No job with token #{token}" unless job

  Queue.set_job_result current_session_id, job, result, version: nil
end

#run(count: nil, queues: nil, heartbeat: true, &block) ⇒ Object

processes many jobs.

This method starts processing jobs, as long as there are some. If no jobs can be found this method waits until a job becomes available.

After processing each job is yielded into the passed in block.

This method continues until: a) the requested number of jobs (via the count: argument) was processed (note:

repeated job executions due to rerunning jobs that slept or errored count
multiple times), or

b) the block yielded into returns false.

This method returns the number of processed jobs.



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/postjob.rb', line 174

def run(count: nil, queues: nil, heartbeat: true, &block)
  queues ||= Postjob::Registry.queues

  # to run 10^12 jobs that would take 1 msecs each we would need, at least,
  # 760 years - so this default should be fine. Also, someone should update
  # the machine in the meantime :)
  count ||= 1_000_000_000_000

  with_worker_session heartbeat: heartbeat, queues: queues do
    processed_jobs_count = 0

    loop do
      processed_job_id, shutdown = Postjob.step(queues: queues)
      processed_jobs_count += 1 if processed_job_id

      break if processed_jobs_count >= count
      break if block && yield(processed_job_id) == false
      break if shutdown == :shutdown

      next if processed_job_id
      shutdown = Queue::Notifications.wait_for_new_job(current_session_id, queues: queues)
      break if shutdown == :shutdown
    end

    processed_jobs_count
  end
end

#start_worker_session!(heartbeat: true, queues: nil) ⇒ Object

:nodoc:



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/postjob.rb', line 215

def start_worker_session!(heartbeat: true, queues: nil) # :nodoc:
  queues ||= Postjob::Registry.queues

  # We can't restart a new worker_session (this is currently not supported)
  # and probably also unnecessary. Instead we ignore this call, as long as
  # the queue setting is identical.
  #
  # A call to start_worker_session! is valid only during tests.
  if @worker_session
    raise ArgumentError, "You cannot restart a worker session" if ENV["POSTJOB_ENV"] != "test"
    raise ArgumentError, "You cannot restart a worker_session with different queues" if @worker_session.queues != queues

    return @worker_session
  end

  @worker_session = WorkerSession.start!(Registry.runnable_workflows_with_versions, heartbeat: heartbeat, queues: queues)
  # STDERR.puts "set worker_session to #{@worker_session.inspect}"
  @worker_session
end

#step(queues: nil) ⇒ Object

Runs a single job

This method tries to check out a runnable job. If it finds one the job is processed (via Postjob.process_job).

This method returns a tuple [ <job>, <shutdown> ], where

  • <job-id> is the id of the job which has been processed;

  • <shutdown> is a flag, either :shutdown or nil. :shutdown notifies self.run to terminate the run loop.

or nil, when no job could be checked out.



254
255
256
257
258
259
260
261
# File 'lib/postjob.rb', line 254

def step(queues: nil)
  expect! queues => [Array, nil]
  queues ||= Postjob::Registry.queues

  job = Postjob::Queue.checkout(current_session_id, queues: queues)

  [ job.id, process_job(job) ] if job
end

#stop_worker_session!Object



235
236
237
238
239
240
# File 'lib/postjob.rb', line 235

def stop_worker_session!
  return unless @worker_session

  WorkerSession.stop!(@worker_session)
  @worker_session = nil
end

#with_worker_session(heartbeat:, queues:) ⇒ Object

run needs a worker_session. If there is none, we start one.



203
204
205
206
207
208
209
210
211
212
213
# File 'lib/postjob.rb', line 203

def with_worker_session(heartbeat:, queues:) # :nodoc:
  new_session = false
  unless current_session?
    start_worker_session!(heartbeat: heartbeat, queues: queues)
    new_session = true
  end

  yield
ensure
  stop_worker_session! if new_session
end