Module: Postjob::Queue

Extended by:
Queue, Search
Included in:
Queue
Defined in:
lib/postjob/queue.rb,
lib/postjob/queue.rb,
lib/postjob/queue/search.rb,
lib/postjob/queue/encoder.rb,
lib/postjob/queue/settings.rb

Overview

The Postjob::Queue module manages enqueueing and fetching jobs from a job queue.

Defined Under Namespace

Modules: DatabaseInfo, Encoder, Notifications, Search

Constant Summary collapse

SCHEMA_NAME =
"postjob"
SQL =
::Simple::SQL
Job =

:nodoc:

::Postjob::Job
UUID_REGEXP =
/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/i
WorkerSession =

starts a session

::Postjob::WorkerSession

Instance Method Summary collapse

Methods included from Search

search

Instance Method Details

#checkout(worker_session_id, queues:) ⇒ Object



176
177
178
179
180
181
182
# File 'lib/postjob/queue.rb', line 176

def checkout(worker_session_id, queues:)
  expect! worker_session_id => UUID_REGEXP
  expect! queues => [ nil, Array ]

  SQL.ask "SELECT * FROM #{SCHEMA_NAME}.checkout($1::uuid, $2::boolean, $3)",
    worker_session_id, Postjob.fast_mode, queues, into: Job
end

#childjobs(parent) ⇒ Object



119
120
121
122
# File 'lib/postjob/queue.rb', line 119

def childjobs(parent)
  expect! parent => Job
  SQL.all "SELECT * FROM #{SCHEMA_NAME}.childjobs($1)", parent.id, into: Job
end

#disable_cron_jobs(workflow, args) ⇒ Object

Disable cron’ness for workflow + args combination



79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/postjob/queue.rb', line 79

def disable_cron_jobs(workflow, args)
  sql = <<~SQL
    UPDATE postjob.postjobs
    SET cron_interval = NULL
    WHERE cron_interval IS NOT NULL
      AND workflow=$1
      AND args=$2
      AND parent_id IS NULL
      AND status NOT IN ('ok', 'failed', 'timeout')
    SQL

  Simple::SQL.ask sql, workflow, Encoder.encode(args)
end

#enqueue_job(worker_session_id, workflow, *args, options) ⇒ Object

enqueues a new job with the given arguments

Parameters:

  • queue - the name of the queue

  • workflow - the name of the workflow (e.g. “FooBar”, “FooBar#method_name”)

  • version - the version of the workflow, e.g. “0.2”

  • args - an array of arguments, must be encodable via Postjob::JSON.encode

  • parent_id - the id of the parent job, if any

  • tags - # a Hash[String => String]



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/postjob/queue.rb', line 36

def enqueue_job(worker_session_id, workflow, *args, options)
  expect! workflow => String
  expect! options => {
    queue: [String, nil],
    version: [/\A\d(\.\d)+\z/, nil],
    parent_id: [Integer, nil],
    tags: [Hash, nil],
    timeout: [Numeric, nil],
    max_attempts: [Integer, nil],
    sticky: [true, false, nil],
    greedy: [true, false, nil]
  }

  workflow, workflow_method = parse_workflow(workflow)

  if options[:greedy] && !options[:sticky]
    raise ArgumentError, "#{workflow}: A greedy job must also be sticky" unless options[:sticky].nil?
    options[:sticky] = true if options[:greedy]
  end

  # The use of a `SELECT * FROM function()` here is due to
  #
  # a) a limitation in Simple::SQL which would not be able to unpack a
  #    "SELECT function()" usefully when the return value is a record;
  # b) and/or my inability to write better SQL functions;
  SQL.ask "SELECT * FROM #{SCHEMA_NAME}.enqueue($1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
                worker_session_id,
                options[:queue],
                workflow,
                workflow_method,
                options[:version],
                Encoder.encode(args),
                options[:parent_id],
                Encoder.encode(options[:tags]),
                options[:max_attempts],
                options[:timeout],
                options[:cron_interval],
                options[:sticky],
                options[:greedy],
                into: Job
end

#find_job_by_token(token) ⇒ Object



188
189
190
# File 'lib/postjob/queue.rb', line 188

def find_job_by_token(token)
  SQL.ask "SELECT * FROM #{SCHEMA_NAME}.postjobs_by_token($1)", token, into: Job
end

#find_or_create_childjob(worker_session_id, parent, workflow, args, timeout:, max_attempts:, queue: nil) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/postjob/queue.rb', line 129

def find_or_create_childjob(worker_session_id, parent, workflow, args, timeout:, max_attempts:, queue: nil)
  expect! parent => Job
  expect! workflow => String
  expect! args => Array

  if workflow == "__manual__" && max_attempts != 1
    Postjob.logger.info "Job ##{parent.id} adjusting max_attempts of '__manual__' child job"
    max_attempts = 1
  end

  workflow, workflow_method = parse_workflow(workflow)

  # The use of a `SELECT * FROM function()` here is due to
  #
  # a) a limitation in Simple::SQL which would not be able to unpack a
  #    "SELECT function()" usefully when the return value is a record;
  # b) and/or my inability to write better SQL functions;
  SQL.ask "SELECT * FROM #{SCHEMA_NAME}.find_or_create_childjob($1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
         worker_session_id,
         queue,
         workflow,
         workflow_method,
         nil,                      # version
         Encoder.encode(args),
         parent.id,
         nil,                      # tags will be read from parent
         max_attempts,
         timeout,
         into: Job
end

#find_or_create_token(job) ⇒ Object



184
185
186
# File 'lib/postjob/queue.rb', line 184

def find_or_create_token(job)
  SQL.ask "SELECT #{SCHEMA_NAME}.find_or_create_token($1)", job.id
end

#host_heartbeat(host_id, measurement) ⇒ Object

sends in a heartbeat



203
204
205
206
# File 'lib/postjob/queue.rb', line 203

def host_heartbeat(host_id, measurement)
  Simple::SQL.ask "SELECT postjob.host_heartbeat($1::uuid, $2::jsonb, $3)",
    host_id, JSON.generate(measurement), ::Postjob.fast_mode
end

#host_register(attributes, host_id:) ⇒ Object

returns the host id



195
196
197
198
199
200
# File 'lib/postjob/queue.rb', line 195

def host_register(attributes, host_id:)
  expect! attributes => [ nil, Hash ]
  expect! host_id => [ nil, UUID_REGEXP ]

  Simple::SQL.ask "SELECT postjob.host_register($1, $2::uuid)", JSON.generate(attributes), host_id
end

#set_job_error(worker_session_id, job, error, error_message, error_backtrace = nil, status:, version:) ⇒ Object



111
112
113
114
115
116
117
# File 'lib/postjob/queue.rb', line 111

def set_job_error(worker_session_id, job, error, error_message, error_backtrace = nil, status:, version:)
  expect! status => [ :failed, :err, :timeout ]
  expect! worker_session_id => UUID_REGEXP

  SQL.ask "SELECT #{SCHEMA_NAME}.set_job_error($1::uuid, $2, $3, $4, $5, $6, $7, $8)",
    worker_session_id, job.id, error, error_message, Encoder.encode(error_backtrace), status, version, Postjob.fast_mode
end

#set_job_pending(worker_session_id, job, version:) ⇒ Object



105
106
107
108
109
# File 'lib/postjob/queue.rb', line 105

def set_job_pending(worker_session_id, job, version:)
  expect! worker_session_id => UUID_REGEXP

  SQL.ask "SELECT #{SCHEMA_NAME}.set_job_pending($1::uuid, $2, $3)", worker_session_id, job.id, version
end

#set_job_result(worker_session_id, job, value, version:) ⇒ Object



98
99
100
101
102
103
# File 'lib/postjob/queue.rb', line 98

def set_job_result(worker_session_id, job, value, version:)
  expect! worker_session_id => UUID_REGEXP

  value = Encoder.encode([value]) unless value.nil?
  SQL.ask "SELECT #{SCHEMA_NAME}.set_job_result($1::uuid, $2, $3, $4)", worker_session_id, job.id, value, version
end

#settings?Boolean

Returns:

  • (Boolean)


2
3
4
5
# File 'lib/postjob/queue/settings.rb', line 2

def settings?
  tables = SQL::Reflection.tables(schema: "postjob")
  tables.include?("postjob.settings")
end

#should_shutdown?(worker_session_id) ⇒ Boolean

Asks the database whether this session should be shut down.

Returns:

  • (Boolean)


94
95
96
# File 'lib/postjob/queue.rb', line 94

def should_shutdown?(worker_session_id)
  SQL.ask "SELECT #{SCHEMA_NAME}.session_should_shutdown($1::uuid)", worker_session_id
end

#unresolved_childjobs(parent) ⇒ Object



124
125
126
127
# File 'lib/postjob/queue.rb', line 124

def unresolved_childjobs(parent)
  expect! parent => Job
  SQL.ask "SELECT COUNT(*) FROM #{SCHEMA_NAME}.unresolved_childjobs($1)", parent.id
end

#versionObject

returns the version of the Postjob queue.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/postjob/queue/settings.rb', line 8

def version
  return "0.3.*" unless settings?

  sql = <<~SQL
    SELECT 1 from pg_proc
    left join pg_namespace on pg_proc.pronamespace=pg_namespace.oid
    where pg_namespace.nspname='postjob' AND pg_proc.proname='settings_get'
  SQL

  if Simple::SQL.ask(sql)
    version = Simple::SQL.ask "SELECT postjob.settings_get('schema_version')"
    version ||= Simple::SQL.ask "SELECT postjob.settings_get('version')"
  else
    version = Simple::SQL.ask("SELECT value FROM postjob.settings WHERE name=$1", "version")
  end

  version || "unknown"
end

#worker_session_start(workflows_with_versions, host_id:, queues:) ⇒ Object



211
212
213
214
215
216
217
# File 'lib/postjob/queue.rb', line 211

def worker_session_start(workflows_with_versions, host_id:, queues:)
  expect! host_id => UUID_REGEXP
  expect! queues => Array
  expect! queues.first => String

  Simple::SQL.ask "SELECT * FROM postjob.worker_session_start($1::uuid, $2, $3)", host_id, workflows_with_versions, queues, into: ::Postjob::WorkerSession
end

#worker_session_stop(worker_session) ⇒ Object

stop a worker session



220
221
222
223
224
# File 'lib/postjob/queue.rb', line 220

def worker_session_stop(worker_session)
  expect! worker_session => UUID_REGEXP

  Simple::SQL.ask "SELECT * FROM postjob.worker_session_stop($1::uuid)", worker_session
end