Module: Delayed::Backend::Base::ClassMethods
- Defined in:
- lib/delayed/backend/base.rb
Instance Attribute Summary collapse
-
#batch_enqueue_args ⇒ Object
Returns the value of attribute batch_enqueue_args.
-
#batches ⇒ Object
Returns the value of attribute batches.
-
#default_priority ⇒ Object
Returns the value of attribute default_priority.
Instance Method Summary collapse
- #check_priorities(min_priority, max_priority) ⇒ Object
- #check_queue(queue) ⇒ Object
-
#db_time_now ⇒ Object
Get the current time (UTC) Note: This does not ping the DB to get the time, so all your clients must have syncronized clocks.
-
#enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object
Add a job to the queue The first argument should be an object that respond_to?(:perform) The rest should be named arguments, these keys are expected: :priority, :run_at, :queue, :strand, :singleton Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue).
- #in_delayed_job=(val) ⇒ Object
- #in_delayed_job? ⇒ Boolean
-
#n_strand_options(strand_name, num_strands) ⇒ Object
by default creates a new strand name randomly based on num_strands effectively balancing the load during queueing overwritten in ActiveRecord::Job to use triggers to balance at run time.
- #processes_locked_locally(name: nil) ⇒ Object
- #unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object
- #unlock_orphaned_prefetched_jobs ⇒ Object
Instance Attribute Details
#batch_enqueue_args ⇒ Object
Returns the value of attribute batch_enqueue_args.
25 26 27 |
# File 'lib/delayed/backend/base.rb', line 25 def batch_enqueue_args @batch_enqueue_args end |
#batches ⇒ Object
Returns the value of attribute batches.
25 26 27 |
# File 'lib/delayed/backend/base.rb', line 25 def batches @batches end |
#default_priority ⇒ Object
Returns the value of attribute default_priority.
25 26 27 |
# File 'lib/delayed/backend/base.rb', line 25 def default_priority @default_priority end |
Instance Method Details
#check_priorities(min_priority, max_priority) ⇒ Object
126 127 128 129 130 131 132 133 |
# File 'lib/delayed/backend/base.rb', line 126 def check_priorities(min_priority, max_priority) if min_priority && min_priority < Delayed::MIN_PRIORITY raise ArgumentError, "min_priority #{min_priority} can't be less than #{Delayed::MIN_PRIORITY}" end if max_priority && max_priority > Delayed::MAX_PRIORITY # rubocop:disable Style/GuardClause raise ArgumentError, "max_priority #{max_priority} can't be greater than #{Delayed::MAX_PRIORITY}" end end |
#check_queue(queue) ⇒ Object
122 123 124 |
# File 'lib/delayed/backend/base.rb', line 122 def check_queue(queue) raise(ArgumentError, "queue name can't be blank") if queue.blank? end |
#db_time_now ⇒ Object
Get the current time (UTC) Note: This does not ping the DB to get the time, so all your clients must have syncronized clocks.
138 139 140 |
# File 'lib/delayed/backend/base.rb', line 138 def db_time_now Time.now.utc end |
#enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object
Add a job to the queue The first argument should be an object that respond_to?(:perform) The rest should be named arguments, these keys are expected: :priority, :run_at, :queue, :strand, :singleton Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue)
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/delayed/backend/base.rb', line 32 def enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) unless object.respond_to?(:perform) raise ArgumentError, "Cannot enqueue items which do not respond to perform" end strand ||= singleton if Settings.infer_strand_from_singleton kwargs = Settings..merge(kwargs) kwargs[:payload_object] = object kwargs[:priority] = priority kwargs[:run_at] = run_at if run_at kwargs[:strand] = strand kwargs[:max_attempts] = max_attempts if defined?(Marginalia) && Marginalia::Comment.components kwargs[:source] = Marginalia::Comment.construct_comment end kwargs[:expires_at] = expires_at kwargs[:queue] = queue kwargs[:singleton] = singleton raise ArgumentError, "Only one of strand or n_strand can be used" if strand && n_strand # If two parameters are given to n_strand, the first param is used # as the strand name for looking up the Setting, while the second # param is appended to make a unique set of strands. # # For instance, you can pass ["my_job_type", # root_account.global_id] # to get a set of n strands per root account, and you can apply the # same default to all. if n_strand strand_name, ext = n_strand if ext full_strand_name = "#{strand_name}/#{ext}" num_strands = Delayed::Settings.num_strands.call(full_strand_name) else full_strand_name = strand_name end num_strands ||= Delayed::Settings.num_strands.call(strand_name) num_strands = num_strands ? num_strands.to_i : 1 kwargs.merge!((full_strand_name, num_strands)) end if singleton job = create(**kwargs) elsif batches && strand.nil? && run_at.nil? batch_enqueue_args = kwargs.slice(*self.batch_enqueue_args) batches[batch_enqueue_args] << kwargs return true else raise ArgumentError, "on_conflict can only be provided with singleton" if kwargs[:on_conflict] job = create(**kwargs) end JobTracking.job_created(job) job end |
#in_delayed_job=(val) ⇒ Object
118 119 120 |
# File 'lib/delayed/backend/base.rb', line 118 def in_delayed_job=(val) Thread.current[:in_delayed_job] = val end |
#in_delayed_job? ⇒ Boolean
114 115 116 |
# File 'lib/delayed/backend/base.rb', line 114 def in_delayed_job? !!Thread.current[:in_delayed_job] end |
#n_strand_options(strand_name, num_strands) ⇒ Object
by default creates a new strand name randomly based on num_strands effectively balancing the load during queueing overwritten in ActiveRecord::Job to use triggers to balance at run time
108 109 110 111 112 |
# File 'lib/delayed/backend/base.rb', line 108 def (strand_name, num_strands) strand_num = num_strands > 1 ? rand(num_strands) + 1 : 1 strand_name += ":#{strand_num}" if strand_num > 1 { strand: strand_name } end |
#processes_locked_locally(name: nil) ⇒ Object
142 143 144 145 146 147 148 |
# File 'lib/delayed/backend/base.rb', line 142 def processes_locked_locally(name: nil) name ||= Socket.gethostname rescue x local_jobs = running_jobs.select do |job| job.locked_by.start_with?("#{name}:") end local_jobs.map { |job| job.locked_by.split(":").last.to_i } end |
#unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/delayed/backend/base.rb', line 160 def unlock_orphaned_jobs(pid = nil, name = nil) begin name ||= Socket.gethostname rescue return 0 end pid_regex = pid || '(\d+)' regex = Regexp.new("^#{Regexp.escape(name)}:#{pid_regex}$") unlocked_jobs = 0 running = false if pid running_jobs.each do |job| next unless job.locked_by =~ regex unless pid job_pid = $1.to_i running = Process.kill(0, job_pid) rescue false end unless running unlocked_jobs += 1 job.reschedule("process died") end end unlocked_jobs end |
#unlock_orphaned_prefetched_jobs ⇒ Object
150 151 152 153 154 155 156 157 158 |
# File 'lib/delayed/backend/base.rb', line 150 def unlock_orphaned_prefetched_jobs horizon = db_time_now - Settings.parent_process[:prefetched_jobs_timeout] * 4 orphaned_jobs = running_jobs.select do |job| job.locked_by.start_with?("prefetch:") && job.locked_at < horizon end return 0 if orphaned_jobs.empty? unlock(orphaned_jobs) end |