Module: Delayed::Backend::Base::ClassMethods
- Defined in:
- lib/delayed/backend/base.rb
Instance Attribute Summary collapse
-
#batch_enqueue_args ⇒ Object
Returns the value of attribute batch_enqueue_args.
-
#batches ⇒ Object
Returns the value of attribute batches.
-
#default_priority ⇒ Object
Returns the value of attribute default_priority.
Instance Method Summary collapse
- #check_priorities(min_priority, max_priority) ⇒ Object
- #check_queue(queue) ⇒ Object
-
#db_time_now ⇒ Object
Get the current time (UTC). Note: This does not ping the DB to get the time, so all your clients must have synchronized clocks.
-
#enqueue(object, priority: nil, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object
Add a job to the queue. The first argument should be an object that respond_to?(:perform). The rest should be named arguments; these keys are expected: :priority, :run_at, :queue, :strand, :singleton. Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue).
- #in_delayed_job=(val) ⇒ Object
- #in_delayed_job? ⇒ Boolean
-
#n_strand_options(strand_name, num_strands) ⇒ Object
By default, creates a new strand name randomly based on num_strands, effectively balancing the load during queueing. Overwritten in ActiveRecord::Job to use triggers to balance at run time.
- #processes_locked_locally(name: nil) ⇒ Object
- #unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object
- #unlock_orphaned_prefetched_jobs ⇒ Object
Instance Attribute Details
#batch_enqueue_args ⇒ Object
Returns the value of attribute batch_enqueue_args.
26 27 28 |
# File 'lib/delayed/backend/base.rb', line 26 def batch_enqueue_args @batch_enqueue_args end |
#batches ⇒ Object
Returns the value of attribute batches.
26 27 28 |
# File 'lib/delayed/backend/base.rb', line 26 def batches @batches end |
#default_priority ⇒ Object
Returns the value of attribute default_priority.
26 27 28 |
# File 'lib/delayed/backend/base.rb', line 26 def default_priority @default_priority end |
Instance Method Details
#check_priorities(min_priority, max_priority) ⇒ Object
147 148 149 150 151 152 153 154 |
# File 'lib/delayed/backend/base.rb', line 147 def check_priorities(min_priority, max_priority) if min_priority && min_priority < Delayed::MIN_PRIORITY raise ArgumentError, "min_priority #{min_priority} can't be less than #{Delayed::MIN_PRIORITY}" end if max_priority && max_priority > Delayed::MAX_PRIORITY # rubocop:disable Style/GuardClause raise ArgumentError, "max_priority #{max_priority} can't be greater than #{Delayed::MAX_PRIORITY}" end end |
#check_queue(queue) ⇒ Object
143 144 145 |
# File 'lib/delayed/backend/base.rb', line 143 def check_queue(queue) raise(ArgumentError, "queue name can't be blank") if queue.blank? end |
#db_time_now ⇒ Object
Get the current time (UTC). Note: This does not ping the DB to get the time, so all your clients must have synchronized clocks.
159 160 161 |
# File 'lib/delayed/backend/base.rb', line 159 def db_time_now Time.now.utc end |
#enqueue(object, priority: nil, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object
Add a job to the queue. The first argument should be an object that respond_to?(:perform). The rest should be named arguments; these keys are expected: :priority, :run_at, :queue, :strand, :singleton. Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue)
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/delayed/backend/base.rb', line 33 def enqueue(object, priority: nil, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) unless object.respond_to?(:perform) raise ArgumentError, "Cannot enqueue items which do not respond to perform" end priority ||= default_priority strand ||= singleton if Settings.infer_strand_from_singleton kwargs = Settings.default_job_options.merge(kwargs) kwargs[:payload_object] = object kwargs[:priority] = priority kwargs[:run_at] = run_at if run_at kwargs[:strand] = strand kwargs[:max_attempts] = max_attempts if defined?(Rails) && Rails.application.respond_to?(:config) && Rails.application.config.respond_to?(:active_record) && Rails.application.config.active_record.query_log_tags_enabled kwargs[:source] = ::ActiveRecord::QueryLogs.send(:tag_content, ::Delayed::Job.connection) elsif defined?(Marginalia) && Marginalia::Comment.components kwargs[:source] = Marginalia::Comment.construct_comment end kwargs[:expires_at] = expires_at kwargs[:queue] = queue kwargs[:singleton] = singleton raise ArgumentError, "Only one of strand or n_strand can be used" if strand && n_strand if (strand || n_strand) && run_at && run_at > Job.db_time_now + Settings.stranded_run_at_grace_period raise ArgumentError, "Do not use run_at with strand; you may inadvertently clog the strand" end # If two parameters are given to n_strand, the first param is used # as the strand name for looking up the Setting, while the second # param is appended to make a unique set of strands. # # For instance, you can pass ["my_job_type", # root_account.global_id] # to get a set of n strands per root account, and you can apply the # same default to all.
if n_strand strand_name, ext = n_strand if ext full_strand_name = "#{strand_name}/#{ext}" num_strands = Delayed::Settings.num_strands.call(full_strand_name) else full_strand_name = strand_name end num_strands ||= Delayed::Settings.num_strands.call(strand_name) num_strands = num_strands ? num_strands.to_i : 1 kwargs.merge!(n_strand_options(full_strand_name, num_strands)) end job = nil if singleton Delayed::Worker.lifecycle.run_callbacks(:create, kwargs) do job = create(**kwargs) end elsif batches && strand.nil? && run_at.nil? batch_enqueue_args = kwargs.slice(*self.batch_enqueue_args) batches[batch_enqueue_args] << kwargs return true else raise ArgumentError, "on_conflict can only be provided with singleton" if kwargs[:on_conflict] Delayed::Worker.lifecycle.run_callbacks(:create, kwargs) do job = create(**kwargs) end end if job.id logger.info("Created #{Delayed::Logging.log_job(job, :long)}") JobTracking.job_created(job) else logger.info("Dropped #{Delayed::Logging.log_job(job, :long)}") end job end |
#in_delayed_job=(val) ⇒ Object
139 140 141 |
# File 'lib/delayed/backend/base.rb', line 139 def in_delayed_job=(val) Thread.current[:in_delayed_job] = val end |
#in_delayed_job? ⇒ Boolean
135 136 137 |
# File 'lib/delayed/backend/base.rb', line 135 def in_delayed_job? !!Thread.current[:in_delayed_job] end |
#n_strand_options(strand_name, num_strands) ⇒ Object
By default, creates a new strand name randomly based on num_strands, effectively balancing the load during queueing. Overwritten in ActiveRecord::Job to use triggers to balance at run time.
129 130 131 132 133 |
# File 'lib/delayed/backend/base.rb', line 129 def n_strand_options(strand_name, num_strands) strand_num = (num_strands > 1) ? rand(num_strands) + 1 : 1 strand_name += ":#{strand_num}" if strand_num > 1 { strand: strand_name } end |
#processes_locked_locally(name: nil) ⇒ Object
163 164 165 166 167 168 169 |
# File 'lib/delayed/backend/base.rb', line 163 def processes_locked_locally(name: nil) name ||= Socket.gethostname rescue x local_jobs = running_jobs.select do |job| job.locked_by.start_with?("#{name}:") end local_jobs.map { |job| job.locked_by.split(":").last.to_i } end |
#unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/delayed/backend/base.rb', line 181 def unlock_orphaned_jobs(pid = nil, name = nil) begin name ||= Socket.gethostname rescue return 0 end pid_regex = pid || '(\d+)' regex = Regexp.new("^#{Regexp.escape(name)}:#{pid_regex}$") unlocked_jobs = 0 escaped_name = name.gsub("\\", "\\\\") .gsub("%", "\\%") .gsub("_", "\\_") locked_by_like = "#{escaped_name}:%" running = false if pid jobs = running_jobs.limit(100) jobs = pid ? jobs.where(locked_by: "#{name}:#{pid}") : jobs.where("locked_by LIKE ?", locked_by_like) ignores = [] loop do batch_scope = ignores.empty? ? jobs : jobs.where.not(id: ignores) # if we don't reload this it's possible to keep getting the # same array each loop even after the jobs have been deleted. batch = batch_scope.reload.to_a break if batch.empty? batch.each do |job| unless job.locked_by =~ regex ignores << job.id next end unless pid job_pid = $1.to_i running = Delayed::Util.process_running?(job_pid) end if running ignores << job.id else unlocked_jobs += 1 job.reschedule("process died") end end end unlocked_jobs end |
#unlock_orphaned_prefetched_jobs ⇒ Object
171 172 173 174 175 176 177 178 179 |
# File 'lib/delayed/backend/base.rb', line 171 def unlock_orphaned_prefetched_jobs horizon = db_time_now - (Settings.parent_process[:prefetched_jobs_timeout] * 4) orphaned_jobs = running_jobs.select do |job| job.locked_by.start_with?("prefetch:") && job.locked_at < horizon end return 0 if orphaned_jobs.empty? unlock(orphaned_jobs) end |