Module: Postjob::Queue
- Included in:
- Queue
- Defined in:
- lib/postjob/queue.rb,
lib/postjob/queue.rb,
lib/postjob/queue/search.rb,
lib/postjob/queue/encoder.rb,
lib/postjob/queue/settings.rb
Overview
The Postjob::Queue module manages enqueueing and fetching jobs from a job queue.
Defined Under Namespace
Modules: DatabaseInfo, Encoder, Notifications, Search
Constant Summary collapse
- SCHEMA_NAME =
"postjob"- SQL =
::Simple::SQL
- Job =
:nodoc:
::Postjob::Job
- UUID_REGEXP =
/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/i- WorkerSession =
starts a session
::Postjob::WorkerSession
Instance Method Summary collapse
- #checkout(worker_session_id, queues:) ⇒ Object
- #childjobs(parent) ⇒ Object
-
#disable_cron_jobs(workflow, args) ⇒ Object
Disable cron’ness for workflow + args combination.
-
#enqueue_job(worker_session_id, workflow, *args, options) ⇒ Object
enqueues a new job with the given arguments.
- #find_job_by_token(token) ⇒ Object
- #find_or_create_childjob(worker_session_id, parent, workflow, args, timeout:, max_attempts:, queue: nil) ⇒ Object
- #find_or_create_token(job) ⇒ Object
-
#host_heartbeat(host_id, measurement) ⇒ Object
sends in a heartbeat.
-
#host_register(attributes, host_id:) ⇒ Object
returns the host id.
- #set_job_error(worker_session_id, job, error, error_message, error_backtrace = nil, status:, version:) ⇒ Object
- #set_job_pending(worker_session_id, job, version:) ⇒ Object
- #set_job_result(worker_session_id, job, value, version:) ⇒ Object
- #settings? ⇒ Boolean
-
#should_shutdown?(worker_session_id) ⇒ Boolean
Asks the database whether this session should be shut down.
- #unresolved_childjobs(parent) ⇒ Object
-
#version ⇒ Object
returns the version of the Postjob queue.
- #worker_session_start(workflows_with_versions, host_id:, queues:) ⇒ Object
-
#worker_session_stop(worker_session) ⇒ Object
stop a worker session.
Methods included from Search
Instance Method Details
#checkout(worker_session_id, queues:) ⇒ Object
176 177 178 179 180 181 182 |
# File 'lib/postjob/queue.rb', line 176 def checkout(worker_session_id, queues:) expect! worker_session_id => UUID_REGEXP expect! queues => [ nil, Array ] SQL.ask "SELECT * FROM #{SCHEMA_NAME}.checkout($1::uuid, $2::boolean, $3)", worker_session_id, Postjob.fast_mode, queues, into: Job end |
#childjobs(parent) ⇒ Object
119 120 121 122 |
# File 'lib/postjob/queue.rb', line 119 def childjobs(parent) expect! parent => Job SQL.all "SELECT * FROM #{SCHEMA_NAME}.childjobs($1)", parent.id, into: Job end |
#disable_cron_jobs(workflow, args) ⇒ Object
Disable cron’ness for workflow + args combination
79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/postjob/queue.rb', line 79 def disable_cron_jobs(workflow, args) sql = <<~SQL UPDATE postjob.postjobs SET cron_interval = NULL WHERE cron_interval IS NOT NULL AND workflow=$1 AND args=$2 AND parent_id IS NULL AND status NOT IN ('ok', 'failed', 'timeout') SQL Simple::SQL.ask sql, workflow, Encoder.encode(args) end |
#enqueue_job(worker_session_id, workflow, *args, options) ⇒ Object
enqueues a new job with the given arguments
Parameters:
-
queue - the name of the queue
-
workflow - the name of the workflow (e.g. “FooBar”, “FooBar#method_name”)
-
version - the version of the workflow, e.g. “0.2”
-
args - an array of arguments, must be encodable via Postjob::JSON.encode
-
parent_id - the id of the parent job, if any
-
tags - # a Hash[String => String]
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/postjob/queue.rb', line 36 def enqueue_job(worker_session_id, workflow, *args, ) expect! workflow => String expect! => { queue: [String, nil], version: [/\A\d(\.\d)+\z/, nil], parent_id: [Integer, nil], tags: [Hash, nil], timeout: [Numeric, nil], max_attempts: [Integer, nil], sticky: [true, false, nil], greedy: [true, false, nil] } workflow, workflow_method = parse_workflow(workflow) if [:greedy] && ![:sticky] raise ArgumentError, "#{workflow}: A greedy job must also be sticky" unless [:sticky].nil? [:sticky] = true if [:greedy] end # The use of a `SELECT * FROM function()` here is due to # # a) a limitation in Simple::SQL which would not be able to unpack a # "SELECT function()" usefully when the return value is a record; # b) and/or my inability to write better SQL functions; SQL.ask "SELECT * FROM #{SCHEMA_NAME}.enqueue($1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)", worker_session_id, [:queue], workflow, workflow_method, [:version], Encoder.encode(args), [:parent_id], Encoder.encode([:tags]), [:max_attempts], [:timeout], [:cron_interval], [:sticky], [:greedy], into: Job end |
#find_job_by_token(token) ⇒ Object
188 189 190 |
# File 'lib/postjob/queue.rb', line 188 def find_job_by_token(token) SQL.ask "SELECT * FROM #{SCHEMA_NAME}.postjobs_by_token($1)", token, into: Job end |
#find_or_create_childjob(worker_session_id, parent, workflow, args, timeout:, max_attempts:, queue: nil) ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/postjob/queue.rb', line 129 def find_or_create_childjob(worker_session_id, parent, workflow, args, timeout:, max_attempts:, queue: nil) expect! parent => Job expect! workflow => String expect! args => Array if workflow == "__manual__" && max_attempts != 1 Postjob.logger.info "Job ##{parent.id} adjusting max_attempts of '__manual__' child job" max_attempts = 1 end workflow, workflow_method = parse_workflow(workflow) # The use of a `SELECT * FROM function()` here is due to # # a) a limitation in Simple::SQL which would not be able to unpack a # "SELECT function()" usefully when the return value is a record; # b) and/or my inability to write better SQL functions; SQL.ask "SELECT * FROM #{SCHEMA_NAME}.find_or_create_childjob($1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10)", worker_session_id, queue, workflow, workflow_method, nil, # version Encoder.encode(args), parent.id, nil, # tags will be read from parent max_attempts, timeout, into: Job end |
#find_or_create_token(job) ⇒ Object
184 185 186 |
# File 'lib/postjob/queue.rb', line 184 def find_or_create_token(job) SQL.ask "SELECT #{SCHEMA_NAME}.find_or_create_token($1)", job.id end |
#host_heartbeat(host_id, measurement) ⇒ Object
sends in a heartbeat
203 204 205 206 |
# File 'lib/postjob/queue.rb', line 203 def host_heartbeat(host_id, measurement) Simple::SQL.ask "SELECT postjob.host_heartbeat($1::uuid, $2::jsonb, $3)", host_id, JSON.generate(measurement), ::Postjob.fast_mode end |
#host_register(attributes, host_id:) ⇒ Object
returns the host id
195 196 197 198 199 200 |
# File 'lib/postjob/queue.rb', line 195 def host_register(attributes, host_id:) expect! attributes => [ nil, Hash ] expect! host_id => [ nil, UUID_REGEXP ] Simple::SQL.ask "SELECT postjob.host_register($1, $2::uuid)", JSON.generate(attributes), host_id end |
#set_job_error(worker_session_id, job, error, error_message, error_backtrace = nil, status:, version:) ⇒ Object
111 112 113 114 115 116 117 |
# File 'lib/postjob/queue.rb', line 111 def set_job_error(worker_session_id, job, error, , error_backtrace = nil, status:, version:) expect! status => [ :failed, :err, :timeout ] expect! worker_session_id => UUID_REGEXP SQL.ask "SELECT #{SCHEMA_NAME}.set_job_error($1::uuid, $2, $3, $4, $5, $6, $7, $8)", worker_session_id, job.id, error, , Encoder.encode(error_backtrace), status, version, Postjob.fast_mode end |
#set_job_pending(worker_session_id, job, version:) ⇒ Object
105 106 107 108 109 |
# File 'lib/postjob/queue.rb', line 105 def set_job_pending(worker_session_id, job, version:) expect! worker_session_id => UUID_REGEXP SQL.ask "SELECT #{SCHEMA_NAME}.set_job_pending($1::uuid, $2, $3)", worker_session_id, job.id, version end |
#set_job_result(worker_session_id, job, value, version:) ⇒ Object
98 99 100 101 102 103 |
# File 'lib/postjob/queue.rb', line 98 def set_job_result(worker_session_id, job, value, version:) expect! worker_session_id => UUID_REGEXP value = Encoder.encode([value]) unless value.nil? SQL.ask "SELECT #{SCHEMA_NAME}.set_job_result($1::uuid, $2, $3, $4)", worker_session_id, job.id, value, version end |
#settings? ⇒ Boolean
2 3 4 5 |
# File 'lib/postjob/queue/settings.rb', line 2 def settings? tables = SQL::Reflection.tables(schema: "postjob") tables.include?("postjob.settings") end |
#should_shutdown?(worker_session_id) ⇒ Boolean
Asks the database whether this session should be shut down.
94 95 96 |
# File 'lib/postjob/queue.rb', line 94 def should_shutdown?(worker_session_id) SQL.ask "SELECT #{SCHEMA_NAME}.session_should_shutdown($1::uuid)", worker_session_id end |
#unresolved_childjobs(parent) ⇒ Object
124 125 126 127 |
# File 'lib/postjob/queue.rb', line 124 def unresolved_childjobs(parent) expect! parent => Job SQL.ask "SELECT COUNT(*) FROM #{SCHEMA_NAME}.unresolved_childjobs($1)", parent.id end |
#version ⇒ Object
returns the version of the Postjob queue.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/postjob/queue/settings.rb', line 8 def version return "0.3.*" unless settings? sql = <<~SQL SELECT 1 from pg_proc left join pg_namespace on pg_proc.pronamespace=pg_namespace.oid where pg_namespace.nspname='postjob' AND pg_proc.proname='settings_get' SQL if Simple::SQL.ask(sql) version = Simple::SQL.ask "SELECT postjob.settings_get('schema_version')" version ||= Simple::SQL.ask "SELECT postjob.settings_get('version')" else version = Simple::SQL.ask("SELECT value FROM postjob.settings WHERE name=$1", "version") end version || "unknown" end |
#worker_session_start(workflows_with_versions, host_id:, queues:) ⇒ Object
211 212 213 214 215 216 217 |
# File 'lib/postjob/queue.rb', line 211 def worker_session_start(workflows_with_versions, host_id:, queues:) expect! host_id => UUID_REGEXP expect! queues => Array expect! queues.first => String Simple::SQL.ask "SELECT * FROM postjob.worker_session_start($1::uuid, $2, $3)", host_id, workflows_with_versions, queues, into: ::Postjob::WorkerSession end |
#worker_session_stop(worker_session) ⇒ Object
stop a worker session
220 221 222 223 224 |
# File 'lib/postjob/queue.rb', line 220 def worker_session_stop(worker_session) expect! worker_session => UUID_REGEXP Simple::SQL.ask "SELECT * FROM postjob.worker_session_stop($1::uuid)", worker_session end |