Class: Skiplock::Job

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/skiplock/job.rb

Class Method Summary collapse

Class Method Details

.dispatch(connection: ActiveRecord::Base.connection) ⇒ Object

Accept: An active ActiveRecord database connection (eg. ActiveRecord::Base.connection)

The connection should be checked out using ActiveRecord::Base.connection_pool.checkout, and be checked
in using ActiveRecord::Base.conection_pool.checkin once all of the job dispatches have been completed.
*** IMPORTANT: This connection cannot be shared with the job's execution

Return: Attributes hash of the Job if it was executed; otherwise returns the next Job’s schedule time in FLOAT



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/skiplock/job.rb', line 11

def self.dispatch(connection: ActiveRecord::Base.connection)
  connection.exec_query('BEGIN')
  job = connection.exec_query("SELECT * FROM #{self.table_name} WHERE running = 'f' AND expired_at IS NULL AND finished_at IS NULL ORDER BY scheduled_at ASC NULLS FIRST, priority ASC NULLS LAST, created_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").first
  if job && job['scheduled_at'].to_f <= Time.now.to_f  # found job ready to perform
    # update the job to mark it in progress in case database server goes down during job execution
    connection.exec_query("UPDATE #{self.table_name} SET running = 't' WHERE id = '#{job['id']}'")
    connection.exec_query('END') # close the transaction commit the state of job in progress
    executions = (job['executions'] || 0) + 1
    exceptions = job['exception_executions'] ? JSON.parse(job['exception_executions']) : {}
    data = job['data'] ? JSON.parse(job['data']) : {}
    job_data = job.slice('job_class', 'queue_name', 'locale', 'timezone', 'priority', 'executions').merge('job_id' => job['id'], 'exception_executions' => exceptions, 'enqueued_at' => job['updated_at']).merge(data)
    Thread.current[:skiplock_dispatch_data] = job_data
    begin
      ActiveJob::Base.execute(job_data)
    rescue Exception => ex
    end
    if ex
      # TODO: report exception
      exceptions["[#{ex.class.name}]"] = (exceptions["[#{ex.class.name}]"] || 0) + 1 unless exceptions.key?('activejob_retry')
      if executions >= Settings['max_retries'] || exceptions.key?('activejob_retry')
        connection.exec_query("UPDATE #{self.table_name} SET running = 'f', executions = #{executions}, exception_executions = '#{connection.quote_string(exceptions.to_json.to_s)}', expired_at = NOW(), updated_at = NOW() WHERE id = '#{job['id']}' RETURNING *").first
      else
        timestamp = Time.now + (5 * 2**executions)
        connection.exec_query("UPDATE #{self.table_name} SET running = 'f', executions = #{executions}, exception_executions = '#{connection.quote_string(exceptions.to_json.to_s)}', scheduled_at = TO_TIMESTAMP(#{timestamp.to_f}), updated_at = NOW() WHERE id = '#{job['id']}' RETURNING *").first
      end
    elsif exceptions.key?('activejob_retry')
      connection.exec_query("UPDATE #{self.table_name} SET running = 'f', executions = #{job_data['executions']}, exception_executions = '#{connection.quote_string(job_data['exception_executions'].to_json.to_s)}', scheduled_at = TO_TIMESTAMP(#{job_data['scheduled_at'].to_f}), updated_at = NOW() WHERE id = '#{job['id']}' RETURNING *").first
    elsif job['cron']
      data['last_cron_run'] = Time.now.utc.to_s
      next_cron_at = Cron.next_schedule_at(job['cron'])
      if next_cron_at
        connection.exec_query("UPDATE #{self.table_name} SET running = 'f', scheduled_at = TO_TIMESTAMP(#{next_cron_at}), executions = 1, exception_executions = NULL, data = '#{connection.quote_string(data.to_json.to_s)}', updated_at = NOW() WHERE id = '#{job['id']}' RETURNING *").first
      else
        connection.exec_query("DELETE FROM #{self.table_name} WHERE id = '#{job['id']}' RETURNING *").first
      end
    elsif Settings['purge_completion']
      connection.exec_query("DELETE FROM #{self.table_name} WHERE id = '#{job['id']}' RETURNING *").first
    else
      connection.exec_query("UPDATE #{self.table_name} SET running = 'f', executions = #{executions}, exception_executions = NULL, finished_at = NOW(), updated_at = NOW() WHERE id = '#{job['id']}' RETURNING *").first
    end
  else
    connection.exec_query('END')
    job ? job['scheduled_at'].to_f : Float::INFINITY
  end
ensure
  Thread.current[:skiplock_dispatch_data] = nil
end

.enqueue_at(job, timestamp) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/skiplock/job.rb', line 59

def self.enqueue_at(job, timestamp)
  if Thread.current[:skiplock_dispatch_data]
    job.exception_executions['activejob_retry'] = true
    Thread.current[:skiplock_dispatch_data]['executions'] = job.executions
    Thread.current[:skiplock_dispatch_data]['exception_executions'] = job.exception_executions
    Thread.current[:skiplock_dispatch_data]['scheduled_at'] = Time.at(timestamp)
    self.new(Thread.current[:skiplock_dispatch_data].slice(*self.column_names).merge(id: job.job_id))
  else
    timestamp = Time.at(timestamp) if timestamp
    Job.create!(id: job.job_id, job_class: job.class.name, queue_name: job.queue_name, locale: job.locale, timezone: job.timezone, priority: job.priority, executions: job.executions, data: { 'arguments' => job.arguments }, scheduled_at: timestamp)
  end
end