Class: Delayed::Backend::ActiveRecord::Job

Inherits:
ActiveRecord::Base
  • Object
show all
Includes:
Base
Defined in:
lib/delayed/backend/active_record.rb

Overview

A job object that is persisted to the database. Contains the work object as a YAML field.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.after_fork ⇒ Object



55
56
57
# File 'lib/delayed/backend/active_record.rb', line 55

# Calls ActiveRecord::Base.establish_connection with no arguments, so the
# connection is (re)established from whatever configuration ActiveRecord
# already holds.
# NOTE(review): presumably invoked in the child process right after a fork
# (per the method name) — confirm against the caller.
def self.after_fork
  ::ActiveRecord::Base.establish_connection
end

.before_fork ⇒ Object



51
52
53
# File 'lib/delayed/backend/active_record.rb', line 51

# Disconnects all database connections; intended to run before the process
# forks (per the method name).
#
# The call goes straight to the connection handler instead of through
# ActiveRecord::Base: the Base-level `clear_all_connections!` delegator was
# deprecated in Rails 7.1 and subsequently removed, whereas the handler
# method it delegated to is available on both old and new versions.
#
# @return [Object] whatever the handler's `clear_all_connections!` returns
def self.before_fork
  ::ActiveRecord::Base.connection_handler.clear_all_connections!
end

.clear_locks!(worker_name) ⇒ Object

When a worker is exiting, make sure we don’t have any locked jobs.



60
61
62
# File 'lib/delayed/backend/active_record.rb', line 60

# Releases every job currently locked by the given worker by blanking out
# both lock columns in a single bulk update.
#
# @param worker_name [String] value matched against the locked_by column
# @return [Object] the result of update_all
def self.clear_locks!(worker_name)
  held_by_worker = where(locked_by: worker_name)
  held_by_worker.update_all(locked_by: nil, locked_at: nil)
end

.db_time_now ⇒ Object

Get the current time (GMT or local depending on DB). Note: This does not ping the DB to get the time, so all your clients must have synchronized clocks.



143
144
145
146
147
148
149
150
151
# File 'lib/delayed/backend/active_record.rb', line 143

# Get the current time (zone-aware, UTC or local depending on configuration).
# Note: this does not query the database, so all clients must have
# synchronized clocks.
#
# Resolution order:
#   1. `Time.zone.now` when `Time.zone` is set
#   2. `Time.now.utc` when ActiveRecord's default timezone is :utc
#   3. `Time.now` (system local time) otherwise
#
# @return [Object] the current time, as produced by the branch that applied
def self.db_time_now
  return Time.zone.now if Time.zone

  # `ActiveRecord::Base.default_timezone` was deprecated in Rails 7.0 in
  # favour of the module-level `ActiveRecord.default_timezone` and later
  # removed. Prefer the module-level setting when it exists and fall back to
  # the legacy accessor on older versions.
  default_timezone =
    if ::ActiveRecord.respond_to?(:default_timezone)
      ::ActiveRecord.default_timezone
    else
      ::ActiveRecord::Base.default_timezone
    end

  default_timezone == :utc ? Time.now.utc : Time.now
end

.ready_to_run(worker_name, max_run_time) ⇒ Object



47
48
49
# File 'lib/delayed/backend/active_record.rb', line 47

# Scope of jobs that have not failed and are either already locked by this
# worker, or due to run and not currently locked (no lock, or a lock older
# than max_run_time).
#
# @param worker_name [String] value matched against the locked_by column
# @param max_run_time [Object] subtracted from the current time to get the lock-expiry cutoff
# @return [Object] the result of `where`
def self.ready_to_run(worker_name, max_run_time)
  conditions = "(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL"
  due_before = db_time_now
  lock_expired_before = db_time_now - max_run_time
  where(conditions, due_before, lock_expired_before, worker_name)
end

.reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object

Find and lock the next job that is ready to run for the given worker, honoring the configured priority bounds and queues.



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/delayed/backend/active_record.rb', line 64

# Builds the scope of jobs this worker may pick up and hands it to
# reserve_with_scope together with the current time.
#
# @param worker [Object] must respond to `name`
# @param max_run_time [Object] forwarded to ready_to_run; defaults to Worker.max_run_time
# @return [Object] whatever reserve_with_scope returns
def self.reserve(worker, max_run_time = Worker.max_run_time) # rubocop:disable CyclomaticComplexity
  # Start from everything that is "ready to run" for this worker.
  candidates = ready_to_run(worker.name, max_run_time)

  # Narrow by the priority bounds and queue list, each only when configured.
  if Worker.min_priority
    candidates = candidates.where("priority >= ?", Worker.min_priority)
  end
  if Worker.max_priority
    candidates = candidates.where("priority <= ?", Worker.max_priority)
  end
  if Worker.queues.any?
    candidates = candidates.where(queue: Worker.queues)
  end

  reserve_with_scope(candidates.by_priority, worker, db_time_now)
end

.reserve_with_scope(ready_scope, worker, now) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
# File 'lib/delayed/backend/active_record.rb', line 77

# Routes the reservation to the implementation selected by the configured
# reserve_sql_strategy. Any other strategy value falls through and yields nil.
#
# @param ready_scope [Object] scope of candidate jobs, passed through untouched
# @param worker [Object] passed through untouched
# @param now [Object] passed through untouched
# @return [Object, nil] the chosen implementation's result, or nil for an unrecognised strategy
def self.reserve_with_scope(ready_scope, worker, now)
  strategy = Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy

  if strategy == :optimized_sql
    # Optimizations for faster lookups on some common databases
    reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
  elsif strategy == :default_sql
    # Slower but in some cases more unproblematic strategy to lookup records
    # See https://github.com/collectiveidea/delayed_job_active_record/pull/89 for more details.
    reserve_with_scope_using_default_sql(ready_scope, worker, now)
  end
end

.reserve_with_scope_using_default_sql(ready_scope, worker, now) ⇒ Object



132
133
134
135
136
137
138
# File 'lib/delayed/backend/active_record.rb', line 132

# The old-fashioned, tried and true, but slower lookup: load up to
# worker.read_ahead candidates, then try to lock them one at a time with a
# conditional update, returning the first one whose update touched exactly
# one row and whose reload is truthy.
#
# @param ready_scope [Object] scope of candidate jobs
# @param worker [Object] must respond to `name` and `read_ahead`
# @param now [Object] value written to locked_at
# @return [Object, nil] the locked job, or nil when none could be locked
def self.reserve_with_scope_using_default_sql(ready_scope, worker, now)
  ready_scope.limit(worker.read_ahead).detect do |candidate|
    # Re-apply the ready scope so the update is a no-op if the row stopped
    # matching it since it was loaded.
    claimed = ready_scope.where(id: candidate.id).update_all(locked_at: now, locked_by: worker.name)
    next false unless claimed == 1

    candidate.reload
  end
end

.reserve_with_scope_using_optimized_sql(ready_scope, worker, now) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/delayed/backend/active_record.rb', line 89

# Reserves a single job using SQL tailored to the current connection's
# adapter (selected by connection.adapter_name), falling back to
# reserve_with_scope_using_default_sql for any adapter not listed here.
#
# @param ready_scope [Object] scope of candidate jobs; limited to one row in every adapter-specific branch
# @param worker [Object] must respond to `name`; the name is written to locked_by
# @param now [Object] value written to locked_at (the MySQL branch first zeroes its usec via `change`)
# @return [Object, nil] the reserved job; nil when the MySQL/MSSQL update
#   reports a count of 0, or when a lookup finds no row
def self.reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
  case connection.adapter_name
  when "PostgreSQL"
    # Custom SQL required for PostgreSQL because postgres does not support UPDATE...LIMIT
    # This locks the single record 'FOR UPDATE' in the subquery
    # http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE
    # Note: active_record would attempt to generate UPDATE...LIMIT like
    # SQL for Postgres if we use a .limit() filter, but it would not
    # use 'FOR UPDATE' and we would have many locking conflicts
    quoted_table_name = connection.quote_table_name(table_name)
    subquery_sql      = ready_scope.limit(1).lock(true).select("id").to_sql
    # RETURNING * hands back the updated row, so locking and fetching happen in one statement
    reserved          = find_by_sql(["UPDATE #{quoted_table_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery_sql}) RETURNING *", now, worker.name])
    reserved[0]
  when "MySQL", "Mysql2"
    # Removing the sub-second precision from now (a time object).
    # From MySQL 5.6.4 onwards millisecond precision exists, but the
    # datetime object created doesn't have precision, so it is discarded
    # while updating. But during the where clause, for mysql(>=5.6.4),
    # it queries with precision as well. So removing the precision
    now = now.change(usec: 0)
    # This works on MySQL and possibly some other DBs that support
    # UPDATE...LIMIT. It uses separate queries to lock and return the job
    count = ready_scope.limit(1).update_all(locked_at: now, locked_by: worker.name)
    return nil if count == 0
    # Find the row just locked by matching the exact lock timestamp and worker name
    where(locked_at: now, locked_by: worker.name, failed_at: nil).first
  when "MSSQL", "Teradata"
    # The MSSQL driver doesn't generate a limit clause when update_all
    # is called directly
    subsubquery_sql = ready_scope.limit(1).to_sql
    # select("id") doesn't generate a subquery, so force a subquery
    subquery_sql = "SELECT id FROM (#{subsubquery_sql}) AS x"
    quoted_table_name = connection.quote_table_name(table_name)
    sql = ["UPDATE #{quoted_table_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery_sql})", now, worker.name]
    # NOTE(review): assumes connection.execute returns the affected-row count for these adapters — confirm
    count = connection.execute(sanitize_sql(sql))
    return nil if count == 0
    # MSSQL JDBC doesn't support OUTPUT INSERTED.* for returning a result set, so query locked row
    where(locked_at: now, locked_by: worker.name, failed_at: nil).first
  # Fallback for unknown / other DBMS
  else
    reserve_with_scope_using_default_sql(ready_scope, worker, now)
  end
end

.set_delayed_job_table_name ⇒ Object



40
41
42
43
# File 'lib/delayed/backend/active_record.rb', line 40

# Sets this model's table name to "delayed_jobs" prefixed with
# ActiveRecord::Base.table_name_prefix.
#
# @return [String] the table name that was assigned
def self.set_delayed_job_table_name
  self.table_name = "#{::ActiveRecord::Base.table_name_prefix}delayed_jobs"
end

Instance Method Details

#reload(*args) ⇒ Object



153
154
155
156
# File 'lib/delayed/backend/active_record.rb', line 153

# Reloads the record: calls `reset` first, then defers to the superclass
# `reload`, forwarding the received arguments unchanged (bare `super`).
# NOTE(review): `reset` is not defined in this section; presumably it clears
# cached job state before the attributes are refreshed — confirm.
def reload(*args)
  reset
  super
end