Module: Delayed::Backend::Base::ClassMethods
- Defined in:
- lib/delayed/backend/base.rb
Instance Attribute Summary collapse
-
#batch_enqueue_args ⇒ Object
Returns the value of attribute batch_enqueue_args.
-
#batches ⇒ Object
Returns the value of attribute batches.
-
#default_priority ⇒ Object
Returns the value of attribute default_priority.
Instance Method Summary collapse
- #check_priorities(min_priority, max_priority) ⇒ Object
- #check_queue(queue) ⇒ Object
-
#db_time_now ⇒ Object
Get the current time (UTC) Note: This does not ping the DB to get the time, so all your clients must have syncronized clocks.
-
#enqueue(*args) ⇒ Object
Add a job to the queue The first argument should be an object that respond_to?(:perform) The rest should be named arguments, these keys are expected: :priority, :run_at, :queue, :strand, :singleton Example: Delayed::Job.enqueue(object, :priority => 0, :run_at => time, :queue => queue).
- #in_delayed_job=(val) ⇒ Object
- #in_delayed_job? ⇒ Boolean
-
#n_strand_options(strand_name, num_strands) ⇒ Object
by default creates a new strand name randomly based on num_strands effectively balancing the load during queueing overwritten in ActiveRecord::Job to use triggers to balance at run time.
- #unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object
- #unlock_orphaned_prefetched_jobs ⇒ Object
Instance Attribute Details
#batch_enqueue_args ⇒ Object
Returns the value of attribute batch_enqueue_args.
24 25 26 |
# File 'lib/delayed/backend/base.rb', line 24 def batch_enqueue_args @batch_enqueue_args end |
#batches ⇒ Object
Returns the value of attribute batches.
23 24 25 |
# File 'lib/delayed/backend/base.rb', line 23 def batches @batches end |
#default_priority ⇒ Object
Returns the value of attribute default_priority.
25 26 27 |
# File 'lib/delayed/backend/base.rb', line 25 def default_priority @default_priority end |
Instance Method Details
#check_priorities(min_priority, max_priority) ⇒ Object
105 106 107 108 109 110 111 112 |
# File 'lib/delayed/backend/base.rb', line 105 def check_priorities(min_priority, max_priority) if min_priority && min_priority < Delayed::MIN_PRIORITY raise(ArgumentError, "min_priority #{min_priority} can't be less than #{Delayed::MIN_PRIORITY}") end if max_priority && max_priority > Delayed::MAX_PRIORITY raise(ArgumentError, "max_priority #{max_priority} can't be greater than #{Delayed::MAX_PRIORITY}") end end |
#check_queue(queue) ⇒ Object
101 102 103 |
# File 'lib/delayed/backend/base.rb', line 101 def check_queue(queue) raise(ArgumentError, "queue name can't be blank") if queue.blank? end |
#db_time_now ⇒ Object
Get the current time (UTC) Note: This does not ping the DB to get the time, so all your clients must have syncronized clocks.
117 118 119 |
# File 'lib/delayed/backend/base.rb', line 117 def db_time_now Time.now.utc end |
#enqueue(*args) ⇒ Object
Add a job to the queue The first argument should be an object that respond_to?(:perform) The rest should be named arguments, these keys are expected: :priority, :run_at, :queue, :strand, :singleton Example: Delayed::Job.enqueue(object, :priority => 0, :run_at => time, :queue => queue)
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/delayed/backend/base.rb', line 32 def enqueue(*args) object = args.shift unless object.respond_to?(:perform) raise ArgumentError, 'Cannot enqueue items which do not respond to perform' end = Settings..merge(args.first || {}) [:priority] ||= self.default_priority [:payload_object] = object [:queue] = Delayed::Settings.queue unless .key?(:queue) [:max_attempts] ||= Delayed::Settings.max_attempts [:source] = Marginalia::Comment.construct_comment if defined?(Marginalia) && Marginalia::Comment.components # If two parameters are given to n_strand, the first param is used # as the strand name for looking up the Setting, while the second # param is appended to make a unique set of strands. # # For instance, you can pass ["my_job_type", # root_account.global_id] # to get a set of n strands per root account, and you can apply the # same default to all. if [:n_strand] strand_name, ext = .delete(:n_strand) if ext full_strand_name = "#{strand_name}/#{ext}" num_strands = Delayed::Settings.num_strands.call(full_strand_name) else full_strand_name = strand_name end num_strands ||= Delayed::Settings.num_strands.call(strand_name) num_strands = num_strands ? num_strands.to_i : 1 .merge!((full_strand_name, num_strands)) end if [:singleton] [:strand] = .delete :singleton job = self.create_singleton() elsif batches && .slice(:strand, :run_at).empty? batch_enqueue_args = .slice(*self.batch_enqueue_args) batches[batch_enqueue_args] << return true else job = self.create() end JobTracking.job_created(job) job end |
#in_delayed_job=(val) ⇒ Object
97 98 99 |
# File 'lib/delayed/backend/base.rb', line 97 def in_delayed_job=(val) Thread.current[:in_delayed_job] = val end |
#in_delayed_job? ⇒ Boolean
93 94 95 |
# File 'lib/delayed/backend/base.rb', line 93 def in_delayed_job? !!Thread.current[:in_delayed_job] end |
#n_strand_options(strand_name, num_strands) ⇒ Object
by default creates a new strand name randomly based on num_strands effectively balancing the load during queueing overwritten in ActiveRecord::Job to use triggers to balance at run time
87 88 89 90 91 |
# File 'lib/delayed/backend/base.rb', line 87 def (strand_name, num_strands) strand_num = num_strands > 1 ? rand(num_strands) + 1 : 1 strand_name += ":#{strand_num}" if strand_num > 1 {:strand => strand_name} end |
#unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/delayed/backend/base.rb', line 128 def unlock_orphaned_jobs(pid = nil, name = nil) begin name ||= Socket.gethostname rescue return 0 end pid_regex = pid || '(\d+)' regex = Regexp.new("^#{Regexp.escape(name)}:#{pid_regex}$") unlocked_jobs = 0 running = false if pid self.running_jobs.each do |job| next unless job.locked_by =~ regex unless pid job_pid = $1.to_i running = Process.kill(0, job_pid) rescue false end if !running unlocked_jobs += 1 job.reschedule("process died") end end unlocked_jobs end |
#unlock_orphaned_prefetched_jobs ⇒ Object
121 122 123 124 125 126 |
# File 'lib/delayed/backend/base.rb', line 121 def unlock_orphaned_prefetched_jobs horizon = db_time_now - Settings.parent_process[:prefetched_jobs_timeout] * 4 orphaned_jobs = running_jobs.select { |job| job.locked_by.start_with?('prefetch:') && job.locked_at < horizon } return 0 if orphaned_jobs.empty? unlock(orphaned_jobs) end |