Module: Delayed::Backend::Base::ClassMethods

Defined in:
lib/delayed/backend/base.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#batch_enqueue_argsObject

Returns the value of attribute batch_enqueue_args.



26
27
28
# File 'lib/delayed/backend/base.rb', line 26

def batch_enqueue_args
  @batch_enqueue_args
end

#batchesObject

Returns the value of attribute batches.



25
26
27
# File 'lib/delayed/backend/base.rb', line 25

def batches
  @batches
end

#default_priorityObject

Returns the value of attribute default_priority.



27
28
29
# File 'lib/delayed/backend/base.rb', line 27

def default_priority
  @default_priority
end

Instance Method Details

#check_priorities(min_priority, max_priority) ⇒ Object



123
124
125
126
127
128
129
130
# File 'lib/delayed/backend/base.rb', line 123

def check_priorities(min_priority, max_priority)
  if min_priority && min_priority < Delayed::MIN_PRIORITY
    raise(ArgumentError, "min_priority #{min_priority} can't be less than #{Delayed::MIN_PRIORITY}")
  end
  if max_priority && max_priority > Delayed::MAX_PRIORITY
    raise(ArgumentError, "max_priority #{max_priority} can't be greater than #{Delayed::MAX_PRIORITY}")
  end
end

#check_queue(queue) ⇒ Object

Raises:

  • (ArgumentError)


119
120
121
# File 'lib/delayed/backend/base.rb', line 119

def check_queue(queue)
  raise(ArgumentError, "queue name can't be blank") if queue.blank?
end

#db_time_nowObject

Get the current time (UTC) Note: This does not ping the DB to get the time, so all your clients must have syncronized clocks.



135
136
137
# File 'lib/delayed/backend/base.rb', line 135

def db_time_now
  Time.now.utc
end

#enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object

Add a job to the queue The first argument should be an object that respond_to?(:perform) The rest should be named arguments, these keys are expected: :priority, :run_at, :queue, :strand, :singleton Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue)



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/delayed/backend/base.rb', line 34

def enqueue(object,
  priority: default_priority,
  run_at: nil,
  expires_at: nil,
  queue: Delayed::Settings.queue,
  strand: nil,
  singleton: nil,
  n_strand: nil,
  max_attempts: Delayed::Settings.max_attempts,
  **kwargs)

  unless object.respond_to?(:perform)
    raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
  end

  kwargs = Settings.default_job_options.merge(kwargs)
  kwargs[:payload_object] = object
  kwargs[:priority] = priority
  kwargs[:run_at] = run_at if run_at
  kwargs[:strand] = strand
  kwargs[:max_attempts] = max_attempts
  kwargs[:source] = Marginalia::Comment.construct_comment if defined?(Marginalia) && Marginalia::Comment.components
  kwargs[:expires_at] = expires_at
  kwargs[:queue] = queue

  # If two parameters are given to n_strand, the first param is used
  # as the strand name for looking up the Setting, while the second
  # param is appended to make a unique set of strands.
  #
  # For instance, you can pass ["my_job_type", # root_account.global_id]
  # to get a set of n strands per root account, and you can apply the
  # same default to all.
  if n_strand
    strand_name, ext = n_strand

    if ext
      full_strand_name = "#{strand_name}/#{ext}"
      num_strands = Delayed::Settings.num_strands.call(full_strand_name)
    else
      full_strand_name = strand_name
    end

    num_strands ||= Delayed::Settings.num_strands.call(strand_name)
    num_strands = num_strands ? num_strands.to_i : 1

    kwargs.merge!(n_strand_options(full_strand_name, num_strands))
  end

  if singleton
    kwargs[:strand] = singleton
    job = self.create_singleton(**kwargs)
  elsif batches && strand.nil? && run_at.nil?
    batch_enqueue_args = kwargs.slice(*self.batch_enqueue_args)
    batches[batch_enqueue_args] << kwargs
    return true
  else
    if kwargs[:on_conflict].present?
      Delayed::Logging.logger.warn("[DELAYED_JOB] WARNING: providing 'on_conflict' as an option to a non-singleton job will have no effect.  Discarding.")
      kwargs.delete(:on_conflict)
    end
    job = self.create(**kwargs)
  end

  JobTracking.job_created(job)

  job
end

#in_delayed_job=(val) ⇒ Object



115
116
117
# File 'lib/delayed/backend/base.rb', line 115

def in_delayed_job=(val)
  Thread.current[:in_delayed_job] = val
end

#in_delayed_job?Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/delayed/backend/base.rb', line 111

def in_delayed_job?
  !!Thread.current[:in_delayed_job]
end

#n_strand_options(strand_name, num_strands) ⇒ Object

by default creates a new strand name randomly based on num_strands effectively balancing the load during queueing overwritten in ActiveRecord::Job to use triggers to balance at run time



105
106
107
108
109
# File 'lib/delayed/backend/base.rb', line 105

def n_strand_options(strand_name, num_strands)
  strand_num = num_strands > 1 ? rand(num_strands) + 1 : 1
  strand_name += ":#{strand_num}" if strand_num > 1
  { strand: strand_name }
end

#processes_locked_locally(name: nil) ⇒ Object



139
140
141
142
# File 'lib/delayed/backend/base.rb', line 139

def processes_locked_locally(name: nil)
  name ||= Socket.gethostname rescue x
  running_jobs.select{|job| job.locked_by.start_with?("#{name}:")}.map{|job| job.locked_by.split(':').last.to_i}
end

#unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/delayed/backend/base.rb', line 151

def unlock_orphaned_jobs(pid = nil, name = nil)
  begin
    name ||= Socket.gethostname
  rescue
    return 0
  end
  pid_regex = pid || '(\d+)'
  regex = Regexp.new("^#{Regexp.escape(name)}:#{pid_regex}$")
  unlocked_jobs = 0
  running = false if pid
  self.running_jobs.each do |job|
    next unless job.locked_by =~ regex
    unless pid
      job_pid = $1.to_i
      running = Process.kill(0, job_pid) rescue false
    end
    if !running
      unlocked_jobs += 1
      job.reschedule("process died")
    end
  end
  unlocked_jobs
end

#unlock_orphaned_prefetched_jobsObject



144
145
146
147
148
149
# File 'lib/delayed/backend/base.rb', line 144

def unlock_orphaned_prefetched_jobs
  horizon = db_time_now - Settings.parent_process[:prefetched_jobs_timeout] * 4
  orphaned_jobs = running_jobs.select { |job| job.locked_by.start_with?('prefetch:') && job.locked_at < horizon }
  return 0 if orphaned_jobs.empty?
  unlock(orphaned_jobs)
end