Class: Sidekiq::Cron::Job

Inherits:
Object
  • Object
show all
Extended by:
Util
Includes:
Util
Defined in:
lib/sidekiq/cron/job.rb

Constant Summary collapse

REMEMBER_THRESHOLD =

how long we would like to store informations about previous enqueues

24 * 60 * 60
LAST_ENQUEUE_TIME_FORMAT =
'%Y-%m-%d %H:%M:%S %z'
LAST_ENQUEUE_TIME_FORMAT_OLD =
'%Y-%m-%d %H:%M:%S'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(input_args = {}) ⇒ Job

Returns a new instance of Job.



262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/sidekiq/cron/job.rb', line 262

def initialize input_args = {}
  args = Hash[input_args.map{ |k, v| [k.to_s, v] }]
  @fetch_missing_args = args.delete('fetch_missing_args')
  @fetch_missing_args = true if @fetch_missing_args.nil?

  @name = args["name"]
  @cron = args["cron"]
  @description = args["description"] if args["description"]

  #get class from klass or class
  @klass = args["klass"] || args["class"]

  #set status of job
  @status = args['status'] || status_from_redis

  #set last enqueue time - from args or from existing job
  if args['last_enqueue_time'] && !args['last_enqueue_time'].empty?
    @last_enqueue_time = parse_enqueue_time(args['last_enqueue_time'])
  else
    @last_enqueue_time = last_enqueue_time_from_redis
  end

  #get right arguments for job
  @args = args["args"].nil? ? [] : parse_args( args["args"] )

  @active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
  @active_job_queue_name_prefix = args["queue_name_prefix"]
  @active_job_queue_name_delimiter = args["queue_name_delimiter"]

  if args["message"]
    @message = args["message"]
    message_data = Sidekiq.load_json(@message) || {}
    @queue = message_data['queue'] || "default"
  elsif @klass
    message_data = {
      "class" => @klass.to_s,
      "args"  => @args,
    }

    #get right data for message
    #only if message wasn't specified before
    klass_data = case @klass
      when Class
        @klass.get_sidekiq_options
      when String
        begin
          Sidekiq::Cron::Support.constantize(@klass).get_sidekiq_options
        rescue Exception => e
          #Unknown class
          {"queue"=>"default"}
        end
    end

    message_data = klass_data.merge(message_data)
    #override queue if setted in config
    #only if message is hash - can be string (dumped JSON)
    if args['queue']
      @queue = message_data['queue'] = args['queue']
    else
      @queue = message_data['queue'] || "default"
    end

    #dump message as json
    @message = message_data
  end

  @queue_name_with_prefix = queue_name_with_prefix
end

Instance Attribute Details

#argsObject

Returns the value of attribute args.



259
260
261
# File 'lib/sidekiq/cron/job.rb', line 259

def args
  @args
end

#cronObject

Returns the value of attribute cron.



259
260
261
# File 'lib/sidekiq/cron/job.rb', line 259

def cron
  @cron
end

#descriptionObject

Returns the value of attribute description.



259
260
261
# File 'lib/sidekiq/cron/job.rb', line 259

def description
  @description
end

#fetch_missing_argsObject (readonly)

Returns the value of attribute fetch_missing_args.



260
261
262
# File 'lib/sidekiq/cron/job.rb', line 260

def fetch_missing_args
  @fetch_missing_args
end

#klassObject

Returns the value of attribute klass.



259
260
261
# File 'lib/sidekiq/cron/job.rb', line 259

def klass
  @klass
end

#last_enqueue_timeObject (readonly)

Returns the value of attribute last_enqueue_time.



260
261
262
# File 'lib/sidekiq/cron/job.rb', line 260

def last_enqueue_time
  @last_enqueue_time
end

#messageObject

Returns the value of attribute message.



259
260
261
# File 'lib/sidekiq/cron/job.rb', line 259

def message
  @message
end

#nameObject

Returns the value of attribute name.



259
260
261
# File 'lib/sidekiq/cron/job.rb', line 259

def name
  @name
end

Class Method Details

.allObject

get all cron jobs



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/sidekiq/cron/job.rb', line 205

def self.all
  job_hashes = nil
  Sidekiq.redis do |conn|
    set_members = conn.smembers(jobs_key)
    job_hashes = conn.pipelined do
      set_members.each do |key|
        conn.hgetall(key)
      end
    end
  end
  job_hashes.compact.reject(&:empty?).collect do |h|
    # no need to fetch missing args from redis since we just got this hash from there
    Sidekiq::Cron::Job.new(h.merge(fetch_missing_args: false))
  end
end

.countObject



221
222
223
224
225
226
227
# File 'lib/sidekiq/cron/job.rb', line 221

def self.count
  out = 0
  Sidekiq.redis do |conn|
    out = conn.scard(jobs_key)
  end
  out
end

.create(hash) ⇒ Object

create new instance of cron job



243
244
245
# File 'lib/sidekiq/cron/job.rb', line 243

def self.create hash
  new(hash).save
end

.destroy(name) ⇒ Object

destroy job by name



248
249
250
251
252
253
254
255
256
257
# File 'lib/sidekiq/cron/job.rb', line 248

def self.destroy name
  #if name is hash try to get name from it
  name = name[:name] || name['name'] if name.is_a?(Hash)

  if job = find(name)
    job.destroy
  else
    false
  end
end

.destroy_all!Object

remove all job from cron



478
479
480
481
482
483
# File 'lib/sidekiq/cron/job.rb', line 478

def self.destroy_all!
  all.each do |job|
    job.destroy
  end
  logger.info { "Cron Jobs - deleted all jobs" }
end

.destroy_removed_jobs(new_job_names) ⇒ Object

remove “removed jobs” between current jobs and new jobs



486
487
488
489
490
491
# File 'lib/sidekiq/cron/job.rb', line 486

def self.destroy_removed_jobs new_job_names
  current_job_names = Sidekiq::Cron::Job.all.map(&:name)
  removed_job_names = current_job_names - new_job_names
  removed_job_names.each { |j| Sidekiq::Cron::Job.destroy(j) }
  removed_job_names
end

.exists?(name) ⇒ Boolean

Returns:

  • (Boolean)


507
508
509
510
511
512
513
# File 'lib/sidekiq/cron/job.rb', line 507

def self.exists? name
  out = false
  Sidekiq.redis do |conn|
    out = conn.exists redis_key name
  end
  out
end

.find(name) ⇒ Object



229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/sidekiq/cron/job.rb', line 229

def self.find name
  #if name is hash try to get name from it
  name = name[:name] || name['name'] if name.is_a?(Hash)

  output = nil
  Sidekiq.redis do |conn|
    if exists? name
      output = Job.new conn.hgetall( redis_key(name) )
    end
  end
  output
end

.load_from_array(array) ⇒ Object

load cron jobs from Array input structure should look like: [

{
  'name'        => 'name_of_job',
  'class'       => 'MyClass',
  'cron'        => '1 * * * *',
  'args'        => '(OPTIONAL) [Array or Hash]',
  'description' => '(OPTIONAL) Description of job'
},
{
  'name'  => 'Cool Job for Second Class',
  'class' => 'SecondClass',
  'cron'  => '*/5 * * * *'
}

]



187
188
189
190
191
192
193
194
# File 'lib/sidekiq/cron/job.rb', line 187

def self.load_from_array array
  errors = {}
  array.each do |job_data|
    job = new(job_data)
    errors[job.name] = job.errors unless job.save
  end
  errors
end

.load_from_array!(array) ⇒ Object

like to #load_from_array If exists old jobs in redis but removed from args, destroy old jobs



198
199
200
201
202
# File 'lib/sidekiq/cron/job.rb', line 198

def self.load_from_array! array
  job_names = array.map { |job| job["name"] }
  destroy_removed_jobs(job_names)
  load_from_array(array)
end

.load_from_hash(hash) ⇒ Object

load cron jobs from Hash input structure should look like: {

'name_of_job' => {
  'class'       => 'MyClass',
  'cron'        => '1 * * * *',
  'args'        => '(OPTIONAL) [Array or Hash]',
  'description' => '(OPTIONAL) Description of job'
},
'My super iber cool job' => {
  'class' => 'SecondClass',
  'cron'  => '*/5 * * * *'
}

}



155
156
157
158
159
160
161
# File 'lib/sidekiq/cron/job.rb', line 155

def self.load_from_hash hash
  array = hash.inject([]) do |out,(key, job)|
    job['name'] = key
    out << job
  end
  load_from_array array
end

.load_from_hash!(hash) ⇒ Object

like to #load_from_hash If exists old jobs in redis but removed from args, destroy old jobs



165
166
167
168
# File 'lib/sidekiq/cron/job.rb', line 165

def self.load_from_hash! hash
  destroy_removed_jobs(hash.keys)
  load_from_hash(hash)
end

Instance Method Details

#active_job_messageObject

active job has different structure how it is loading data from sidekiq queue, it createaswrapper arround job



125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/sidekiq/cron/job.rb', line 125

def active_job_message
  {
    'class'        => 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper',
    'wrapped'      => @klass,
    'queue'        => @queue_name_with_prefix,
    'description'  => @description,
    'args'         => [{
      'job_class'  => @klass,
      'job_id'     => SecureRandom.uuid,
      'queue_name' => @queue_name_with_prefix,
      'arguments'  => @args
    }]
  }
end

#destroyObject

remove job from cron jobs by name input:

first arg: name (string) - name of job (must be same - case sensitive)


463
464
465
466
467
468
469
470
471
472
473
474
475
# File 'lib/sidekiq/cron/job.rb', line 463

def destroy
  Sidekiq.redis do |conn|
    #delete from set
    conn.srem self.class.jobs_key, redis_key

    #delete runned timestamps
    conn.del job_enqueued_key

    #delete main job
    conn.del redis_key
  end
  logger.info { "Cron Jobs - deleted job with name: #{@name}" }
end

#disable!Object



335
336
337
338
# File 'lib/sidekiq/cron/job.rb', line 335

def disable!
  @status = "disabled"
  save
end

#disabled?Boolean

Returns:

  • (Boolean)


349
350
351
# File 'lib/sidekiq/cron/job.rb', line 349

def disabled?
  !enabled?
end

#enable!Object



340
341
342
343
# File 'lib/sidekiq/cron/job.rb', line 340

def enable!
  @status = "enabled"
  save
end

#enabled?Boolean

Returns:

  • (Boolean)


345
346
347
# File 'lib/sidekiq/cron/job.rb', line 345

def enabled?
  @status == "enabled"
end

#enque!(time = Time.now.utc) ⇒ Object

enque cron job to queue



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/sidekiq/cron/job.rb', line 50

def enque! time = Time.now.utc
  @last_enqueue_time = time.strftime(LAST_ENQUEUE_TIME_FORMAT)

  klass_const =
      begin
        Sidekiq::Cron::Support.constantize(@klass.to_s)
      rescue NameError
        nil
      end

  if klass_const
    if defined?(ActiveJob::Base) && klass_const < ActiveJob::Base
      enqueue_active_job(klass_const)
    else
      enqueue_sidekiq_worker(klass_const)
    end
  else
    if @active_job
      Sidekiq::Client.push(active_job_message)
    else
      Sidekiq::Client.push(sidekiq_worker_message)
    end
  end

  save_last_enqueue_time
  logger.debug { "enqueued #{@name}: #{@message}" }
end

#enqueue_active_job(klass_const) ⇒ Object



84
85
86
87
88
# File 'lib/sidekiq/cron/job.rb', line 84

def enqueue_active_job(klass_const)
  klass_const.set(queue: @queue).perform_later(*@args)

  true
end

#enqueue_sidekiq_worker(klass_const) ⇒ Object



90
91
92
93
94
# File 'lib/sidekiq/cron/job.rb', line 90

def enqueue_sidekiq_worker(klass_const)
  klass_const.set(queue: queue_name_with_prefix).perform_async(*@args)

  true
end

#errorsObject



391
392
393
# File 'lib/sidekiq/cron/job.rb', line 391

def errors
  @errors ||= []
end

#exists?Boolean

Returns:

  • (Boolean)


515
516
517
# File 'lib/sidekiq/cron/job.rb', line 515

def exists?
  self.class.exists? @name
end

#formated_enqueue_time(now = Time.now.utc) ⇒ Object



499
500
501
# File 'lib/sidekiq/cron/job.rb', line 499

def formated_enqueue_time now = Time.now.utc
  last_time(now).getutc.to_f.to_s
end

#formated_last_time(now = Time.now.utc) ⇒ Object



503
504
505
# File 'lib/sidekiq/cron/job.rb', line 503

def formated_last_time now = Time.now.utc
  last_time(now).getutc.iso8601
end

#is_active_job?Boolean

Returns:

  • (Boolean)


78
79
80
81
82
# File 'lib/sidekiq/cron/job.rb', line 78

def is_active_job?
  @active_job || defined?(ActiveJob::Base) && Sidekiq::Cron::Support.constantize(@klass.to_s) < ActiveJob::Base
rescue NameError
  false
end

#klass_validObject



415
416
417
418
419
420
421
422
423
# File 'lib/sidekiq/cron/job.rb', line 415

def klass_valid
  case @klass
    when Class
      true
    when String
      @klass.size > 0
    else
  end
end

#last_enqueue_time_from_redisObject



364
365
366
367
368
369
370
371
372
# File 'lib/sidekiq/cron/job.rb', line 364

def last_enqueue_time_from_redis
  out = nil
  if fetch_missing_args
    Sidekiq.redis do |conn|
      out = parse_enqueue_time(conn.hget(redis_key, "last_enqueue_time")) rescue nil
    end
  end
  out
end

#last_time(now = Time.now.utc) ⇒ Object

Parse cron specification ‘* * * * *’ and returns time when last run should be performed



495
496
497
# File 'lib/sidekiq/cron/job.rb', line 495

def last_time now = Time.now.utc
  parsed_cron.previous_time(now.utc).utc
end

#queue_name_with_prefixObject



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/sidekiq/cron/job.rb', line 101

def queue_name_with_prefix
  return @queue unless is_active_job?

  if !"#{@active_job_queue_name_delimiter}".empty?
    queue_name_delimiter = @active_job_queue_name_delimiter
  elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_delimiter) && !ActiveJob::Base.queue_name_delimiter.empty?
    queue_name_delimiter = ActiveJob::Base.queue_name_delimiter
  else
    queue_name_delimiter = '_'
  end

  if !"#{@active_job_queue_name_prefix}".empty?
    queue_name = "#{@active_job_queue_name_prefix}#{queue_name_delimiter}#{@queue}"
  elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_prefix) && !"#{ActiveJob::Base.queue_name_prefix}".empty?
    queue_name = "#{ActiveJob::Base.queue_name_prefix}#{queue_name_delimiter}#{@queue}"
  else
    queue_name = @queue
  end

  queue_name
end

#remove_previous_enques(time) ⇒ Object

remove previous informations about run times this will clear redis and make sure that redis will not overflow with memory



33
34
35
36
37
# File 'lib/sidekiq/cron/job.rb', line 33

def remove_previous_enques time
  Sidekiq.redis do |conn|
    conn.zremrangebyscore(job_enqueued_key, 0, "(#{(time.to_f - REMEMBER_THRESHOLD).to_s}")
  end
end

#saveObject

add job to cron jobs input:

name: (string) - name of job
cron: (string: '* * * * *' - cron specification when to run job
class: (string|class) - which class to perform

optional input:

queue: (string) - which queue to use for enquing (will override class queue)
args: (array|hash|nil) - arguments for permorm method


434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
# File 'lib/sidekiq/cron/job.rb', line 434

def save
  #if job is invalid return false
  return false unless valid?

  Sidekiq.redis do |conn|

    #add to set of all jobs
    conn.sadd self.class.jobs_key, redis_key

    #add informations for this job!
    conn.hmset redis_key, *hash_to_redis(to_hash)

    #add information about last time! - don't enque right after scheduler poller starts!
    time = Time.now.utc
    conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless conn.exists(job_enqueued_key)
  end
  logger.info { "Cron Jobs - add job with name: #{@name}" }
end

#save_last_enqueue_timeObject



453
454
455
456
457
458
# File 'lib/sidekiq/cron/job.rb', line 453

def save_last_enqueue_time
  Sidekiq.redis do |conn|
    # update last enqueue time
    conn.hset redis_key, 'last_enqueue_time', @last_enqueue_time
  end
end

#should_enque?(time) ⇒ Boolean

crucial part of whole enquing job

Returns:

  • (Boolean)


19
20
21
22
23
24
25
26
27
28
# File 'lib/sidekiq/cron/job.rb', line 19

def should_enque? time
  enqueue = false
  enqueue = Sidekiq.redis do |conn|
    status == "enabled" &&
      not_past_scheduled_time?(time) &&
      not_enqueued_after?(time) &&
      conn.zadd(job_enqueued_key, formated_enqueue_time(time), formated_last_time(time))
  end
  enqueue
end

#sidekiq_worker_messageObject

siodekiq worker message



97
98
99
# File 'lib/sidekiq/cron/job.rb', line 97

def sidekiq_worker_message
  @message.is_a?(String) ? Sidekiq.load_json(@message) : @message
end

#sort_nameObject



519
520
521
# File 'lib/sidekiq/cron/job.rb', line 519

def sort_name
  "#{status == "enabled" ? 0 : 1}_#{name}".downcase
end

#statusObject



331
332
333
# File 'lib/sidekiq/cron/job.rb', line 331

def status
  @status
end

#status_from_redisObject



353
354
355
356
357
358
359
360
361
362
# File 'lib/sidekiq/cron/job.rb', line 353

def status_from_redis
  out = "enabled"
  if fetch_missing_args
    Sidekiq.redis do |conn|
      status = conn.hget redis_key, "status"
      out = status if status
    end
  end
  out
end

#test_and_enque_for_time!(time) ⇒ Object

test if job should be enqued If yes add it to queue



40
41
42
43
44
45
46
47
# File 'lib/sidekiq/cron/job.rb', line 40

def test_and_enque_for_time! time
  #should this job be enqued?
  if should_enque?(time)
    enque!

    remove_previous_enques(time)
  end
end

#to_hashObject

export job data to hash



375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
# File 'lib/sidekiq/cron/job.rb', line 375

def to_hash
  {
    name: @name,
    klass: @klass,
    cron: @cron,
    description: @description,
    args: @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || []),
    message: @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {}),
    status: @status,
    active_job: @active_job,
    queue_name_prefix: @active_job_queue_name_prefix,
    queue_name_delimiter: @active_job_queue_name_delimiter,
    last_enqueue_time: @last_enqueue_time,
  }
end

#valid?Boolean

Returns:

  • (Boolean)


395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
# File 'lib/sidekiq/cron/job.rb', line 395

def valid?
  #clear previous errors
  @errors = []

  errors << "'name' must be set" if @name.nil? || @name.size == 0
  if @cron.nil? || @cron.size == 0
    errors << "'cron' must be set"
  else
    begin
      @parsed_cron = Fugit.do_parse_cron(@cron)
    rescue => e
      errors << "'cron' -> #{@cron.inspect} -> #{e.class}: #{e.message}"
    end
  end

  errors << "'klass' (or class) must be set" unless klass_valid

  errors.empty?
end