Module: Delayed::Backend::Base::ClassMethods

Defined in:
lib/delayed/backend/base.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#batch_enqueue_argsObject

Returns the value of attribute batch_enqueue_args.



25
26
27
# File 'lib/delayed/backend/base.rb', line 25

# Reader for the @batch_enqueue_args instance variable.
# Within this module, enqueue splats the returned value into kwargs.slice
# to build the key under which a batched job's options are grouped.
def batch_enqueue_args
  @batch_enqueue_args
end

#batchesObject

Returns the value of attribute batches.



25
26
27
# File 'lib/delayed/backend/base.rb', line 25

# Reader for the @batches instance variable.
# When the value is truthy, enqueue appends eligible jobs (no singleton,
# no strand, no run_at) to batches[key] instead of creating them immediately.
def batches
  @batches
end

#default_priorityObject

Returns the value of attribute default_priority.



25
26
27
# File 'lib/delayed/backend/base.rb', line 25

# Reader for the @default_priority instance variable.
# Used as the default for the priority: keyword of enqueue.
def default_priority
  @default_priority
end

Instance Method Details

#check_priorities(min_priority, max_priority) ⇒ Object



126
127
128
129
130
131
132
133
# File 'lib/delayed/backend/base.rb', line 126

# Validates an optional priority range against the global limits.
#
# A nil (or false) bound is skipped. The lower bound is checked first, so
# when both bounds are out of range the min_priority error is the one raised.
#
# @param min_priority [Integer, nil] lowest priority requested
# @param max_priority [Integer, nil] highest priority requested
# @return [nil]
# @raise [ArgumentError] if min_priority is below Delayed::MIN_PRIORITY or
#   max_priority is above Delayed::MAX_PRIORITY
def check_priorities(min_priority, max_priority)
  too_low = min_priority && min_priority < Delayed::MIN_PRIORITY
  raise ArgumentError, "min_priority #{min_priority} can't be less than #{Delayed::MIN_PRIORITY}" if too_low

  too_high = max_priority && max_priority > Delayed::MAX_PRIORITY
  raise ArgumentError, "max_priority #{max_priority} can't be greater than #{Delayed::MAX_PRIORITY}" if too_high
end

#check_queue(queue) ⇒ Object

Raises:

  • (ArgumentError)


122
123
124
# File 'lib/delayed/backend/base.rb', line 122

# Ensures a queue name was supplied.
#
# @param queue [Object] queue name; must respond to blank?
# @return [nil] when the name is not blank
# @raise [ArgumentError] when queue.blank? is truthy
def check_queue(queue)
  return unless queue.blank?

  raise ArgumentError, "queue name can't be blank"
end

#db_time_nowObject

Get the current time (UTC). Note: this does not ping the DB to get the time, so all your clients must have synchronized clocks.



138
139
140
# File 'lib/delayed/backend/base.rb', line 138

# Current time in UTC, taken from the local system clock (Time.now);
# no database round-trip is made.
#
# @return [Time] the current time, in UTC
def db_time_now
  local_now = Time.now
  local_now.utc
end

#enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object

Add a job to the queue. The first argument should be an object that responds to `perform`. The rest should be named arguments; these keys are expected: :priority, :run_at, :queue, :strand, :singleton. Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue)

Raises:

  • (ArgumentError)


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/delayed/backend/base.rb', line 32

# Builds the attribute hash for a job wrapping +object+ and either creates
# the job or, when batching is active, queues the attributes for later.
#
# @param object [#perform] payload to run; stored as :payload_object
# @param priority [Object] defaults to default_priority
# @param run_at [Object, nil] only written to the attributes when truthy
# @param expires_at [Object, nil]
# @param queue [Object] defaults to Delayed::Settings.queue
# @param strand [Object, nil] mutually exclusive with n_strand
# @param singleton [Object, nil] when truthy the job is always created
#   directly (never batched)
# @param n_strand [String, Array, nil] strand name, or [name, extension]
# @param max_attempts [Object] defaults to Delayed::Settings.max_attempts
# @param kwargs [Hash] extra attributes, merged over
#   Settings.default_job_options
# @return [Object, true] the result of create, or true when the job was
#   added to a batch
# @raise [ArgumentError] if object does not respond to perform, if both
#   strand and n_strand end up set, or if :on_conflict is given on the
#   non-singleton, non-batched path
def enqueue(object,
            priority: default_priority,
            run_at: nil,
            expires_at: nil,
            queue: Delayed::Settings.queue,
            strand: nil,
            singleton: nil,
            n_strand: nil,
            max_attempts: Delayed::Settings.max_attempts,
            **kwargs)

  unless object.respond_to?(:perform)
    raise ArgumentError, "Cannot enqueue items which do not respond to perform"
  end

  # Optionally reuse the singleton name as the strand when no strand was given.
  strand ||= singleton if Settings.infer_strand_from_singleton

  # Caller-supplied kwargs win over the configured defaults; the explicitly
  # named keyword arguments below then overwrite any same-named keys.
  kwargs = Settings.default_job_options.merge(kwargs)
  kwargs[:payload_object] = object
  kwargs[:priority] = priority
  kwargs[:run_at] = run_at if run_at
  kwargs[:strand] = strand
  kwargs[:max_attempts] = max_attempts
  # Record the Marginalia comment as the job's source, but only when
  # Marginalia is loaded and has components.
  if defined?(Marginalia) && Marginalia::Comment.components
    kwargs[:source] =
      Marginalia::Comment.construct_comment
  end
  kwargs[:expires_at] = expires_at
  kwargs[:queue] = queue
  kwargs[:singleton] = singleton

  # Checked after strand inference, so an inferred strand also conflicts
  # with n_strand.
  raise ArgumentError, "Only one of strand or n_strand can be used" if strand && n_strand

  # If two parameters are given to n_strand, the first param is used
  # as the strand name for looking up the Setting, while the second
  # param is appended to make a unique set of strands.
  #
  # For instance, you can pass ["my_job_type", root_account.global_id]
  # to get a set of n strands per root account, and you can apply the
  # same default to all.
  if n_strand
    # A plain (non-array) n_strand leaves ext nil.
    strand_name, ext = n_strand

    if ext
      full_strand_name = "#{strand_name}/#{ext}"
      num_strands = Delayed::Settings.num_strands.call(full_strand_name)
    else
      full_strand_name = strand_name
    end

    # Fall back to the setting for the base name, then to a single strand.
    num_strands ||= Delayed::Settings.num_strands.call(strand_name)
    num_strands = num_strands ? num_strands.to_i : 1

    # n_strand_options supplies the final attributes (the base
    # implementation in this module returns a :strand key).
    kwargs.merge!(n_strand_options(full_strand_name, num_strands))
  end

  if singleton
    job = create(**kwargs)
  elsif batches && strand.nil? && run_at.nil?
    # Batching is active and the job has no explicit strand or run_at:
    # store the attributes under a key built from the batch_enqueue_args
    # subset and return without creating a job.
    # NOTE(review): the :on_conflict check in the else branch is not reached
    # on this path — confirm that is intended.
    batch_enqueue_args = kwargs.slice(*self.batch_enqueue_args)
    batches[batch_enqueue_args] << kwargs
    return true
  else
    raise ArgumentError, "on_conflict can only be provided with singleton" if kwargs[:on_conflict]

    job = create(**kwargs)
  end

  JobTracking.job_created(job)

  job
end

#in_delayed_job=(val) ⇒ Object



118
119
120
# File 'lib/delayed/backend/base.rb', line 118

# Sets the "currently running inside a delayed job" flag, stored under
# the :in_delayed_job key of Thread.current.
#
# @param val [Object] the flag value; read back as a boolean by in_delayed_job?
# @return [Object] val
def in_delayed_job=(val)
  current = Thread.current
  current[:in_delayed_job] = val
end

#in_delayed_job?Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/delayed/backend/base.rb', line 114

# Whether the :in_delayed_job flag in Thread.current is set to a truthy value.
#
# @return [Boolean] strictly true or false
def in_delayed_job?
  Thread.current[:in_delayed_job] ? true : false
end

#n_strand_options(strand_name, num_strands) ⇒ Object

By default, creates a new strand name randomly based on num_strands, effectively balancing the load during queueing. Overwritten in ActiveRecord::Job to use triggers to balance at run time.



108
109
110
111
112
# File 'lib/delayed/backend/base.rb', line 108

# Picks one of num_strands strands at random and returns the matching
# strand attribute.
#
# Strand number 1 keeps the bare name; numbers 2..num_strands get a
# ":<number>" suffix. The strand_name argument itself is not mutated.
#
# @param strand_name [String] base strand name
# @param num_strands [Integer] how many strands to spread jobs over;
#   values of 1 or less always yield the bare name
# @return [Hash] { strand: <chosen strand name> }
def n_strand_options(strand_name, num_strands)
  strand_num = 1
  strand_num += rand(num_strands) if num_strands > 1
  return { strand: strand_name } unless strand_num > 1

  { strand: strand_name + ":#{strand_num}" }
end

#processes_locked_locally(name: nil) ⇒ Object



142
143
144
145
146
147
148
# File 'lib/delayed/backend/base.rb', line 142

# Lists the process ids of workers on one host that currently hold job locks.
#
# A job belongs to the host when its locked_by starts with "<name>:"; the
# pid is the text after the last ":" converted with to_i.
#
# @param name [String, nil] host name to match; defaults to Socket.gethostname
# @return [Array<Integer>] one pid per matching running job (duplicates are
#   kept); empty when the host name cannot be determined
def processes_locked_locally(name: nil)
  begin
    name ||= Socket.gethostname
  rescue StandardError
    # Without a host name there is nothing to match against, so report no
    # local processes (unlock_orphaned_jobs returns 0 in the same situation).
    return []
  end

  prefix = "#{name}:"
  local_jobs = running_jobs.select { |job| job.locked_by.start_with?(prefix) }
  local_jobs.map { |job| job.locked_by.split(":").last.to_i }
end

#unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/delayed/backend/base.rb', line 160

# Reschedules running jobs whose lock is held by a worker process on this
# host that is no longer alive.
#
# @param pid [Integer, String, nil] when given, only jobs locked by exactly
#   "<name>:<pid>" are considered and that process is treated as dead without
#   being probed; when nil, every job locked by "<name>:<digits>" is examined
#   and its pid is probed with signal 0
# @param name [String, nil] host part of locked_by; defaults to
#   Socket.gethostname
# @return [Integer] number of jobs rescheduled; 0 when the host name lookup
#   raises
def unlock_orphaned_jobs(pid = nil, name = nil)
  begin
    name ||= Socket.gethostname
  rescue
    return 0
  end
  pid_regex = pid || '(\d+)'
  regex = Regexp.new("^#{Regexp.escape(name)}:#{pid_regex}$")
  unlocked_jobs = 0
  # Escape the LIKE metacharacters (\, %, _) with a backslash (assumes the
  # database uses backslash as the LIKE escape character). The block form is
  # required: in a gsub replacement *string*, "\\\\" collapses to a single
  # backslash, so backslashes would not actually be doubled.
  escaped_name = name.gsub(/[\\%_]/) { |char| "\\#{char}" }
  locked_by_like = "#{escaped_name}:%"
  # With an explicit pid the caller asserts the process is gone.
  running = false if pid
  jobs = running_jobs.limit(100)
  jobs = pid ? jobs.where(locked_by: "#{name}:#{pid}") : jobs.where("locked_by LIKE ?", locked_by_like)
  # Ids of jobs that were inspected and left alone; they are excluded from
  # later batches so the loop can make progress and terminate.
  ignores = []
  loop do
    batch_scope = ignores.empty? ? jobs : jobs.where.not(id: ignores)
    batch = batch_scope.to_a
    break if batch.empty?

    batch.each do |job|
      # The LIKE match is only a prefix match; skip locks that are not
      # exactly "<name>:<pid>".
      unless job.locked_by =~ regex
        ignores << job.id
        next
      end

      unless pid
        job_pid = $1.to_i
        # Signal 0 only tests for existence; any error (including lack of
        # permission to signal the process) counts as "not running".
        running = Process.kill(0, job_pid) rescue false
      end

      if running
        ignores << job.id
      else
        unlocked_jobs += 1
        job.reschedule("process died")
      end
    end
  end
  unlocked_jobs
end

#unlock_orphaned_prefetched_jobsObject



150
151
152
153
154
155
156
157
158
# File 'lib/delayed/backend/base.rb', line 150

# Unlocks jobs whose prefetch lock has been held for too long.
#
# A running job qualifies when its locked_by starts with "prefetch:" and its
# locked_at is earlier than db_time_now minus four times
# Settings.parent_process[:prefetched_jobs_timeout].
#
# @return [Object] 0 when nothing qualifies, otherwise whatever unlock
#   returns for the qualifying jobs
def unlock_orphaned_prefetched_jobs
  now = db_time_now
  timeout = Settings.parent_process[:prefetched_jobs_timeout]
  cutoff = now - (timeout * 4)

  stale = running_jobs.select do |candidate|
    candidate.locked_by.start_with?("prefetch:") && candidate.locked_at < cutoff
  end

  stale.empty? ? 0 : unlock(stale)
end