Class: Delayed::Backend::ActiveRecord::Job
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Delayed::Backend::ActiveRecord::Job
- Includes:
- Base
- Defined in:
- lib/delayed/backend/active_record.rb
Overview
A job object that is persisted to the database. Contains the work object as a YAML field.
Class Method Summary collapse
- .after_fork ⇒ Object
- .before_fork ⇒ Object
-
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
-
.db_time_now ⇒ Object
Get the current time (GMT or local, depending on the DB). Note: this does not ping the DB to get the time, so all your clients must have synchronized clocks.
- .ready_to_run(worker_name, max_run_time) ⇒ Object
-
.reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object
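Builds the scope of jobs that are ready to run for the worker, narrows it by priority and queue, and reserves a job from it (the source carries a rubocop:disable CyclomaticComplexity annotation).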
- .reserve_with_scope(ready_scope, worker, now) ⇒ Object
- .reserve_with_scope_using_default_sql(ready_scope, worker, now) ⇒ Object
- .reserve_with_scope_using_optimized_sql(ready_scope, worker, now) ⇒ Object
- .set_delayed_job_table_name ⇒ Object
Instance Method Summary collapse
Class Method Details
.after_fork ⇒ Object
55 56 57 |
# File 'lib/delayed/backend/active_record.rb', line 55
#
# Fork hook (by name, meant to run once a process has forked).
# Asks ActiveRecord::Base to establish a database connection.
#
# @return [Object] whatever ActiveRecord::Base.establish_connection returns
def self.after_fork
  ::ActiveRecord::Base.establish_connection
end
.before_fork ⇒ Object
51 52 53 |
# File 'lib/delayed/backend/active_record.rb', line 51
#
# Fork hook (by name, meant to run right before a process forks).
# Clears every database connection held through ActiveRecord.
#
# The call goes through the connection handler instead of
# ActiveRecord::Base.clear_all_connections!: the class-level shortcut is
# deprecated in Rails 7.1, while the handler method is what that shortcut
# delegated to in earlier versions, so this form works on both.
#
# @return [Object] whatever the connection handler returns
def self.before_fork
  ::ActiveRecord::Base.connection_handler.clear_all_connections!
end
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
60 61 62 |
# File 'lib/delayed/backend/active_record.rb', line 60
#
# Releases every job currently locked by the given worker by resetting
# both lock columns to NULL in a single UPDATE.
#
# @param worker_name [String] value matched against the locked_by column
# @return [Object] result of update_all
def self.clear_locks!(worker_name)
  unlocked = { locked_by: nil, locked_at: nil }
  where(locked_by: worker_name).update_all(unlocked)
end
.db_time_now ⇒ Object
Get the current time (GMT or local, depending on the DB). Note: this does not ping the DB to get the time, so all your clients must have synchronized clocks.
143 144 145 146 147 148 149 150 151 |
# File 'lib/delayed/backend/active_record.rb', line 143
#
# Returns the current time in the representation the database layer expects.
# The database is not queried; the time comes from the local clock.
#
# Resolution order:
# 1. Time.zone.now when a Time.zone is set.
# 2. Time.now.utc when ActiveRecord's default timezone is :utc.
# 3. Time.now otherwise.
#
# The default timezone is read from ActiveRecord.default_timezone when that
# module-level accessor exists, because ActiveRecord::Base.default_timezone
# is deprecated in Rails 7.0. Older versions that lack the module-level
# accessor still use the ActiveRecord::Base one.
#
# @return [Time, Object] the current time (Time.zone.now's type when a zone is set)
def self.db_time_now
  return Time.zone.now if Time.zone

  default_timezone =
    if ::ActiveRecord.respond_to?(:default_timezone)
      ::ActiveRecord.default_timezone
    else
      ::ActiveRecord::Base.default_timezone
    end

  default_timezone == :utc ? Time.now.utc : Time.now
end
.ready_to_run(worker_name, max_run_time) ⇒ Object
47 48 49 |
# File 'lib/delayed/backend/active_record.rb', line 47
#
# Scope of jobs a worker may pick up. A job qualifies when it has not
# failed and either
# * its run_at has passed and it is unlocked or its lock is older than
#   max_run_time, or
# * it is already locked by this worker.
#
# @param worker_name [String] compared against the locked_by column
# @param max_run_time [Object] subtracted from the current time to get the
#   cutoff before which a lock counts as expired
# @return [Object] the result of where (a relation)
def self.ready_to_run(worker_name, max_run_time)
  due_by = db_time_now
  lock_expired_before = db_time_now - max_run_time
  where(
    "(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL",
    due_by,
    lock_expired_before,
    worker_name
  )
end
.reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object
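Builds the scope of jobs that are ready to run for the worker, narrows it by priority and queue, and reserves a job from it (the source carries a rubocop:disable CyclomaticComplexity annotation).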
64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/delayed/backend/active_record.rb', line 64
#
# Reserves a job for the given worker.
#
# Starts from the ready_to_run scope, narrows it by the optional
# Worker.min_priority / Worker.max_priority bounds and by Worker.queues
# (when any are configured), orders it with by_priority, and hands it to
# reserve_with_scope together with the current db_time_now.
#
# @param worker [Object] must respond to name
# @param max_run_time [Object] lock-expiry window, defaults to Worker.max_run_time
# @return [Object, nil] whatever reserve_with_scope returns
def self.reserve(worker, max_run_time = Worker.max_run_time) # rubocop:disable CyclomaticComplexity
  candidates = ready_to_run(worker.name, max_run_time)

  if Worker.min_priority
    candidates = candidates.where("priority >= ?", Worker.min_priority)
  end

  if Worker.max_priority
    candidates = candidates.where("priority <= ?", Worker.max_priority)
  end

  if Worker.queues.any?
    candidates = candidates.where(queue: Worker.queues)
  end

  reserve_with_scope(candidates.by_priority, worker, db_time_now)
end
.reserve_with_scope(ready_scope, worker, now) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/delayed/backend/active_record.rb', line 77
#
# Dispatches to the reservation implementation selected by the configured
# reserve_sql_strategy.
#
# * :optimized_sql - adapter-specific statements for faster lookups on some
#   common databases.
# * :default_sql   - slower, but in some cases less problematic. See
#   https://github.com/collectiveidea/delayed_job_active_record/pull/89
#
# Any other strategy value matches no branch, so the method returns nil.
#
# @param ready_scope [Object] scope of candidate jobs
# @param worker [Object] worker reserving the job
# @param now [Object] timestamp written to locked_at
# @return [Object, nil] result of the selected implementation
def self.reserve_with_scope(ready_scope, worker, now)
  strategy = Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy

  case strategy
  when :optimized_sql
    reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
  when :default_sql
    reserve_with_scope_using_default_sql(ready_scope, worker, now)
  end
end
.reserve_with_scope_using_default_sql(ready_scope, worker, now) ⇒ Object
132 133 134 135 136 137 138 |
# File 'lib/delayed/backend/active_record.rb', line 132
#
# The long-standing, slower reservation path: read up to worker.read_ahead
# candidates and try to lock them one at a time.
#
# Each attempt re-applies ready_scope in the UPDATE, so a row that stopped
# matching between the read and the update is not touched and the next
# candidate is tried. A candidate counts as reserved only when exactly one
# row was updated; it is then reloaded.
#
# @param ready_scope [Object] scope of candidate jobs
# @param worker [Object] must respond to read_ahead and name
# @param now [Object] timestamp written to locked_at
# @return [Object, nil] the first candidate that was locked, or nil if none was
def self.reserve_with_scope_using_default_sql(ready_scope, worker, now)
  ready_scope.limit(worker.read_ahead).detect do |candidate|
    lock_attrs = { locked_at: now, locked_by: worker.name }
    rows_locked = ready_scope.where(id: candidate.id).update_all(lock_attrs)
    rows_locked == 1 && candidate.reload
  end
end
.reserve_with_scope_using_optimized_sql(ready_scope, worker, now) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/delayed/backend/active_record.rb', line 89
#
# Reserves a single job using SQL chosen by connection.adapter_name.
# Adapters without a dedicated branch use
# reserve_with_scope_using_default_sql.
#
# @param ready_scope [Object] scope of candidate jobs
# @param worker [Object] must respond to name
# @param now [Object] timestamp written to locked_at
# @return [Object, nil] the reserved job, or nil when nothing was locked
def self.reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
  case connection.adapter_name
  when "PostgreSQL"
    # PostgreSQL has no UPDATE...LIMIT, so the row is picked in a subquery
    # that locks it FOR UPDATE:
    # http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE
    # Calling .limit() and letting ActiveRecord build the UPDATE would leave
    # out FOR UPDATE and lead to many locking conflicts.
    quoted_table = connection.quote_table_name(table_name)
    pick_one_sql = ready_scope.limit(1).lock(true).select("id").to_sql
    statement = "UPDATE #{quoted_table} SET locked_at = ?, locked_by = ? WHERE id IN (#{pick_one_sql}) RETURNING *"
    find_by_sql([statement, now, worker.name]).first
  when "MySQL", "Mysql2"
    # Drop sub-second precision. MySQL >= 5.6.4 supports it, but the value
    # stored by the update has none, while the lookup below would compare
    # with it; truncating makes both sides agree.
    lock_time = now.change(usec: 0)
    # UPDATE...LIMIT is available here; locking and fetching are two queries.
    rows_locked = ready_scope.limit(1).update_all(locked_at: lock_time, locked_by: worker.name)
    return nil if rows_locked == 0

    where(locked_at: lock_time, locked_by: worker.name, failed_at: nil).first
  when "MSSQL", "Teradata"
    # The MSSQL driver emits no limit clause for a direct update_all, and
    # select("id") does not produce a subquery, so one is wrapped by hand.
    limited_sql = ready_scope.limit(1).to_sql
    pick_one_sql = "SELECT id FROM (#{limited_sql}) AS x"
    quoted_table = connection.quote_table_name(table_name)
    statement = ["UPDATE #{quoted_table} SET locked_at = ?, locked_by = ? \nWHERE id IN (#{pick_one_sql})", now, worker.name]
    rows_locked = connection.execute(sanitize_sql(statement))
    return nil if rows_locked == 0

    # MSSQL JDBC cannot return rows via OUTPUT INSERTED.*, so the locked
    # row is fetched with a follow-up query.
    where(locked_at: now, locked_by: worker.name, failed_at: nil).first
  else
    # Unknown / other DBMS.
    reserve_with_scope_using_default_sql(ready_scope, worker, now)
  end
end
.set_delayed_job_table_name ⇒ Object
40 41 42 43 |
# File 'lib/delayed/backend/active_record.rb', line 40
#
# Points this model at the "delayed_jobs" table, prepending
# ActiveRecord::Base.table_name_prefix to the name.
#
# @return [String] the table name that was assigned
def self.set_delayed_job_table_name
  self.table_name = "#{::ActiveRecord::Base.table_name_prefix}delayed_jobs"
end
Instance Method Details
#reload(*args) ⇒ Object
153 154 155 156 |
# File 'lib/delayed/backend/active_record.rb', line 153
#
# Calls reset before delegating to the inherited reload with the same
# arguments.
#
# @param args [Array] forwarded unchanged to the inherited reload
# @return [Object] whatever the inherited reload returns
def reload(*args)
  reset
  # Bare super forwards the original arguments (and any block) as received.
  super
end