Module: Delayed::Backend::Base::ClassMethods
- Defined in:
- lib/delayed/backend/base.rb
Instance Attribute Summary collapse
-
#batch_enqueue_args ⇒ Object
Returns the value of attribute batch_enqueue_args.
-
#batches ⇒ Object
Returns the value of attribute batches.
-
#default_priority ⇒ Object
Returns the value of attribute default_priority.
Instance Method Summary collapse
- #check_priorities(min_priority, max_priority) ⇒ Object
- #check_queue(queue) ⇒ Object
-
#db_time_now ⇒ Object
Get the current time (UTC). Note: this does not ping the DB to get the time, so all your clients must have synchronized clocks.
-
#enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object
Add a job to the queue. The first argument should be an object that responds to :perform (i.e. respond_to?(:perform) is true). The rest should be named arguments; these keys are expected: :priority, :run_at, :queue, :strand, :singleton. Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue).
- #in_delayed_job=(val) ⇒ Object
- #in_delayed_job? ⇒ Boolean
-
#n_strand_options(strand_name, num_strands) ⇒ Object
By default, creates a new strand name randomly based on num_strands, effectively balancing the load during queueing. Overwritten in ActiveRecord::Job to use triggers to balance at run time.
- #processes_locked_locally(name: nil) ⇒ Object
- #unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object
- #unlock_orphaned_prefetched_jobs ⇒ Object
Instance Attribute Details
#batch_enqueue_args ⇒ Object
Returns the value of attribute batch_enqueue_args.
25 26 27 |
# File 'lib/delayed/backend/base.rb', line 25

# Reader for the +batch_enqueue_args+ attribute.
#
# #enqueue splats this value into Hash#slice to pick out the job options that
# identify a batch, so it is expected to hold a list of option keys.
#
# @return [Object] the current value of @batch_enqueue_args (nil when unset)
def batch_enqueue_args
  @batch_enqueue_args
end
#batches ⇒ Object
Returns the value of attribute batches.
25 26 27 |
# File 'lib/delayed/backend/base.rb', line 25

# Reader for the +batches+ attribute.
#
# When this is truthy, #enqueue may append a job's options to
# batches[<sliced options>] instead of creating the job immediately.
#
# @return [Object] the current value of @batches (nil when unset)
def batches
  @batches
end
#default_priority ⇒ Object
Returns the value of attribute default_priority.
25 26 27 |
# File 'lib/delayed/backend/base.rb', line 25

# Reader for the +default_priority+ attribute.
#
# #enqueue uses this value as the default for its +priority:+ keyword.
#
# @return [Object] the current value of @default_priority (nil when unset)
def default_priority
  @default_priority
end
Instance Method Details
#check_priorities(min_priority, max_priority) ⇒ Object
126 127 128 129 130 131 132 133 |
# File 'lib/delayed/backend/base.rb', line 126

# Validates a priority window against the global priority limits.
#
# A nil (or false) bound is treated as "not given" and is not checked.
#
# @param min_priority [Integer, nil] lower bound to validate
# @param max_priority [Integer, nil] upper bound to validate
# @return [nil] when both bounds are acceptable
# @raise [ArgumentError] if min_priority is below Delayed::MIN_PRIORITY or
#   max_priority is above Delayed::MAX_PRIORITY
def check_priorities(min_priority, max_priority)
  if min_priority && min_priority < Delayed::MIN_PRIORITY
    raise ArgumentError, "min_priority #{min_priority} can't be less than #{Delayed::MIN_PRIORITY}"
  end
  return unless max_priority && max_priority > Delayed::MAX_PRIORITY

  raise ArgumentError, "max_priority #{max_priority} can't be greater than #{Delayed::MAX_PRIORITY}"
end
#check_queue(queue) ⇒ Object
122 123 124 |
# File 'lib/delayed/backend/base.rb', line 122

# Validates that a queue name was supplied.
#
# NOTE(review): relies on +queue+ responding to #blank?, which is not defined
# in this snippet -- presumably ActiveSupport's Object#blank?; confirm.
#
# @param queue [Object] the queue name to validate
# @return [nil] when the queue name is not blank
# @raise [ArgumentError] if queue.blank? is truthy
def check_queue(queue)
  raise ArgumentError, "queue name can't be blank" if queue.blank?
end
#db_time_now ⇒ Object
Get the current time (UTC). Note: this does not ping the DB to get the time, so all your clients must have synchronized clocks.
138 139 140 |
# File 'lib/delayed/backend/base.rb', line 138

# Returns the current time in UTC.
#
# The value comes from the local clock (Time.now), not from the database, so
# hosts only agree on it if their clocks are synchronized.
#
# @return [Time] the current time, in UTC
def db_time_now
  Time.now.utc
end
#enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object
Add a job to the queue. The first argument should be an object that responds to :perform (i.e. respond_to?(:perform) is true). The rest should be named arguments; these keys are expected: :priority, :run_at, :queue, :strand, :singleton. Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue).
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# Adds a job for +object+ to the queue.
#
# NOTE(review): the snippet below is a flattened rendering of the method, and
# two identifiers appear to have been lost in extraction:
#   * `Settings..merge(kwargs)` -- the Settings reader that supplies the
#     default job options is missing;
#   * `kwargs.merge!((full_strand_name, num_strands))` -- the called method
#     name is missing (presumably #n_strand_options, whose signature and
#     `{ strand: ... }` return value fit this call -- confirm).
# Check lib/delayed/backend/base.rb before copying this code.
#
# Behaviour visible in the snippet:
#   * +strand+ falls back to +singleton+ when
#     Settings.infer_strand_from_singleton is set.
#   * +run_at+ is stored only when truthy; priority, strand, max_attempts,
#     expires_at, queue and singleton are always stored.
#   * When Marginalia is loaded and has comment components, the constructed
#     comment is stored under :source.
#   * +n_strand+ may be a name or a [name, ext] pair. The strand count is
#     looked up through Delayed::Settings.num_strands, first for "name/ext"
#     (when ext is given) and then for the bare name, defaulting to 1.
#   * A singleton job is always created directly. Otherwise, when +batches+ is
#     set and neither strand nor run_at is given, the options are appended to
#     the current batch and no job is created.
#   * JobTracking.job_created is called for every job that is created.
#
# @param object [#perform] the payload to run
# @param priority [Object] job priority (defaults to default_priority)
# @param run_at [Object, nil] earliest run time, if any
# @param expires_at [Object, nil] stored on the job as :expires_at
# @param queue [Object] queue name (defaults to Delayed::Settings.queue)
# @param strand [Object, nil] strand name; mutually exclusive with n_strand
# @param singleton [Object, nil] singleton name
# @param n_strand [Object, Array, nil] strand name, or [name, ext] pair
# @param max_attempts [Object] defaults to Delayed::Settings.max_attempts
# @param kwargs [Hash] extra job options, merged over the Settings defaults
# @return [Object, true] the created job, or true when the job was added to
#   the current batch instead
# @raise [ArgumentError] if object does not respond to :perform, if both
#   strand and n_strand are given, or if :on_conflict is passed on the
#   non-singleton, non-batched path
# File 'lib/delayed/backend/base.rb', line 32 def enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) unless object.respond_to?(:perform) raise ArgumentError, "Cannot enqueue items which do not respond to perform" end strand ||= singleton if Settings.infer_strand_from_singleton kwargs = Settings..merge(kwargs) kwargs[:payload_object] = object kwargs[:priority] = priority kwargs[:run_at] = run_at if run_at kwargs[:strand] = strand kwargs[:max_attempts] = max_attempts if defined?(Marginalia) && Marginalia::Comment.components kwargs[:source] = Marginalia::Comment.construct_comment end kwargs[:expires_at] = expires_at kwargs[:queue] = queue kwargs[:singleton] = singleton raise ArgumentError, "Only one of strand or n_strand can be used" if strand && n_strand # If two parameters are given to n_strand, the first param is used # as the strand name for looking up the Setting, while the second # param is appended to make a unique set of strands. # # For instance, you can pass ["my_job_type", # root_account.global_id] # to get a set of n strands per root account, and you can apply the # same default to all. if n_strand strand_name, ext = n_strand if ext full_strand_name = "#{strand_name}/#{ext}" num_strands = Delayed::Settings.num_strands.call(full_strand_name) else full_strand_name = strand_name end num_strands ||= Delayed::Settings.num_strands.call(strand_name) num_strands = num_strands ? num_strands.to_i : 1 kwargs.merge!((full_strand_name, num_strands)) end if singleton job = create(**kwargs) elsif batches && strand.nil? && run_at.nil? batch_enqueue_args = kwargs.slice(*self.batch_enqueue_args) batches[batch_enqueue_args] << kwargs return true else raise ArgumentError, "on_conflict can only be provided with singleton" if kwargs[:on_conflict] job = create(**kwargs) end JobTracking.job_created(job) job end |
#in_delayed_job=(val) ⇒ Object
118 119 120 |
# File 'lib/delayed/backend/base.rb', line 118

# Records whether the current thread of execution is running a delayed job.
#
# The flag is written through Thread.current[]=, so it is scoped to the
# current thread/fiber rather than shared across the process.
#
# @param val [Object] truthy while a delayed job is running
# @return [Object] the value that was stored
def in_delayed_job=(val)
  Thread.current[:in_delayed_job] = val
end
#in_delayed_job? ⇒ Boolean
114 115 116 |
# File 'lib/delayed/backend/base.rb', line 114

# Reports whether the current thread of execution is flagged as running a
# delayed job.
#
# Reads Thread.current[:in_delayed_job] and coerces it to a strict boolean.
#
# @return [Boolean] true if the flag is truthy, false otherwise (including
#   when it was never set)
def in_delayed_job?
  !!Thread.current[:in_delayed_job]
end
#n_strand_options(strand_name, num_strands) ⇒ Object
By default, creates a new strand name randomly based on num_strands, effectively balancing the load during queueing. Overwritten in ActiveRecord::Job to use triggers to balance at run time.
108 109 110 111 112 |
# File 'lib/delayed/backend/base.rb', line 108

# Builds the strand option for an n-strand job by picking one of
# +num_strands+ strands at random.
#
# Strand 1 keeps the bare +strand_name+; strands 2..num_strands get a
# ":<n>" suffix. The caller's string is not mutated.
#
# @param strand_name [String] base strand name
# @param num_strands [Integer] number of strands to spread jobs across;
#   values of 1 or less always select the bare name
# @return [Hash] a hash of the form { strand: <chosen strand name> }
def n_strand_options(strand_name, num_strands)
  strand_num = num_strands > 1 ? rand(num_strands) + 1 : 1
  strand_name += ":#{strand_num}" if strand_num > 1
  { strand: strand_name }
end
#processes_locked_locally(name: nil) ⇒ Object
142 143 144 145 146 147 148 |
# File 'lib/delayed/backend/base.rb', line 142

# Lists the process ids that hold locks on running jobs for a given host.
#
# A job counts as local when its +locked_by+ starts with "<name>:"; the pid is
# taken from the text after the last ":" in +locked_by+.
#
# @param name [String, nil] host name to match; defaults to
#   Socket.gethostname
# @return [Array<Integer>] pids parsed from the matching jobs' +locked_by+;
#   empty when the host name cannot be determined
def processes_locked_locally(name: nil)
  begin
    name ||= Socket.gethostname
  rescue StandardError
    # Without a host name no lock can be attributed to this machine. The
    # previous `rescue x` fallback referenced an undefined name and raised
    # NameError instead; unlock_orphaned_jobs likewise bails out with an
    # empty result when the lookup fails.
    return []
  end

  prefix = "#{name}:"
  local_jobs = running_jobs.select { |job| job.locked_by.start_with?(prefix) }
  local_jobs.map { |job| job.locked_by.split(":").last.to_i }
end
#unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/delayed/backend/base.rb', line 160

# Reschedules running jobs whose locking process on this host is gone.
#
# Jobs are read from +running_jobs+ in batches of up to 100. With a +pid+,
# every job locked by exactly "<name>:<pid>" is rescheduled. Without one,
# every job locked by "<name>:<digits>" is probed with signal 0 and
# rescheduled only when the probe fails.
#
# @param pid [Integer, String, nil] only unlock jobs held by this pid; the
#   process is then assumed dead and is not probed
# @param name [String, nil] host name part of +locked_by+; defaults to
#   Socket.gethostname
# @return [Integer] number of jobs rescheduled (0 when the host name cannot
#   be determined)
def unlock_orphaned_jobs(pid = nil, name = nil)
  begin
    name ||= Socket.gethostname
  rescue StandardError
    return 0
  end

  pid_regex = pid || '(\d+)'
  regex = Regexp.new("^#{Regexp.escape(name)}:#{pid_regex}$")
  unlocked_jobs = 0

  # Escape LIKE metacharacters in one pass. The block form is required: in a
  # replacement *string*, gsub reads "\\\\" as one literal backslash, so
  # gsub("\\", "\\\\") left backslashes unescaped.
  escaped_name = name.gsub(/[\\%_]/) { |char| "\\#{char}" }
  locked_by_like = "#{escaped_name}:%"

  # With an explicit pid the process is taken to be dead; otherwise `running`
  # is recomputed for each job below.
  running = false if pid

  jobs = running_jobs.limit(100)
  jobs = pid ? jobs.where(locked_by: "#{name}:#{pid}") : jobs.where("locked_by LIKE ?", locked_by_like)

  # Ids of jobs that must stay locked; excluded from later batches so the loop
  # makes progress. NOTE(review): termination also assumes job.reschedule
  # removes the job from running_jobs -- confirm.
  ignores = []
  loop do
    batch_scope = ignores.empty? ? jobs : jobs.where.not(id: ignores)
    batch = batch_scope.to_a
    break if batch.empty?

    batch.each do |job|
      unless job.locked_by =~ regex
        ignores << job.id
        next
      end

      unless pid
        job_pid = Regexp.last_match(1).to_i
        running =
          begin
            # Signal 0 only checks that the process can be signalled.
            Process.kill(0, job_pid)
          rescue StandardError
            false
          end
      end

      if running
        ignores << job.id
      else
        unlocked_jobs += 1
        job.reschedule("process died")
      end
    end
  end
  unlocked_jobs
end
#unlock_orphaned_prefetched_jobs ⇒ Object
150 151 152 153 154 155 156 157 158 |
# File 'lib/delayed/backend/base.rb', line 150

# Unlocks prefetched jobs that have been held for too long.
#
# A job is considered orphaned when its +locked_by+ starts with "prefetch:"
# and its +locked_at+ is earlier than db_time_now minus four times
# Settings.parent_process[:prefetched_jobs_timeout].
#
# @return [Integer, Object] 0 when nothing is orphaned, otherwise whatever
#   +unlock+ returns for the orphaned jobs
def unlock_orphaned_prefetched_jobs
  horizon = db_time_now - (Settings.parent_process[:prefetched_jobs_timeout] * 4)
  orphaned_jobs = running_jobs.select do |job|
    job.locked_by.start_with?("prefetch:") && job.locked_at < horizon
  end
  return 0 if orphaned_jobs.empty?

  unlock(orphaned_jobs)
end