Module: Delayed::Backend::Base::ClassMethods
- Defined in:
- lib/delayed/backend/base.rb
Instance Attribute Summary collapse
-
#batch_enqueue_args ⇒ Object
Returns the value of attribute batch_enqueue_args.
-
#batches ⇒ Object
Returns the value of attribute batches.
-
#default_priority ⇒ Object
Returns the value of attribute default_priority.
Instance Method Summary collapse
- #check_priorities(min_priority, max_priority) ⇒ Object
- #check_queue(queue) ⇒ Object
-
#db_time_now ⇒ Object
Get the current time (UTC) Note: This does not ping the DB to get the time, so all your clients must have syncronized clocks.
-
#enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object
Add a job to the queue The first argument should be an object that respond_to?(:perform) The rest should be named arguments, these keys are expected: :priority, :run_at, :queue, :strand, :singleton Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue).
- #in_delayed_job=(val) ⇒ Object
- #in_delayed_job? ⇒ Boolean
-
#n_strand_options(strand_name, num_strands) ⇒ Object
by default creates a new strand name randomly based on num_strands effectively balancing the load during queueing overwritten in ActiveRecord::Job to use triggers to balance at run time.
- #processes_locked_locally(name: nil) ⇒ Object
- #unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object
- #unlock_orphaned_prefetched_jobs ⇒ Object
Instance Attribute Details
#batch_enqueue_args ⇒ Object
Returns the value of attribute batch_enqueue_args.
26 27 28 |
# File 'lib/delayed/backend/base.rb', line 26 def batch_enqueue_args @batch_enqueue_args end |
#batches ⇒ Object
Returns the value of attribute batches.
26 27 28 |
# File 'lib/delayed/backend/base.rb', line 26 def batches @batches end |
#default_priority ⇒ Object
Returns the value of attribute default_priority.
26 27 28 |
# File 'lib/delayed/backend/base.rb', line 26 def default_priority @default_priority end |
Instance Method Details
#check_priorities(min_priority, max_priority) ⇒ Object
133 134 135 136 137 138 139 140 |
# File 'lib/delayed/backend/base.rb', line 133 def check_priorities(min_priority, max_priority) if min_priority && min_priority < Delayed::MIN_PRIORITY raise ArgumentError, "min_priority #{min_priority} can't be less than #{Delayed::MIN_PRIORITY}" end if max_priority && max_priority > Delayed::MAX_PRIORITY # rubocop:disable Style/GuardClause raise ArgumentError, "max_priority #{max_priority} can't be greater than #{Delayed::MAX_PRIORITY}" end end |
#check_queue(queue) ⇒ Object
129 130 131 |
# File 'lib/delayed/backend/base.rb', line 129 def check_queue(queue) raise(ArgumentError, "queue name can't be blank") if queue.blank? end |
#db_time_now ⇒ Object
Get the current time (UTC) Note: This does not ping the DB to get the time, so all your clients must have syncronized clocks.
145 146 147 |
# File 'lib/delayed/backend/base.rb', line 145 def db_time_now Time.now.utc end |
#enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object
Add a job to the queue The first argument should be an object that respond_to?(:perform) The rest should be named arguments, these keys are expected: :priority, :run_at, :queue, :strand, :singleton Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue)
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/delayed/backend/base.rb', line 33 def enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) unless object.respond_to?(:perform) raise ArgumentError, "Cannot enqueue items which do not respond to perform" end strand ||= singleton if Settings.infer_strand_from_singleton kwargs = Settings..merge(kwargs) kwargs[:payload_object] = object kwargs[:priority] = priority kwargs[:run_at] = run_at if run_at kwargs[:strand] = strand kwargs[:max_attempts] = max_attempts if defined?(Marginalia) && Marginalia::Comment.components kwargs[:source] = Marginalia::Comment.construct_comment end kwargs[:expires_at] = expires_at kwargs[:queue] = queue kwargs[:singleton] = singleton raise ArgumentError, "Only one of strand or n_strand can be used" if strand && n_strand # If two parameters are given to n_strand, the first param is used # as the strand name for looking up the Setting, while the second # param is appended to make a unique set of strands. # # For instance, you can pass ["my_job_type", # root_account.global_id] # to get a set of n strands per root account, and you can apply the # same default to all. if n_strand strand_name, ext = n_strand if ext full_strand_name = "#{strand_name}/#{ext}" num_strands = Delayed::Settings.num_strands.call(full_strand_name) else full_strand_name = strand_name end num_strands ||= Delayed::Settings.num_strands.call(strand_name) num_strands = num_strands ? num_strands.to_i : 1 kwargs.merge!((full_strand_name, num_strands)) end job = nil if singleton Delayed::Worker.lifecycle.run_callbacks(:create, kwargs) do job = create(**kwargs) end elsif batches && strand.nil? && run_at.nil? batch_enqueue_args = kwargs.slice(*self.batch_enqueue_args) batches[batch_enqueue_args] << kwargs return true else raise ArgumentError, "on_conflict can only be provided with singleton" if kwargs[:on_conflict] Delayed::Worker.lifecycle.run_callbacks(:create, kwargs) do job = create(**kwargs) end end JobTracking.job_created(job) job end |
#in_delayed_job=(val) ⇒ Object
125 126 127 |
# File 'lib/delayed/backend/base.rb', line 125 def in_delayed_job=(val) Thread.current[:in_delayed_job] = val end |
#in_delayed_job? ⇒ Boolean
121 122 123 |
# File 'lib/delayed/backend/base.rb', line 121 def in_delayed_job? !!Thread.current[:in_delayed_job] end |
#n_strand_options(strand_name, num_strands) ⇒ Object
by default creates a new strand name randomly based on num_strands effectively balancing the load during queueing overwritten in ActiveRecord::Job to use triggers to balance at run time
115 116 117 118 119 |
# File 'lib/delayed/backend/base.rb', line 115 def (strand_name, num_strands) strand_num = num_strands > 1 ? rand(num_strands) + 1 : 1 strand_name += ":#{strand_num}" if strand_num > 1 { strand: strand_name } end |
#processes_locked_locally(name: nil) ⇒ Object
149 150 151 152 153 154 155 |
# File 'lib/delayed/backend/base.rb', line 149 def processes_locked_locally(name: nil) name ||= Socket.gethostname rescue x local_jobs = running_jobs.select do |job| job.locked_by.start_with?("#{name}:") end local_jobs.map { |job| job.locked_by.split(":").last.to_i } end |
#unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/delayed/backend/base.rb', line 167 def unlock_orphaned_jobs(pid = nil, name = nil) begin name ||= Socket.gethostname rescue return 0 end pid_regex = pid || '(\d+)' regex = Regexp.new("^#{Regexp.escape(name)}:#{pid_regex}$") unlocked_jobs = 0 escaped_name = name.gsub("\\", "\\\\") .gsub("%", "\\%") .gsub("_", "\\_") locked_by_like = "#{escaped_name}:%" running = false if pid jobs = running_jobs.limit(100) jobs = pid ? jobs.where(locked_by: "#{name}:#{pid}") : jobs.where("locked_by LIKE ?", locked_by_like) ignores = [] loop do batch_scope = ignores.empty? ? jobs : jobs.where.not(id: ignores) # if we don't reload this it's possible to keep getting the # same array each loop even after the jobs have been deleted. batch = batch_scope.reload.to_a break if batch.empty? batch.each do |job| unless job.locked_by =~ regex ignores << job.id next end unless pid job_pid = $1.to_i running = Process.kill(0, job_pid) rescue false end if running ignores << job.id else unlocked_jobs += 1 job.reschedule("process died") end end end unlocked_jobs end |
#unlock_orphaned_prefetched_jobs ⇒ Object
157 158 159 160 161 162 163 164 165 |
# File 'lib/delayed/backend/base.rb', line 157 def unlock_orphaned_prefetched_jobs horizon = db_time_now - (Settings.parent_process[:prefetched_jobs_timeout] * 4) orphaned_jobs = running_jobs.select do |job| job.locked_by.start_with?("prefetch:") && job.locked_at < horizon end return 0 if orphaned_jobs.empty? unlock(orphaned_jobs) end |