Class: Sidekiq::Cron::Job

Inherits:
Object
  • Object
show all
Extended by:
Util
Includes:
Util
Defined in:
lib/sidekiq/cron/job.rb

Constant Summary collapse

REMEMBER_THRESHOLD =

how long we would like to store informations about previous enqueues

24 * 60 * 60
LAST_ENQUEUE_TIME_FORMAT =
'%Y-%m-%d %H:%M:%S %z'
REDIS_EXISTS_METHOD =

Use the exists? method if we’re on a newer version of redis.

Gem.loaded_specs['redis'].version < Gem::Version.new('4.2') ? :exists : :exists?

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(input_args = {}) ⇒ Job



262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/sidekiq/cron/job.rb', line 262

def initialize input_args = {}
  args = Hash[input_args.map{ |k, v| [k.to_s, v] }]
  @fetch_missing_args = args.delete('fetch_missing_args')
  @fetch_missing_args = true if @fetch_missing_args.nil?

  @name = args["name"]
  @cron = args["cron"]
  @description = args["description"] if args["description"]

  #get class from klass or class
  @klass = args["klass"] || args["class"]

  #set status of job
  @status = args['status'] || status_from_redis

  #set last enqueue time - from args or from existing job
  if args['last_enqueue_time'] && !args['last_enqueue_time'].empty?
    @last_enqueue_time = parse_enqueue_time(args['last_enqueue_time'])
  else
    @last_enqueue_time = last_enqueue_time_from_redis
  end

  #get right arguments for job
  @symbolize_args = args["symbolize_args"] == true || ("#{args["symbolize_args"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
  @args = args["args"].nil? ? [] : parse_args( args["args"] )
  @args += [Time.now.to_f] if args["date_as_argument"]

  @active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
  @active_job_queue_name_prefix = args["queue_name_prefix"]
  @active_job_queue_name_delimiter = args["queue_name_delimiter"]

  if args["message"]
    @message = args["message"]
    message_data = Sidekiq.load_json(@message) || {}
    @queue = message_data['queue'] || "default"
  elsif @klass
    message_data = {
      "class" => @klass.to_s,
      "args"  => @args,
    }

    #get right data for message
    #only if message wasn't specified before
    klass_data = case @klass
      when Class
        @klass.get_sidekiq_options
      when String
        begin
          Sidekiq::Cron::Support.constantize(@klass).get_sidekiq_options
        rescue Exception => e
          #Unknown class
          {"queue"=>"default"}
        end
    end

    message_data = klass_data.merge(message_data)
    #override queue if setted in config
    #only if message is hash - can be string (dumped JSON)
    if args['queue']
      @queue = message_data['queue'] = args['queue']
    else
      @queue = message_data['queue'] || "default"
    end

    #dump message as json
    @message = message_data
  end

  @queue_name_with_prefix = queue_name_with_prefix
end

Instance Attribute Details

#argsObject

Returns the value of attribute args.



259
260
261
# File 'lib/sidekiq/cron/job.rb', line 259

def args
  @args
end

#cronObject

Returns the value of attribute cron.



259
260
261
# File 'lib/sidekiq/cron/job.rb', line 259

def cron
  @cron
end

#descriptionObject

Returns the value of attribute description.



259
260
261
# File 'lib/sidekiq/cron/job.rb', line 259

def description
  @description
end

#fetch_missing_argsObject (readonly)

Returns the value of attribute fetch_missing_args.



260
261
262
# File 'lib/sidekiq/cron/job.rb', line 260

def fetch_missing_args
  @fetch_missing_args
end

#klassObject

Returns the value of attribute klass.



259
260
261
# File 'lib/sidekiq/cron/job.rb', line 259

def klass
  @klass
end

#last_enqueue_timeObject (readonly)

Returns the value of attribute last_enqueue_time.



260
261
262
# File 'lib/sidekiq/cron/job.rb', line 260

def last_enqueue_time
  @last_enqueue_time
end

#messageObject

Returns the value of attribute message.



259
260
261
# File 'lib/sidekiq/cron/job.rb', line 259

def message
  @message
end

#nameObject

Returns the value of attribute name.



259
260
261
# File 'lib/sidekiq/cron/job.rb', line 259

def name
  @name
end

Class Method Details

.allObject

get all cron jobs



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/sidekiq/cron/job.rb', line 205

def self.all
  job_hashes = nil
  Sidekiq.redis do |conn|
    set_members = conn.smembers(jobs_key)
    job_hashes = conn.pipelined do |pipeline|
      set_members.each do |key|
        pipeline.hgetall(key)
      end
    end
  end
  job_hashes.compact.reject(&:empty?).collect do |h|
    # no need to fetch missing args from redis since we just got this hash from there
    Sidekiq::Cron::Job.new(h.merge(fetch_missing_args: false))
  end
end

.countObject



221
222
223
224
225
226
227
# File 'lib/sidekiq/cron/job.rb', line 221

def self.count
  out = 0
  Sidekiq.redis do |conn|
    out = conn.scard(jobs_key)
  end
  out
end

.create(hash) ⇒ Object

create new instance of cron job



243
244
245
# File 'lib/sidekiq/cron/job.rb', line 243

def self.create hash
  new(hash).save
end

.destroy(name) ⇒ Object

destroy job by name



248
249
250
251
252
253
254
255
256
257
# File 'lib/sidekiq/cron/job.rb', line 248

def self.destroy name
  #if name is hash try to get name from it
  name = name[:name] || name['name'] if name.is_a?(Hash)

  if job = find(name)
    job.destroy
  else
    false
  end
end

.destroy_all!Object

remove all job from cron



516
517
518
519
520
521
# File 'lib/sidekiq/cron/job.rb', line 516

def self.destroy_all!
  all.each do |job|
    job.destroy
  end
  logger.info { "Cron Jobs - deleted all jobs" }
end

.destroy_removed_jobs(new_job_names) ⇒ Object

remove “removed jobs” between current jobs and new jobs



524
525
526
527
528
529
# File 'lib/sidekiq/cron/job.rb', line 524

def self.destroy_removed_jobs new_job_names
  current_job_names = Sidekiq::Cron::Job.all.map(&:name)
  removed_job_names = current_job_names - new_job_names
  removed_job_names.each { |j| Sidekiq::Cron::Job.destroy(j) }
  removed_job_names
end

.exists?(name) ⇒ Boolean



545
546
547
548
549
550
551
# File 'lib/sidekiq/cron/job.rb', line 545

def self.exists? name
  out = false
  Sidekiq.redis do |conn|
    out = conn.public_send(REDIS_EXISTS_METHOD, redis_key(name))
  end
  out
end

.find(name) ⇒ Object



229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/sidekiq/cron/job.rb', line 229

def self.find name
  #if name is hash try to get name from it
  name = name[:name] || name['name'] if name.is_a?(Hash)

  output = nil
  Sidekiq.redis do |conn|
    if exists? name
      output = Job.new conn.hgetall( redis_key(name) )
    end
  end
  output if output && output.valid?
end

.load_from_array(array) ⇒ Object

load cron jobs from Array input structure should look like: [

{
  'name'        => 'name_of_job',
  'class'       => 'MyClass',
  'cron'        => '1 * * * *',
  'args'        => '(OPTIONAL) [Array or Hash]',
  'description' => '(OPTIONAL) Description of job'
},
{
  'name'  => 'Cool Job for Second Class',
  'class' => 'SecondClass',
  'cron'  => '*/5 * * * *'
}

]



187
188
189
190
191
192
193
194
# File 'lib/sidekiq/cron/job.rb', line 187

def self.load_from_array array
  errors = {}
  array.each do |job_data|
    job = new(job_data)
    errors[job.name] = job.errors unless job.save
  end
  errors
end

.load_from_array!(array) ⇒ Object

like to #load_from_array If exists old jobs in redis but removed from args, destroy old jobs



198
199
200
201
202
# File 'lib/sidekiq/cron/job.rb', line 198

def self.load_from_array! array
  job_names = array.map { |job| job["name"] }
  destroy_removed_jobs(job_names)
  load_from_array(array)
end

.load_from_hash(hash) ⇒ Object

load cron jobs from Hash input structure should look like: {

'name_of_job' => {
  'class'       => 'MyClass',
  'cron'        => '1 * * * *',
  'args'        => '(OPTIONAL) [Array or Hash]',
  'description' => '(OPTIONAL) Description of job'
},
'My super iber cool job' => {
  'class' => 'SecondClass',
  'cron'  => '*/5 * * * *'
}

}



155
156
157
158
159
160
161
# File 'lib/sidekiq/cron/job.rb', line 155

def self.load_from_hash hash
  array = hash.inject([]) do |out,(key, job)|
    job['name'] = key
    out << job
  end
  load_from_array array
end

.load_from_hash!(hash) ⇒ Object

like to #load_from_hash If exists old jobs in redis but removed from args, destroy old jobs



165
166
167
168
# File 'lib/sidekiq/cron/job.rb', line 165

def self.load_from_hash! hash
  destroy_removed_jobs(hash.keys)
  load_from_hash(hash)
end

Instance Method Details

#active_job_messageObject

active job has different structure how it is loading data from sidekiq queue, it createaswrapper arround job



125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/sidekiq/cron/job.rb', line 125

def active_job_message
  {
    'class'        => 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper',
    'wrapped'      => @klass,
    'queue'        => @queue_name_with_prefix,
    'description'  => @description,
    'args'         => [{
      'job_class'  => @klass,
      'job_id'     => SecureRandom.uuid,
      'queue_name' => @queue_name_with_prefix,
      'arguments'  => @args
    }]
  }
end

#add_jid_history(jid) ⇒ Object



481
482
483
484
485
486
487
488
489
490
491
492
493
# File 'lib/sidekiq/cron/job.rb', line 481

def add_jid_history(jid)
  jid_history = {
    jid: jid,
    enqueued: @last_enqueue_time
  }
  @history_size ||= (Sidekiq.options[:cron_history_size] || 10).to_i - 1
  Sidekiq.redis do |conn|
    conn.lpush jid_history_key,
               Sidekiq.dump_json(jid_history)
    # keep only last 10 entries in a fifo manner
    conn.ltrim jid_history_key, 0, @history_size
  end
end

#destroyObject

remove job from cron jobs by name input:

first arg: name (string) - name of job (must be same - case sensitive)


498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
# File 'lib/sidekiq/cron/job.rb', line 498

def destroy
  Sidekiq.redis do |conn|
    #delete from set
    conn.srem self.class.jobs_key, redis_key

    #delete runned timestamps
    conn.del job_enqueued_key

    # delete jid_history
    conn.del jid_history_key

    #delete main job
    conn.del redis_key
  end
  logger.info { "Cron Jobs - deleted job with name: #{@name}" }
end

#disable!Object



337
338
339
340
# File 'lib/sidekiq/cron/job.rb', line 337

def disable!
  @status = "disabled"
  save
end

#disabled?Boolean



351
352
353
# File 'lib/sidekiq/cron/job.rb', line 351

def disabled?
  !enabled?
end

#enable!Object



342
343
344
345
# File 'lib/sidekiq/cron/job.rb', line 342

def enable!
  @status = "enabled"
  save
end

#enabled?Boolean



347
348
349
# File 'lib/sidekiq/cron/job.rb', line 347

def enabled?
  @status == "enabled"
end

#enque!(time = Time.now.utc) ⇒ Object

enque cron job to queue



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/sidekiq/cron/job.rb', line 52

def enque! time = Time.now.utc
  @last_enqueue_time = time.strftime(LAST_ENQUEUE_TIME_FORMAT)

  klass_const =
      begin
        Sidekiq::Cron::Support.constantize(@klass.to_s)
      rescue NameError
        nil
      end

  jid =
    if klass_const
      if defined?(ActiveJob::Base) && klass_const < ActiveJob::Base
        enqueue_active_job(klass_const).try :provider_job_id
      else
        enqueue_sidekiq_worker(klass_const)
      end
    else
      if @active_job
        Sidekiq::Client.push(active_job_message)
      else
        Sidekiq::Client.push(sidekiq_worker_message)
      end
    end

  save_last_enqueue_time
  add_jid_history jid
  logger.debug { "enqueued #{@name}: #{@message}" }
end

#enqueue_active_job(klass_const) ⇒ Object



88
89
90
# File 'lib/sidekiq/cron/job.rb', line 88

def enqueue_active_job(klass_const)
  klass_const.set(queue: @queue).perform_later(*@args)
end

#enqueue_sidekiq_worker(klass_const) ⇒ Object



92
93
94
# File 'lib/sidekiq/cron/job.rb', line 92

def enqueue_sidekiq_worker(klass_const)
  klass_const.set(queue: queue_name_with_prefix).perform_async(*@args)
end

#errorsObject



412
413
414
# File 'lib/sidekiq/cron/job.rb', line 412

def errors
  @errors ||= []
end

#exists?Boolean



553
554
555
# File 'lib/sidekiq/cron/job.rb', line 553

def exists?
  self.class.exists? @name
end

#formated_enqueue_time(now = Time.now.utc) ⇒ Object



537
538
539
# File 'lib/sidekiq/cron/job.rb', line 537

def formated_enqueue_time now = Time.now.utc
  last_time(now).getutc.to_f.to_s
end

#formated_last_time(now = Time.now.utc) ⇒ Object



541
542
543
# File 'lib/sidekiq/cron/job.rb', line 541

def formated_last_time now = Time.now.utc
  last_time(now).getutc.iso8601
end

#is_active_job?Boolean



82
83
84
85
86
# File 'lib/sidekiq/cron/job.rb', line 82

def is_active_job?
  @active_job || defined?(ActiveJob::Base) && Sidekiq::Cron::Support.constantize(@klass.to_s) < ActiveJob::Base
rescue NameError
  false
end

#jid_history_from_redisObject



382
383
384
385
386
387
388
389
390
391
392
# File 'lib/sidekiq/cron/job.rb', line 382

def jid_history_from_redis
  out =
    Sidekiq.redis do |conn|
      conn.lrange(jid_history_key, 0, -1) rescue nil
    end

  # returns nil if out nil
  out && out.map do |jid_history_raw|
    Sidekiq.load_json jid_history_raw
  end
end

#klass_validObject



436
437
438
439
440
441
442
443
444
# File 'lib/sidekiq/cron/job.rb', line 436

def klass_valid
  case @klass
    when Class
      true
    when String
      @klass.size > 0
    else
  end
end

#last_enqueue_time_from_redisObject



372
373
374
375
376
377
378
379
380
# File 'lib/sidekiq/cron/job.rb', line 372

def last_enqueue_time_from_redis
  out = nil
  if fetch_missing_args
    Sidekiq.redis do |conn|
      out = parse_enqueue_time(conn.hget(redis_key, "last_enqueue_time")) rescue nil
    end
  end
  out
end

#last_time(now = Time.now.utc) ⇒ Object

Parse cron specification ‘* * * * *’ and returns time when last run should be performed



533
534
535
# File 'lib/sidekiq/cron/job.rb', line 533

def last_time now = Time.now.utc
  parsed_cron.previous_time(now.utc).utc
end

#pretty_messageObject



355
356
357
358
359
# File 'lib/sidekiq/cron/job.rb', line 355

def pretty_message
  JSON.pretty_generate Sidekiq.load_json(message)
rescue JSON::ParserError
  message
end

#queue_name_with_prefixObject



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/sidekiq/cron/job.rb', line 101

def queue_name_with_prefix
  return @queue unless is_active_job?

  if !"#{@active_job_queue_name_delimiter}".empty?
    queue_name_delimiter = @active_job_queue_name_delimiter
  elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_delimiter) && !ActiveJob::Base.queue_name_delimiter.empty?
    queue_name_delimiter = ActiveJob::Base.queue_name_delimiter
  else
    queue_name_delimiter = '_'
  end

  if !"#{@active_job_queue_name_prefix}".empty?
    queue_name = "#{@active_job_queue_name_prefix}#{queue_name_delimiter}#{@queue}"
  elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_prefix) && !"#{ActiveJob::Base.queue_name_prefix}".empty?
    queue_name = "#{ActiveJob::Base.queue_name_prefix}#{queue_name_delimiter}#{@queue}"
  else
    queue_name = @queue
  end

  queue_name
end

#remove_previous_enques(time) ⇒ Object

remove previous informations about run times this will clear redis and make sure that redis will not overflow with memory



35
36
37
38
39
# File 'lib/sidekiq/cron/job.rb', line 35

def remove_previous_enques time
  Sidekiq.redis do |conn|
    conn.zremrangebyscore(job_enqueued_key, 0, "(#{(time.to_f - REMEMBER_THRESHOLD).to_s}")
  end
end

#saveObject

add job to cron jobs input:

name: (string) - name of job
cron: (string: '* * * * *' - cron specification when to run job
class: (string|class) - which class to perform

optional input:

queue: (string) - which queue to use for enquing (will override class queue)
args: (array|hash|nil) - arguments for permorm method


455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
# File 'lib/sidekiq/cron/job.rb', line 455

def save
  #if job is invalid return false
  return false unless valid?

  Sidekiq.redis do |conn|

    #add to set of all jobs
    conn.sadd self.class.jobs_key, redis_key

    #add informations for this job!
    conn.hmset redis_key, *hash_to_redis(to_hash)

    #add information about last time! - don't enque right after scheduler poller starts!
    time = Time.now.utc
    conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless conn.public_send(REDIS_EXISTS_METHOD, job_enqueued_key)
  end
  logger.info { "Cron Jobs - add job with name: #{@name}" }
end

#save_last_enqueue_timeObject



474
475
476
477
478
479
# File 'lib/sidekiq/cron/job.rb', line 474

def save_last_enqueue_time
  Sidekiq.redis do |conn|
    # update last enqueue time
    conn.hset redis_key, 'last_enqueue_time', @last_enqueue_time
  end
end

#should_enque?(time) ⇒ Boolean

crucial part of whole enquing job



21
22
23
24
25
26
27
28
29
30
# File 'lib/sidekiq/cron/job.rb', line 21

def should_enque? time
  enqueue = false
  enqueue = Sidekiq.redis do |conn|
    status == "enabled" &&
      not_past_scheduled_time?(time) &&
      not_enqueued_after?(time) &&
      conn.zadd(job_enqueued_key, formated_enqueue_time(time), formated_last_time(time))
  end
  enqueue
end

#sidekiq_worker_messageObject

siodekiq worker message



97
98
99
# File 'lib/sidekiq/cron/job.rb', line 97

def sidekiq_worker_message
  @message.is_a?(String) ? Sidekiq.load_json(@message) : @message
end

#sort_nameObject



557
558
559
# File 'lib/sidekiq/cron/job.rb', line 557

def sort_name
  "#{status == "enabled" ? 0 : 1}_#{name}".downcase
end

#statusObject



333
334
335
# File 'lib/sidekiq/cron/job.rb', line 333

def status
  @status
end

#status_from_redisObject



361
362
363
364
365
366
367
368
369
370
# File 'lib/sidekiq/cron/job.rb', line 361

def status_from_redis
  out = "enabled"
  if fetch_missing_args
    Sidekiq.redis do |conn|
      status = conn.hget redis_key, "status"
      out = status if status
    end
  end
  out
end

#test_and_enque_for_time!(time) ⇒ Object

test if job should be enqued If yes add it to queue



42
43
44
45
46
47
48
49
# File 'lib/sidekiq/cron/job.rb', line 42

def test_and_enque_for_time! time
  #should this job be enqued?
  if should_enque?(time)
    enque!

    remove_previous_enques(time)
  end
end

#to_hashObject

export job data to hash



395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
# File 'lib/sidekiq/cron/job.rb', line 395

def to_hash
  {
    name: @name,
    klass: @klass,
    cron: @cron,
    description: @description,
    args: @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || []),
    message: @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {}),
    status: @status,
    active_job: @active_job,
    queue_name_prefix: @active_job_queue_name_prefix,
    queue_name_delimiter: @active_job_queue_name_delimiter,
    last_enqueue_time: @last_enqueue_time,
    symbolize_args: @symbolize_args,
  }
end

#valid?Boolean



416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
# File 'lib/sidekiq/cron/job.rb', line 416

def valid?
  #clear previous errors
  @errors = []

  errors << "'name' must be set" if @name.nil? || @name.size == 0
  if @cron.nil? || @cron.size == 0
    errors << "'cron' must be set"
  else
    begin
      @parsed_cron = Fugit.do_parse_cron(@cron)
    rescue => e
      errors << "'cron' -> #{@cron.inspect} -> #{e.class}: #{e.message}"
    end
  end

  errors << "'klass' (or class) must be set" unless klass_valid

  errors.empty?
end