Class: Skiplock::Job
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Skiplock::Job
- Defined in:
- lib/skiplock/job.rb
Class Method Summary collapse
-
.dispatch(connection: ActiveRecord::Base.connection) ⇒ Object
Accept: An active ActiveRecord database connection (eg. ActiveRecord::Base.connection). The connection should be checked out using ActiveRecord::Base.connection_pool.checkout, and be checked in using ActiveRecord::Base.connection_pool.checkin once all of the job dispatches have been completed.
- .enqueue_at(job, timestamp) ⇒ Object
Class Method Details
.dispatch(connection: ActiveRecord::Base.connection) ⇒ Object
Accept: An active ActiveRecord database connection (eg. ActiveRecord::Base.connection)
The connection should be checked out using ActiveRecord::Base.connection_pool.checkout, and be checked
in using ActiveRecord::Base.connection_pool.checkin once all of the job dispatches have been completed.
*** IMPORTANT: This connection cannot be shared with the job's execution
Return: Attributes hash of the Job if it was executed; otherwise returns the next Job’s schedule time in FLOAT
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/skiplock/job.rb', line 11

# Claims the next runnable job row (FOR UPDATE SKIP LOCKED), marks it as
# running, executes it through ActiveJob and records the outcome.
#
# @param connection an object responding to +exec_query+ and +quote_string+
#   (defaults to ActiveRecord::Base.connection)
# @return [Hash, nil] the first row returned by the final UPDATE/DELETE
#   ... RETURNING statement when a job was executed
# @return [Float] the next job's +scheduled_at+ as a float when the earliest
#   job is not due yet, or Float::INFINITY when no candidate row exists
def self.dispatch(connection: ActiveRecord::Base.connection)
  connection.exec_query('BEGIN')
  job = connection.exec_query("SELECT * FROM #{table_name} WHERE running = 'f' AND expired_at IS NULL AND finished_at IS NULL ORDER BY scheduled_at ASC NULLS FIRST, priority ASC NULLS LAST, created_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").first
  if job && job['scheduled_at'].to_f <= Time.now.to_f # found job ready to perform
    # update the job to mark it in progress in case database server goes down during job execution
    connection.exec_query("UPDATE #{table_name} SET running = 't' WHERE id = '#{job['id']}'")
    connection.exec_query('END') # close the transaction commit the state of job in progress
    executions = (job['executions'] || 0) + 1
    exceptions = job['exception_executions'] ? JSON.parse(job['exception_executions']) : {}
    data = job['data'] ? JSON.parse(job['data']) : {}
    job_data = job.slice('job_class', 'queue_name', 'locale', 'timezone', 'priority', 'executions')
                  .merge('job_id' => job['id'], 'exception_executions' => exceptions, 'enqueued_at' => job['updated_at'])
                  .merge(data)
    # Published for enqueue_at, which reads and mutates this hash when it is
    # called on this thread while the job is executing.
    Thread.current[:skiplock_dispatch_data] = job_data
    begin
      ActiveJob::Base.execute(job_data)
    rescue Exception => ex
      # NOTE(review): rescuing Exception also traps SignalException/SystemExit;
      # kept as-is to preserve behaviour - confirm this is intended.
    end
    if ex
      # TODO: report exception
      exceptions["[#{ex.class.name}]"] = (exceptions["[#{ex.class.name}]"] || 0) + 1 unless exceptions.key?('activejob_retry')
      if executions >= Settings['max_retries'] || exceptions.key?('activejob_retry')
        # Retries exhausted (or the retry marker is present): expire the job.
        connection.exec_query("UPDATE #{table_name} SET running = 'f', executions = #{executions}, exception_executions = '#{connection.quote_string(exceptions.to_json.to_s)}', expired_at = NOW(), updated_at = NOW() WHERE id = '#{job['id']}' RETURNING *").first
      else
        # Exponential backoff: 5 * 2**executions seconds from now.
        timestamp = Time.now + (5 * 2**executions)
        connection.exec_query("UPDATE #{table_name} SET running = 'f', executions = #{executions}, exception_executions = '#{connection.quote_string(exceptions.to_json.to_s)}', scheduled_at = TO_TIMESTAMP(#{timestamp.to_f}), updated_at = NOW() WHERE id = '#{job['id']}' RETURNING *").first
      end
    elsif exceptions.key?('activejob_retry')
      # No exception escaped, but the retry marker is set: persist the values
      # currently stored in job_data (executions, exception_executions, scheduled_at).
      connection.exec_query("UPDATE #{table_name} SET running = 'f', executions = #{job_data['executions']}, exception_executions = '#{connection.quote_string(job_data['exception_executions'].to_json.to_s)}', scheduled_at = TO_TIMESTAMP(#{job_data['scheduled_at'].to_f}), updated_at = NOW() WHERE id = '#{job['id']}' RETURNING *").first
    elsif job['cron']
      data['last_cron_run'] = Time.now.utc.to_s
      next_cron_at = Cron.next_schedule_at(job['cron'])
      if next_cron_at
        connection.exec_query("UPDATE #{table_name} SET running = 'f', scheduled_at = TO_TIMESTAMP(#{next_cron_at}), executions = 1, exception_executions = NULL, data = '#{connection.quote_string(data.to_json.to_s)}', updated_at = NOW() WHERE id = '#{job['id']}' RETURNING *").first
      else
        connection.exec_query("DELETE FROM #{table_name} WHERE id = '#{job['id']}' RETURNING *").first
      end
    elsif Settings['purge_completion']
      connection.exec_query("DELETE FROM #{table_name} WHERE id = '#{job['id']}' RETURNING *").first
    else
      connection.exec_query("UPDATE #{table_name} SET running = 'f', executions = #{executions}, exception_executions = NULL, finished_at = NOW(), updated_at = NOW() WHERE id = '#{job['id']}' RETURNING *").first
    end
  else
    # Nothing due: release the row lock and report when to look again.
    connection.exec_query('END')
    job ? job['scheduled_at'].to_f : Float::INFINITY
  end
ensure
  Thread.current[:skiplock_dispatch_data] = nil
end
.enqueue_at(job, timestamp) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/skiplock/job.rb', line 59

# Enqueues +job+ to run at +timestamp+.
#
# When Thread.current[:skiplock_dispatch_data] is set, no row is created.
# The job is flagged with the 'activejob_retry' marker, its executions,
# exception_executions and new schedule time are written into that
# thread-local hash, and an unsaved record built from the hash is returned.
# Otherwise a new row is created.
#
# @param job the job object; must respond to job_id, executions,
#   exception_executions, queue_name, locale, timezone, priority and arguments
# @param timestamp [Numeric, nil] epoch seconds at which to run; nil leaves
#   scheduled_at unset when creating a new row
# @return the new (unsaved) record on the retry path, otherwise the result
#   of Job.create!
def self.enqueue_at(job, timestamp)
  dispatch_data = Thread.current[:skiplock_dispatch_data]
  if dispatch_data
    job.exception_executions['activejob_retry'] = true
    dispatch_data['executions'] = job.executions
    dispatch_data['exception_executions'] = job.exception_executions
    dispatch_data['scheduled_at'] = Time.at(timestamp)
    new(dispatch_data.slice(*column_names).merge(id: job.job_id))
  else
    timestamp = Time.at(timestamp) if timestamp
    Job.create!(id: job.job_id, job_class: job.class.name, queue_name: job.queue_name, locale: job.locale, timezone: job.timezone, priority: job.priority, executions: job.executions, data: { 'arguments' => job.arguments }, scheduled_at: timestamp)
  end
end