Module: Postjob
- Extended by:
- Postjob
- Included in:
- Postjob
- Defined in:
- lib/postjob/migrations.rb,
lib/postjob.rb,
lib/postjob/error.rb,
lib/postjob/version.rb
Overview
rubocop:disable Security/Eval
Defined Under Namespace
Modules: CLI, GemHelper, Migrations, Queue, Runner, Workflow Classes: Error, Host, Job, Record, Registry, WorkerSession
Constant Summary
Instance Attribute Summary
-
#fast_mode ⇒ Object
In fast mode, Postjob.run waits less between retries of failed jobs.
-
#logger ⇒ Object
Returns the value of attribute logger.
Instance Method Summary
- #current_session? ⇒ Boolean
-
#current_session_id ⇒ Object
Returns the id of the current worker_session.
-
#enqueue!(workflow, *args, queue: nil, parent_id: nil, max_attempts: nil, timeout: nil, version: nil, tags: nil, cron_interval: nil, sticky: nil) ⇒ Object
Enqueues a workflow.
- #env ⇒ Object
- #host_id ⇒ Object
-
#process_all ⇒ Object
Processes all waiting jobs.
-
#register_workflow(workflow, options = {}) ⇒ Object
Registers a workflow.
-
#resolve(token:, result:) ⇒ Object
Explicitly resolves a workflow.
-
#run(count: nil, queues: nil, heartbeat: true, &block) ⇒ Object
Processes many jobs.
-
#start_worker_session!(heartbeat: true, queues: nil) ⇒ Object
:nodoc:.
-
#step(queues: nil) ⇒ Object
Runs a single job.
- #stop_worker_session! ⇒ Object
-
#with_worker_session(heartbeat:, queues:) ⇒ Object
run needs a worker_session.
Instance Attribute Details
#fast_mode ⇒ Object
In fast mode, Postjob.run waits less between retries of failed jobs. This mode is enabled by default during tests, and it can also be enabled via "postjob run --fast".
Note that fast mode should only be used during development and tests.
# File 'lib/postjob.rb', line 44
def fast_mode
  @fast_mode
end
#logger ⇒ Object
Returns the value of attribute logger.
# File 'lib/postjob.rb', line 28
def logger
  @logger
end
Instance Method Details
#current_session? ⇒ Boolean
# File 'lib/postjob.rb', line 263
def current_session?
  !@worker_session.nil?
end
#current_session_id ⇒ Object
Returns the id of the current worker_session. Note that this method does not start a worker_session itself; it raises an error if no worker_session has been started yet.
# File 'lib/postjob.rb', line 269
def current_session_id
  raise("worker_session hasn't been started yet.") unless @worker_session

  @worker_session.id
end
#enqueue!(workflow, *args, queue: nil, parent_id: nil, max_attempts: nil, timeout: nil, version: nil, tags: nil, cron_interval: nil, sticky: nil) ⇒ Object
Enqueues a workflow.
Options include
-
version
-
max_attempts
-
timeout
-
sticky
-
cron_interval
-
queue
Returns a job id
# File 'lib/postjob.rb', line 62
def enqueue!(workflow, *args, queue: nil,
                              parent_id: nil,
                              max_attempts: nil,
                              timeout: nil,
                              version: nil,
                              tags: nil,
                              cron_interval: nil,
                              sticky: nil)
  expect! queue => [nil, String]
  expect! workflow => String
  expect! parent_id => [nil, Integer]
  expect! max_attempts => [nil, Integer]
  expect! timeout => [nil, Numeric]
  expect! tags => [nil, Hash]
  expect! sticky => [nil, false, true]

  # -- prepare arguments ----------------------------------------------------

  if workflow == "__manual__" && max_attempts != 1
    if parent_id
      Postjob.logger.info "Job ##{parent_id} adjusting max_attempts of '__manual__' child job"
    else
      Postjob.logger.info "Adjusting max_attempts of '__manual__' root job"
    end

    max_attempts = 1
  end

  # -- fetch defaults from registry -----------------------------------------

  spec = Postjob::Registry[workflow]
  if spec
    max_attempts = spec.options.max_attempts if max_attempts.nil?
    timeout = spec.options.timeout if timeout.nil?
    sticky = spec.options.sticky if sticky.nil?
    cron_interval = spec.options.cron_interval if cron_interval.nil?
    queue = spec.options.queue if queue.nil?
    greedy = spec.options.greedy
  end

  if cron_interval && cron_interval < CRON_INTERVAL_MIN
    raise "cron interval must be at least #{CRON_INTERVAL_MIN} seconds"
  end

  # -- disable existing cron jobs -------------------------------------------

  if cron_interval
    Queue.disable_cron_jobs(workflow, args)
  end

  # -- enqueue jobs ---------------------------------------------------------

  tags = stringify_hash(tags) if tags

  session_id = current_session_id if current_session?
  session_id ||= "00000000-0000-0000-0000-000000000000"

  job = Queue.enqueue_job session_id, workflow, *args, queue: queue,
                                                       parent_id: parent_id,
                                                       max_attempts: max_attempts,
                                                       timeout: timeout,
                                                       tags: tags,
                                                       version: version,
                                                       cron_interval: cron_interval,
                                                       sticky: sticky,
                                                       greedy: greedy

  logger.info "Generated process #{job}"
  job.id
end
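The way enqueue! combines caller-supplied settings with registry defaults can be tried without a database. The following is a minimal sketch, not the gem's implementation: `apply_defaults` and the default values are hypothetical, and only the two rules visible in the method body are modelled (a setting is filled in only when the caller left it nil, and '__manual__' jobs are forced to a single attempt).

```ruby
# Hypothetical helper (not part of Postjob): fills in only those settings
# the caller left as nil, mirroring the "... if max_attempts.nil?" pattern
# in enqueue!.
def apply_defaults(workflow, defaults, **given)
  merged = defaults.merge(given) do |_key, default, value|
    value.nil? ? default : value
  end
  # '__manual__' jobs are always limited to a single attempt.
  merged[:max_attempts] = 1 if workflow == "__manual__"
  merged
end

# Hypothetical registry defaults for one workflow; the real values come
# from Postjob::Registry and are not shown on this page.
defaults = { max_attempts: 5, timeout: 3600, queue: "default" }.freeze

# max_attempts stays as given; timeout and queue fall back to the defaults.
p apply_defaults("MyWorkflow", defaults, max_attempts: 2, timeout: nil)

# The explicit max_attempts is overridden for the '__manual__' workflow.
p apply_defaults("__manual__", defaults, max_attempts: 3)
```

An explicit false (for example sticky: false) would be kept, because only nil triggers the fallback.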
#env ⇒ Object
# File 'lib/postjob.rb', line 31
def env
  ENV["POSTJOB_ENV"] || ENV["RAILS_ENV"] || ENV["RACK_ENV"] || "development"
end
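The lookup order can be reproduced in isolation. A minimal sketch, assuming a hypothetical helper `resolve_env` that takes the environment as a Hash so that it can be exercised without touching the real ENV:

```ruby
# Hypothetical stand-in for Postjob.env; the environment is passed in
# (defaulting to ENV) so the fallback order can be tried in isolation.
def resolve_env(env = ENV)
  env["POSTJOB_ENV"] || env["RAILS_ENV"] || env["RACK_ENV"] || "development"
end

puts resolve_env({})                                                   # nothing set
puts resolve_env({ "RACK_ENV" => "staging" })
puts resolve_env({ "RAILS_ENV" => "production", "RACK_ENV" => "staging" })
puts resolve_env({ "POSTJOB_ENV" => "test", "RAILS_ENV" => "production" })
```

Note that an empty string is truthy in Ruby, so a POSTJOB_ENV that is set but empty still takes precedence over the later variables.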
#process_all ⇒ Object
Processes all waiting jobs.
This method starts processing jobs, as long as there are some. It returns once no runnable jobs can be found anymore.
Note that this method is not limited to the set of runnable jobs present when calling it; if running a job results in newly created runnable jobs these jobs will be processed as well.
This method returns the number of processed jobs.
# File 'lib/postjob.rb', line 154
def process_all
  run do |job|
    !job.nil?
  end
end
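The "drain until empty, including newly created jobs" behaviour can be illustrated with an in-memory queue. This is a sketch under stated assumptions: `drain` and the lambda-based jobs are hypothetical stand-ins and do not use the Postjob queue.

```ruby
# Hypothetical in-memory stand-in for the job queue: each job is a lambda
# that may push follow-up jobs onto the same queue while it runs.
def drain(queue)
  processed = 0
  until queue.empty?
    job = queue.shift
    job.call(queue)
    processed += 1
  end
  processed
end

child  = ->(_queue) {}                            # does nothing
parent = ->(queue) { 2.times { queue << child } } # creates two runnable jobs

puts drain([parent]) # prints 3: the parent plus the two jobs it created
```

Only one job was present when draining started, yet three are counted, which matches the description of the return value above.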
#register_workflow(workflow, options = {}) ⇒ Object
Registers a workflow.
This call registers a workflow with a set of options.
This is usually used via
module X
  Postjob.register_workflow self, max_attempts: 2, timeout: 86400
end
The workflow parameter is either a module which implements a workflow or a String with the name of a workflow. It is followed by a set of options.
Options include
-
version
-
max_attempts
-
timeout
-
sticky
-
cron_interval
-
queue
# File 'lib/postjob.rb', line 307
def register_workflow(workflow, options = {})
  expect! workflow => [ Module, String ]

  workflow.extend Postjob::Workflow if workflow.is_a?(Module)
  Registry.register workflow, options
end
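The two accepted kinds of workflow argument can be sketched with a toy registry. Everything here is hypothetical: `MiniRegistry` and `MiniWorkflow` are illustrations only, and keying entries by module name is an assumption, since the real Postjob::Registry is not shown on this page.

```ruby
# Hypothetical marker module, standing in for Postjob::Workflow.
module MiniWorkflow; end

# Hypothetical, much simplified registry.
class MiniRegistry
  def initialize
    @specs = {}
  end

  # Accepts a Module or a String, as register_workflow does.
  def register(workflow, options = {})
    unless workflow.is_a?(Module) || workflow.is_a?(String)
      raise ArgumentError, "workflow must be a Module or a String"
    end

    workflow.extend(MiniWorkflow) if workflow.is_a?(Module)
    # Assumption: entries are keyed by the workflow's name.
    name = workflow.is_a?(Module) ? workflow.name : workflow
    @specs[name] = options
  end

  def [](name)
    @specs[name]
  end
end

module Mailer; end

registry = MiniRegistry.new
registry.register Mailer, max_attempts: 2, timeout: 86400
registry.register "__manual__", max_attempts: 1

p registry["Mailer"]
p Mailer.is_a?(MiniWorkflow)
```

Extending the module at registration time means that workflow modules do not have to mix in the helper methods themselves.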
#resolve(token:, result:) ⇒ Object
Explicitly resolves a workflow.
# File 'lib/postjob.rb', line 276
def resolve(token:, result:)
  job = Queue.find_job_by_token(token)
  raise "No job with token #{token}" unless job

  Queue.set_job_result current_session_id, job, result, version: nil
end
#run(count: nil, queues: nil, heartbeat: true, &block) ⇒ Object
Processes many jobs.
This method starts processing jobs, as long as there are some. If no jobs can be found this method waits until a job becomes available.
After processing, the id of each job is yielded to the passed-in block.
This method continues until:
a) the requested number of jobs (via the count: argument) was processed (note: repeated job executions due to rerunning jobs that slept or errored count multiple times), or
b) the block yielded into returns false, or
c) a processed job or the wait for new jobs signals :shutdown.
This method returns the number of processed jobs.
# File 'lib/postjob.rb', line 174
def run(count: nil, queues: nil, heartbeat: true, &block)
  queues ||= Postjob::Registry.queues

  # to run 10^12 jobs that would take 1 msec each we would need roughly
  # 32 years - so this default should be fine. Also, someone should update
  # the machine in the meantime :)
  count ||= 1_000_000_000_000

  with_worker_session heartbeat: heartbeat, queues: queues do
    processed_jobs_count = 0

    loop do
      processed_job_id, shutdown = Postjob.step(queues: queues)
      processed_jobs_count += 1 if processed_job_id

      break if processed_jobs_count >= count
      break if block && yield(processed_job_id) == false
      break if shutdown == :shutdown

      next if processed_job_id

      shutdown = Queue::Notifications.wait_for_new_job(current_session_id, queues: queues)
      break if shutdown == :shutdown
    end

    processed_jobs_count
  end
end
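The count and block termination conditions of the loop can be explored with a simulation. This is a sketch, not the real method: `simulate_run` is hypothetical, it replaces Postjob.step with a fixed list of job ids (nil meaning "nothing to check out"), and it simply moves on where the real method would wait for a new job.

```ruby
# Hypothetical simulation of the run loop. `steps` stands in for the
# successive results of Postjob.step: a job id, or nil for "no job".
def simulate_run(steps, count: nil)
  processed = 0
  steps.each do |job_id|
    processed += 1 if job_id
    break if count && processed >= count
    break if block_given? && yield(job_id) == false
    # The real method would wait for a new job here when job_id is nil.
  end
  processed
end

# Stops after two jobs because of count:.
puts simulate_run([1, 2, 3, 4], count: 2)

# Stops at the first nil because the block returns false for it;
# this is the block that process_all passes to run.
puts simulate_run([1, 2, nil, 3]) { |job_id| !job_id.nil? }

# Without count: or a block, a nil step does not end the loop.
puts simulate_run([1, nil, 2, 3])
```

Note that the block is also called with nil when no job was found, which is what lets a caller such as process_all stop as soon as the queue is empty.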
#start_worker_session!(heartbeat: true, queues: nil) ⇒ Object
:nodoc:
# File 'lib/postjob.rb', line 215
def start_worker_session!(heartbeat: true, queues: nil) # :nodoc:
  queues ||= Postjob::Registry.queues

  # We can't restart a new worker_session (this is currently not supported)
  # and probably also unnecessary. Instead we ignore this call, as long as
  # the queue setting is identical.
  #
  # A call to start_worker_session! is valid only during tests.
  if @worker_session
    raise ArgumentError, "You cannot restart a worker session" if ENV["POSTJOB_ENV"] != "test"
    raise ArgumentError, "You cannot restart a worker_session with different queues" if @worker_session.queues != queues
    return @worker_session
  end

  @worker_session = WorkerSession.start!(Registry.runnable_workflows_with_versions, heartbeat: heartbeat, queues: queues)
  # STDERR.puts "set worker_session to #{@worker_session.inspect}"
  @worker_session
end
#step(queues: nil) ⇒ Object
Runs a single job.
This method tries to check out a runnable job. If it finds one the job is processed (via Postjob.process_job).
This method returns a tuple [ <job-id>, <shutdown> ], or nil when no job could be checked out, where
-
<job-id> is the id of the job which has been processed;
-
<shutdown> is a flag, either :shutdown or nil. :shutdown notifies self.run to terminate the run loop.
# File 'lib/postjob.rb', line 254
def step(queues: nil)
  expect! queues => [Array, nil]
  queues ||= Postjob::Registry.queues

  job = Postjob::Queue.checkout(current_session_id, queues: queues)
  [ job.id, process_job(job) ] if job
end
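Because the return value is either a two-element Array or nil, callers can destructure it directly. A small sketch with hand-written stand-in values (no Postjob calls are made):

```ruby
# Stand-ins for the possible return values of step: a [job_id, shutdown]
# pair, or nil when no job could be checked out.
results = [[42, nil], [43, :shutdown], nil]

results.each do |result|
  # Destructuring nil leaves both variables nil, so no separate nil check
  # is needed before the assignment.
  job_id, shutdown = result
  puts "job_id=#{job_id.inspect} shutdown=#{shutdown.inspect}"
end
```

This is the same multiple assignment that run uses on the result of Postjob.step.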
#stop_worker_session! ⇒ Object
# File 'lib/postjob.rb', line 235
def stop_worker_session!
  return unless @worker_session

  WorkerSession.stop!(@worker_session)
  @worker_session = nil
end
#with_worker_session(heartbeat:, queues:) ⇒ Object
run needs a worker_session. If there is none, we start one.
# File 'lib/postjob.rb', line 203
def with_worker_session(heartbeat:, queues:) # :nodoc:
  new_session = false

  unless current_session?
    start_worker_session!(heartbeat: heartbeat, queues: queues)
    new_session = true
  end

  yield
ensure
  stop_worker_session! if new_session
end
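The ownership rule in this method (stop the session only if this call started it, even when the block raises) is a general Ruby pattern. A minimal sketch with a hypothetical `SessionHolder` class that records what happens instead of talking to a queue:

```ruby
# Hypothetical holder mimicking the ownership rule: only the call that
# started the session is responsible for stopping it again.
class SessionHolder
  attr_reader :log

  def initialize
    @session = nil
    @log = []
  end

  def with_session
    started_here = false
    if @session.nil?
      @session = :session
      @log << :started
      started_here = true
    end
    yield
  ensure
    # Runs on normal return and on exceptions alike.
    if started_here
      @session = nil
      @log << :stopped
    end
  end
end

holder = SessionHolder.new
holder.with_session do
  holder.with_session { holder.log << :inner_work } # reuses the outer session
  holder.log << :outer_work
end
p holder.log
```

The nested call neither starts nor stops anything, so the session survives until the outermost block has finished.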