Module: Delayed::Backend::Base::ClassMethods

Defined in:
lib/delayed/backend/base.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#batch_enqueue_argsObject

Returns the value of attribute batch_enqueue_args.



24
25
26
# File 'lib/delayed/backend/base.rb', line 24

# Reader for the @batch_enqueue_args instance variable.
#
# Within this module, enqueue splats the returned value into Hash#slice to
# pick the subset of job options that keys an entry in batches, so it is
# expected to be a list of option keys.
def batch_enqueue_args
  @batch_enqueue_args
end

#batchesObject

Returns the value of attribute batches.



23
24
25
# File 'lib/delayed/backend/base.rb', line 23

# Reader for the @batches instance variable.
#
# enqueue treats a truthy value as "batching is active": instead of creating
# a job it appends the job options to batches[key], where key is the slice of
# options selected by batch_enqueue_args.
def batches
  @batches
end

#default_priorityObject

Returns the value of attribute default_priority.



25
26
27
# File 'lib/delayed/backend/base.rb', line 25

# Reader for the @default_priority instance variable.
#
# enqueue falls back to this value when the caller supplies no :priority
# option (or supplies a nil/false one).
def default_priority
  @default_priority
end

Instance Method Details

#check_priorities(min_priority, max_priority) ⇒ Object



105
106
107
108
109
110
111
112
# File 'lib/delayed/backend/base.rb', line 105

# Validates an optional priority range against the global bounds.
#
# Either argument may be nil, in which case that side is not checked.
# The lower bound is verified first, so when both are out of range the
# min_priority error is the one raised.
#
# Raises ArgumentError when min_priority is below Delayed::MIN_PRIORITY or
# max_priority is above Delayed::MAX_PRIORITY. Returns nil otherwise.
def check_priorities(min_priority, max_priority)
  too_low = min_priority && min_priority < Delayed::MIN_PRIORITY
  raise(ArgumentError, "min_priority #{min_priority} can't be less than #{Delayed::MIN_PRIORITY}") if too_low

  too_high = max_priority && max_priority > Delayed::MAX_PRIORITY
  raise(ArgumentError, "max_priority #{max_priority} can't be greater than #{Delayed::MAX_PRIORITY}") if too_high
end

#check_queue(queue) ⇒ Object

Raises:

  • (ArgumentError)


101
102
103
# File 'lib/delayed/backend/base.rb', line 101

# Ensures a queue name was supplied.
#
# Raises ArgumentError when queue.blank? is true; returns nil otherwise.
def check_queue(queue)
  return unless queue.blank?

  raise(ArgumentError, "queue name can't be blank")
end

#db_time_nowObject

Get the current time (UTC). Note: This does not ping the DB to get the time, so all your clients must have synchronized clocks.



117
118
119
# File 'lib/delayed/backend/base.rb', line 117

# Returns the current wall-clock time converted to UTC.
#
# The value comes from the local process clock (Time.now), not from the
# database.
def db_time_now
  current = Time.now
  current.utc
end

#enqueue(*args) ⇒ Object

Add a job to the queue. The first argument should be an object that responds to perform (respond_to?(:perform)). The rest should be named arguments; these keys are expected: :priority, :run_at, :queue, :strand, :singleton. Example: Delayed::Job.enqueue(object, :priority => 0, :run_at => time, :queue => queue)



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/delayed/backend/base.rb', line 32

# Queues +object+ for later execution.
#
# The first positional argument is the payload and must respond to #perform;
# the optional second argument is a hash of job options that is merged over
# Settings.default_job_options.
#
# Returns the created job, or +true+ when the job options were only appended
# to an open batch (in that case no job is created here and
# JobTracking.job_created is not called).
#
# Raises ArgumentError if the payload does not respond to #perform.
def enqueue(*args)
  object = args.shift
  unless object.respond_to?(:perform)
    raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
  end

  # Caller-supplied options win over the configured defaults.
  options = Settings.default_job_options.merge(args.first || {})
  options[:priority] ||= self.default_priority
  options[:payload_object] = object
  # key? rather than ||= so that an explicitly passed nil queue is kept as-is.
  options[:queue] = Delayed::Settings.queue unless options.key?(:queue)
  options[:max_attempts] ||= Delayed::Settings.max_attempts
  # Record the Marginalia comment as the job source, when Marginalia is loaded
  # and has components configured.
  options[:source] = Marginalia::Comment.construct_comment if defined?(Marginalia) && Marginalia::Comment.components

  # If two parameters are given to n_strand, the first param is used
  # as the strand name for looking up the Setting, while the second
  # param is appended to make a unique set of strands.
  #
  # For instance, you can pass ["my_job_type", root_account.global_id]
  # to get a set of n strands per root account, and you can apply the
  # same default to all.
  if options[:n_strand]
    # A plain (non-array) value leaves ext nil.
    strand_name, ext = options.delete(:n_strand)

    if ext
      full_strand_name = "#{strand_name}/#{ext}"
      # Look for a setting specific to the full "name/ext" strand first.
      num_strands = Delayed::Settings.num_strands.call(full_strand_name)
    else
      full_strand_name = strand_name
    end

    # Fall back to the setting for the base strand name, then to one strand.
    num_strands ||= Delayed::Settings.num_strands.call(strand_name)
    num_strands = num_strands ? num_strands.to_i : 1

    options.merge!(n_strand_options(full_strand_name, num_strands))
  end

  if options[:singleton]
    # The :singleton value becomes the strand name (overwriting any :strand).
    options[:strand] = options.delete :singleton
    job = self.create_singleton(options)
  elsif batches && options.slice(:strand, :run_at).empty?
    # Batching is active and the job has neither :strand nor :run_at:
    # file the options under their batch key instead of creating a job now.
    batch_enqueue_args = options.slice(*self.batch_enqueue_args)
    batches[batch_enqueue_args] << options
    return true
  else
    job = self.create(options)
  end

  JobTracking.job_created(job)

  job
end

#in_delayed_job=(val) ⇒ Object



97
98
99
# File 'lib/delayed/backend/base.rb', line 97

# Stores +val+ under the :in_delayed_job key of Thread.current, where
# in_delayed_job? reads it back.
def in_delayed_job=(val)
  current_thread = Thread.current
  current_thread[:in_delayed_job] = val
end

#in_delayed_job?Boolean

Returns:

  • (Boolean)


93
94
95
# File 'lib/delayed/backend/base.rb', line 93

# Reports whether the :in_delayed_job key of Thread.current holds a truthy
# value. Always returns a strict true/false.
def in_delayed_job?
  flag = Thread.current[:in_delayed_job]
  flag ? true : false
end

#n_strand_options(strand_name, num_strands) ⇒ Object

By default, creates a new strand name randomly based on num_strands, effectively balancing the load during queueing. Overwritten in ActiveRecord::Job to use triggers to balance at run time.



87
88
89
90
91
# File 'lib/delayed/backend/base.rb', line 87

# Picks one of +num_strands+ strands at random and returns it as a
# { strand: name } options hash.
#
# Strand 1 uses +strand_name+ unchanged; strands 2..num_strands get a
# ":<number>" suffix. The caller's string is never modified in place.
def n_strand_options(strand_name, num_strands)
  bucket = 1
  bucket += rand(num_strands) if num_strands > 1
  return { :strand => strand_name } unless bucket > 1

  { :strand => strand_name + ":#{bucket}" }
end

#unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/delayed/backend/base.rb', line 121

# Reschedules running jobs whose lock is held by a worker process that no
# longer exists, and returns the number of jobs rescheduled.
#
# A job is considered only when its locked_by is exactly "<name>:<pid>".
#
# pid  - when given, every job locked by that pid on +name+ is rescheduled
#        without probing the process. When nil, any numeric pid matches and
#        each one is probed with signal 0 on the local machine.
# name - host part of the lock name; defaults to Socket.gethostname.
#
# Returns 0 without touching any job if the hostname lookup fails.
def unlock_orphaned_jobs(pid = nil, name = nil)
  begin
    name ||= Socket.gethostname
  rescue StandardError
    # Best effort: without a hostname no lock can be attributed to this machine.
    return 0
  end

  pid_pattern = pid || '(\d+)'
  # \A and \z anchor the whole string; ^ and $ would also accept a match on
  # just one line of a multi-line locked_by value.
  lock_owner = Regexp.new("\\A#{Regexp.escape(name)}:#{pid_pattern}\\z")
  unlocked_jobs = 0

  self.running_jobs.each do |job|
    match = lock_owner.match(job.locked_by.to_s)
    next unless match

    alive =
      if pid
        # An explicit pid is taken as known-dead by the caller.
        false
      else
        begin
          # Signal 0 delivers nothing; it only checks that the pid exists.
          Process.kill(0, match[1].to_i)
          true
        rescue Errno::EPERM
          # The process exists but belongs to another user, so it is alive.
          # Treating this as "dead" would reschedule a job that is still running.
          true
        rescue StandardError
          # Errno::ESRCH (no such process) and anything else: assume it is gone.
          false
        end
      end
    next if alive

    unlocked_jobs += 1
    job.reschedule("process died")
  end

  unlocked_jobs
end