Class: Sidekiq::Scheduler

Inherits:
Object
  • Object
show all
Extended by:
Util
Defined in:
lib/sidekiq/scheduler.rb

Constant Summary collapse

REGISTERED_JOBS_THRESHOLD_IN_SECONDS =
24 * 60 * 60

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.dynamicObject

Set to update the schedule in runtime in a given time period.



25
26
27
# File 'lib/sidekiq/scheduler.rb', line 25

def dynamic
  @dynamic
end

.enabledObject

Set to enable or disable the scheduler.



22
23
24
# File 'lib/sidekiq/scheduler.rb', line 22

def enabled
  @enabled
end

.listened_queues_onlyObject

Set to schedule jobs only when will be pushed to queues listened by sidekiq



28
29
30
# File 'lib/sidekiq/scheduler.rb', line 28

def listened_queues_only
  @listened_queues_only
end

Class Method Details

.active_job_enqueue?(klass) ⇒ Boolean

Returns true if the enqueuing needs to be done for an ActiveJob

class false otherwise.

Parameters:

  • klass (Class)

    the class to check is decendant from ActiveJob

Returns:

  • (Boolean)


255
256
257
# File 'lib/sidekiq/scheduler.rb', line 255

def self.active_job_enqueue?(klass)
  defined?(ActiveJob::Enqueuing) && klass.included_modules.include?(ActiveJob::Enqueuing)
end

.clear_schedule!Object

Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler



192
193
194
195
196
197
# File 'lib/sidekiq/scheduler.rb', line 192

def self.clear_schedule!
  self.rufus_scheduler.stop
  @rufus_scheduler = nil
  @@scheduled_jobs = {}
  self.rufus_scheduler
end

.enabled_queue?(job_queue) ⇒ Boolean

Returns true if a job’s queue is being listened on by sidekiq

Parameters:

  • job_queue (String)

    Job’s queue name

Returns:

  • (Boolean)


284
285
286
287
288
# File 'lib/sidekiq/scheduler.rb', line 284

def self.enabled_queue?(job_queue)
  queues = Sidekiq.options[:queues]

  queues.empty? || queues.include?(job_queue)
end

.enque_with_active_job(config) ⇒ Object



233
234
235
# File 'lib/sidekiq/scheduler.rb', line 233

def self.enque_with_active_job(config)
  initialize_active_job(config['class'], config['args']).enqueue(config)
end

.enque_with_sidekiq(config) ⇒ Object



237
238
239
# File 'lib/sidekiq/scheduler.rb', line 237

def self.enque_with_sidekiq(config)
  Sidekiq::Client.push(config)
end

.enqueue_job(job_config) ⇒ Object

Enqueue a job based on a config hash



168
169
170
171
172
173
174
175
176
# File 'lib/sidekiq/scheduler.rb', line 168

def self.enqueue_job(job_config)
  config = prepare_arguments(job_config.dup)

  if active_job_enqueue?(config['class'])
    enque_with_active_job(config)
  else
    enque_with_sidekiq(config)
  end
end

.handle_errorsObject



159
160
161
162
163
164
165
# File 'lib/sidekiq/scheduler.rb', line 159

def self.handle_errors
  begin
    yield
  rescue StandardError => e
    logger.info "#{e.class.name}: #{e.message}"
  end
end

.idempotent_job_enqueue(job_name, time, config) ⇒ Object

Pushes the job into Sidekiq if not already pushed for the given time

Parameters:

  • job_name (String)

    The job’s name

  • time (Time)

    The time when the job got cleared for triggering

  • config (Hash)

    Job’s config hash



139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/sidekiq/scheduler.rb', line 139

def self.idempotent_job_enqueue(job_name, time, config)
  registered = register_job_instance(job_name, time)

  if registered
    logger.info "queueing #{config['class']} (#{job_name})"

    self.handle_errors { self.enqueue_job(config) }

    remove_elder_job_instances(job_name)
  else
    logger.debug { "Ignoring #{job_name} job as it has been already enqueued" }
  end
end

.initialize_active_job(klass, args) ⇒ Object



241
242
243
244
245
246
247
# File 'lib/sidekiq/scheduler.rb', line 241

def self.initialize_active_job(klass, args)
  if args.is_a?(Array)
    klass.new(*args)
  else
    klass.new(args)
  end
end

.load_schedule!Object

Pulls the schedule from Sidekiq.schedule and loads it into the rufus scheduler instance



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/sidekiq/scheduler.rb', line 48

def self.load_schedule!
  if enabled
    logger.info 'Loading Schedule'

    # Load schedule from redis for the first time if dynamic
    if dynamic
      Sidekiq.reload_schedule!
      self.rufus_scheduler.every('5s') do
        self.update_schedule
      end
    end

    logger.info 'Schedule empty! Set Sidekiq.schedule' if Sidekiq.schedule.empty?


    @@scheduled_jobs = {}

    Sidekiq.schedule.each do |name, config|
      if !listened_queues_only || enabled_queue?(config['queue'])
        self.load_schedule_job(name, config)
      else
        logger.info { "Ignoring #{name}, job's queue is not enabled." }
      end
    end

    Sidekiq.redis { |r| r.del(:schedules_changed) }

    logger.info 'Schedules Loaded'
  else
    logger.info 'SidekiqScheduler is disabled'
  end
end

.load_schedule_job(name, config) ⇒ Object

Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/sidekiq/scheduler.rb', line 97

def self.load_schedule_job(name, config)
  # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
  # required for the jobs to be scheduled.  If rails_env is missing, the
  # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
  # to.
  if config['rails_env'].nil? || self.rails_env_matches?(config)
    logger.info "Scheduling #{name} #{config}"
    interval_defined = false
    interval_types = %w{cron every at in interval}
    interval_types.each do |interval_type|
      config_interval_type = config[interval_type]

      if !config_interval_type.nil? && config_interval_type.length > 0

        args = self.optionizate_interval_value(config_interval_type)

        # We want rufus_scheduler to return a job object, not a job id
        opts = { :job => true }

        @@scheduled_jobs[name] = self.rufus_scheduler.send(interval_type, *args, opts) do |job, time|
          config.delete(interval_type)

          idempotent_job_enqueue(name, time, config)
        end

        interval_defined = true

        break
      end
    end

    unless interval_defined
      logger.info "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping"
    end
  end
end

.optionizate_interval_value(value) ⇒ Object

modify interval type value to value with options if options available



82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/sidekiq/scheduler.rb', line 82

def self.optionizate_interval_value(value)
  args = value
  if args.is_a?(::Array)
    return args.first if args.size > 2 || !args.last.is_a?(::Hash)
    # symbolize keys of hash for options
    args[1] = args[1].inject({}) do |m, i|
      key, value = i
      m[(key.to_sym rescue key) || key] = value
      m
    end
  end
  args
end

.prepare_arguments(config) ⇒ Hash

Convert the given arguments in the format expected to be enqueued.

Parameters:

  • config (Hash)

    the options to be converted

Options Hash (config):

  • class (String)

    the job class

  • args (Hash/Array)

    the arguments to be passed to the job class

Returns:

  • (Hash)


267
268
269
270
271
272
273
274
275
276
277
# File 'lib/sidekiq/scheduler.rb', line 267

def self.prepare_arguments(config)
  config['class'] = config['class'].constantize if config['class'].is_a?(String)

  if config['args'].is_a?(Hash)
    config['args'].symbolize_keys! if config['args'].respond_to?(:symbolize_keys!)
  else
    config['args'] = Array(config['args'])
  end

  config
end


36
37
38
39
40
41
42
43
44
# File 'lib/sidekiq/scheduler.rb', line 36

def self.print_schedule
  if self.rufus_scheduler
    logger.info "Scheduling Info\tLast Run"
    scheduler_jobs = self.rufus_scheduler.all_jobs
    scheduler_jobs.each do |_, v|
      logger.info "#{v.t}\t#{v.last}\t"
    end
  end
end

.pushed_job_key(job_name) ⇒ String

Returns the key of the Redis sorted set used to store job enqueues

Parameters:

  • job_name (String)

    The name of the job

Returns:

  • (String)


320
321
322
# File 'lib/sidekiq/scheduler.rb', line 320

def self.pushed_job_key(job_name)
  "sidekiq-scheduler:pushed:#{job_name}"
end

.rails_env_matches?(config) ⇒ Boolean

Returns true if the given schedule config hash matches the current ENV

Returns:

  • (Boolean)


155
156
157
# File 'lib/sidekiq/scheduler.rb', line 155

def self.rails_env_matches?(config)
  config['rails_env'] && ENV['RAILS_ENV'] && config['rails_env'].gsub(/\s/, '').split(',').include?(ENV['RAILS_ENV'])
end

.register_job_instance(job_name, time) ⇒ Boolean

Registers a queued job instance

Parameters:

  • job_name (String)

    The job’s name

  • time (Time)

    Time at which the job was cleared by the scheduler

Returns:

  • (Boolean)

    true if the job was registered, false when otherwise



296
297
298
299
300
301
302
303
304
305
306
307
# File 'lib/sidekiq/scheduler.rb', line 296

def self.register_job_instance(job_name, time)
  pushed_job_key = pushed_job_key(job_name)

  registered, _ = Sidekiq.redis do |r|
    r.pipelined do
      r.zadd(pushed_job_key, time.to_i, time.to_i)
      r.expire(pushed_job_key, REGISTERED_JOBS_THRESHOLD_IN_SECONDS)
    end
  end

  registered
end

.reload_schedule!Object



199
200
201
202
203
204
205
206
207
# File 'lib/sidekiq/scheduler.rb', line 199

def self.reload_schedule!
  if enabled
    logger.info 'Reloading Schedule'
    self.clear_schedule!
    self.load_schedule!
  else
    logger.info 'SidekiqScheduler is disabled'
  end
end

.remove_elder_job_instances(job_name) ⇒ Object



309
310
311
312
313
# File 'lib/sidekiq/scheduler.rb', line 309

def self.remove_elder_job_instances(job_name)
  Sidekiq.redis do |r|
    r.zremrangebyscore(pushed_job_key(job_name), 0, Time.now.to_i - REGISTERED_JOBS_THRESHOLD_IN_SECONDS)
  end
end

.rufus_schedulerObject



186
187
188
# File 'lib/sidekiq/scheduler.rb', line 186

def self.rufus_scheduler
  @rufus_scheduler ||= Rufus::Scheduler.new(rufus_scheduler_options)
end

.rufus_scheduler_optionsObject



178
179
180
# File 'lib/sidekiq/scheduler.rb', line 178

def self.rufus_scheduler_options
  @rufus_scheduler_options ||= {}
end

.rufus_scheduler_options=(options) ⇒ Object



182
183
184
# File 'lib/sidekiq/scheduler.rb', line 182

def self.rufus_scheduler_options=(options)
  @rufus_scheduler_options = options
end

.scheduled_jobsObject

the Rufus::Scheduler jobs that are scheduled



32
33
34
# File 'lib/sidekiq/scheduler.rb', line 32

def self.scheduled_jobs
  @@scheduled_jobs
end

.unschedule_job(name) ⇒ Object



225
226
227
228
229
230
231
# File 'lib/sidekiq/scheduler.rb', line 225

def self.unschedule_job(name)
  if self.scheduled_jobs[name]
    logger.debug "Removing schedule #{name}"
    self.scheduled_jobs[name].unschedule
    self.scheduled_jobs.delete(name)
  end
end

.update_scheduleObject



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/sidekiq/scheduler.rb', line 209

def self.update_schedule
  if Sidekiq.redis { |r| r.scard(:schedules_changed) } > 0
    logger.info 'Updating schedule'
    Sidekiq.reload_schedule!
    while schedule_name = Sidekiq.redis { |r| r.spop(:schedules_changed) }
      if Sidekiq.schedule.keys.include?(schedule_name)
        self.unschedule_job(schedule_name)
        self.load_schedule_job(schedule_name, Sidekiq.schedule[schedule_name])
      else
        self.unschedule_job(schedule_name)
      end
    end
    logger.info 'Schedules Loaded'
  end
end