Module: Delayed::Backend::Base::ClassMethods

Defined in:
lib/delayed/backend/base.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#batch_enqueue_argsObject

Returns the value of attribute batch_enqueue_args.



25
26
27
# File 'lib/delayed/backend/base.rb', line 25

def batch_enqueue_args
  @batch_enqueue_args
end

#batchesObject

Returns the value of attribute batches.



25
26
27
# File 'lib/delayed/backend/base.rb', line 25

def batches
  @batches
end

#default_priorityObject

Returns the value of attribute default_priority.



25
26
27
# File 'lib/delayed/backend/base.rb', line 25

def default_priority
  @default_priority
end

Instance Method Details

#check_priorities(min_priority, max_priority) ⇒ Object



126
127
128
129
130
131
132
133
# File 'lib/delayed/backend/base.rb', line 126

def check_priorities(min_priority, max_priority)
  if min_priority && min_priority < Delayed::MIN_PRIORITY
    raise ArgumentError, "min_priority #{min_priority} can't be less than #{Delayed::MIN_PRIORITY}"
  end
  if max_priority && max_priority > Delayed::MAX_PRIORITY # rubocop:disable Style/GuardClause
    raise ArgumentError, "max_priority #{max_priority} can't be greater than #{Delayed::MAX_PRIORITY}"
  end
end

#check_queue(queue) ⇒ Object

Raises:

  • (ArgumentError)


122
123
124
# File 'lib/delayed/backend/base.rb', line 122

def check_queue(queue)
  raise(ArgumentError, "queue name can't be blank") if queue.blank?
end

#db_time_nowObject

Get the current time (UTC) Note: This does not ping the DB to get the time, so all your clients must have syncronized clocks.



138
139
140
# File 'lib/delayed/backend/base.rb', line 138

def db_time_now
  Time.now.utc
end

#enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object

Add a job to the queue The first argument should be an object that respond_to?(:perform) The rest should be named arguments, these keys are expected: :priority, :run_at, :queue, :strand, :singleton Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue)

Raises:

  • (ArgumentError)


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/delayed/backend/base.rb', line 32

def enqueue(object,
            priority: default_priority,
            run_at: nil,
            expires_at: nil,
            queue: Delayed::Settings.queue,
            strand: nil,
            singleton: nil,
            n_strand: nil,
            max_attempts: Delayed::Settings.max_attempts,
            **kwargs)

  unless object.respond_to?(:perform)
    raise ArgumentError, "Cannot enqueue items which do not respond to perform"
  end

  strand ||= singleton if Settings.infer_strand_from_singleton

  kwargs = Settings.default_job_options.merge(kwargs)
  kwargs[:payload_object] = object
  kwargs[:priority] = priority
  kwargs[:run_at] = run_at if run_at
  kwargs[:strand] = strand
  kwargs[:max_attempts] = max_attempts
  if defined?(Marginalia) && Marginalia::Comment.components
    kwargs[:source] =
      Marginalia::Comment.construct_comment
  end
  kwargs[:expires_at] = expires_at
  kwargs[:queue] = queue
  kwargs[:singleton] = singleton

  raise ArgumentError, "Only one of strand or n_strand can be used" if strand && n_strand

  # If two parameters are given to n_strand, the first param is used
  # as the strand name for looking up the Setting, while the second
  # param is appended to make a unique set of strands.
  #
  # For instance, you can pass ["my_job_type", # root_account.global_id]
  # to get a set of n strands per root account, and you can apply the
  # same default to all.
  if n_strand
    strand_name, ext = n_strand

    if ext
      full_strand_name = "#{strand_name}/#{ext}"
      num_strands = Delayed::Settings.num_strands.call(full_strand_name)
    else
      full_strand_name = strand_name
    end

    num_strands ||= Delayed::Settings.num_strands.call(strand_name)
    num_strands = num_strands ? num_strands.to_i : 1

    kwargs.merge!(n_strand_options(full_strand_name, num_strands))
  end

  if singleton
    job = create(**kwargs)
  elsif batches && strand.nil? && run_at.nil?
    batch_enqueue_args = kwargs.slice(*self.batch_enqueue_args)
    batches[batch_enqueue_args] << kwargs
    return true
  else
    raise ArgumentError, "on_conflict can only be provided with singleton" if kwargs[:on_conflict]

    job = create(**kwargs)
  end

  JobTracking.job_created(job)

  job
end

#in_delayed_job=(val) ⇒ Object



118
119
120
# File 'lib/delayed/backend/base.rb', line 118

def in_delayed_job=(val)
  Thread.current[:in_delayed_job] = val
end

#in_delayed_job?Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/delayed/backend/base.rb', line 114

def in_delayed_job?
  !!Thread.current[:in_delayed_job]
end

#n_strand_options(strand_name, num_strands) ⇒ Object

by default creates a new strand name randomly based on num_strands effectively balancing the load during queueing overwritten in ActiveRecord::Job to use triggers to balance at run time



108
109
110
111
112
# File 'lib/delayed/backend/base.rb', line 108

def n_strand_options(strand_name, num_strands)
  strand_num = num_strands > 1 ? rand(num_strands) + 1 : 1
  strand_name += ":#{strand_num}" if strand_num > 1
  { strand: strand_name }
end

#processes_locked_locally(name: nil) ⇒ Object



142
143
144
145
146
147
148
# File 'lib/delayed/backend/base.rb', line 142

def processes_locked_locally(name: nil)
  name ||= Socket.gethostname rescue x
  local_jobs = running_jobs.select do |job|
    job.locked_by.start_with?("#{name}:")
  end
  local_jobs.map { |job| job.locked_by.split(":").last.to_i }
end

#unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/delayed/backend/base.rb', line 160

def unlock_orphaned_jobs(pid = nil, name = nil)
  begin
    name ||= Socket.gethostname
  rescue
    return 0
  end
  pid_regex = pid || '(\d+)'
  regex = Regexp.new("^#{Regexp.escape(name)}:#{pid_regex}$")
  unlocked_jobs = 0
  running = false if pid
  running_jobs.each do |job|
    next unless job.locked_by =~ regex

    unless pid
      job_pid = $1.to_i
      running = Process.kill(0, job_pid) rescue false
    end
    unless running
      unlocked_jobs += 1
      job.reschedule("process died")
    end
  end
  unlocked_jobs
end

#unlock_orphaned_prefetched_jobsObject



150
151
152
153
154
155
156
157
158
# File 'lib/delayed/backend/base.rb', line 150

def unlock_orphaned_prefetched_jobs
  horizon = db_time_now - Settings.parent_process[:prefetched_jobs_timeout] * 4
  orphaned_jobs = running_jobs.select do |job|
    job.locked_by.start_with?("prefetch:") && job.locked_at < horizon
  end
  return 0 if orphaned_jobs.empty?

  unlock(orphaned_jobs)
end