Class: Sidekiq::Cron::Job

Inherits:
Object
  • Object
show all
Extended by:
Util
Includes:
Util
Defined in:
lib/sidekiq/cron/job.rb

Constant Summary collapse

REMEMBER_THRESHOLD =

How long (in seconds) information about previous enqueues is kept.

24 * 60 * 60
LAST_ENQUEUE_TIME_FORMAT =
'%Y-%m-%d %H:%M:%S %z'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(input_args = {}) ⇒ Job

Returns a new instance of Job.



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# File 'lib/sidekiq/cron/job.rb', line 259

# Builds a cron job from a hash of attributes; keys may be Strings or Symbols.
#
# Keys read here: name, cron, description, klass (or class), status,
# last_enqueue_time, args, date_as_argument, active_job, queue_name_prefix,
# queue_name_delimiter, message, queue and fetch_missing_args.
#
# Unless 'fetch_missing_args' is explicitly false, a missing status and a
# missing last enqueue time are looked up through status_from_redis and
# last_enqueue_time_from_redis.
def initialize input_args = {}
  # normalise every key to a String so Symbol-keyed input works as well
  args = Hash[input_args.map{ |k, v| [k.to_s, v] }]
  @fetch_missing_args = args.delete('fetch_missing_args')
  @fetch_missing_args = true if @fetch_missing_args.nil?

  @name = args["name"]
  @cron = args["cron"]
  @description = args["description"] if args["description"]

  # the job class may be given under either 'klass' or 'class'
  @klass = args["klass"] || args["class"]

  # explicit status wins, otherwise fall back to the stored one
  @status = args['status'] || status_from_redis

  # last enqueue time - from the arguments or from the existing job
  if args['last_enqueue_time'] && !args['last_enqueue_time'].empty?
    @last_enqueue_time = parse_enqueue_time(args['last_enqueue_time'])
  else
    @last_enqueue_time = last_enqueue_time_from_redis
  end

  # arguments handed to the job; optionally append the current time
  @args = args["args"].nil? ? [] : parse_args( args["args"] )
  @args += [Time.now.to_f] if args["date_as_argument"]

  # accept true as well as the strings true/t/yes/y/1 (case insensitive)
  @active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
  @active_job_queue_name_prefix = args["queue_name_prefix"]
  @active_job_queue_name_delimiter = args["queue_name_delimiter"]

  if args["message"]
    # a ready-made message was supplied: keep it and only extract the queue
    @message = args["message"]
    message_data = Sidekiq.load_json(@message) || {}
    @queue = message_data['queue'] || "default"
  elsif @klass
    message_data = {
      "class" => @klass.to_s,
      "args"  => @args,
    }

    # no message was given, so start from the sidekiq options of the class
    klass_data = case @klass
      when Class
        @klass.get_sidekiq_options
      when String
        begin
          Sidekiq::Cron::Support.constantize(@klass).get_sidekiq_options
        rescue StandardError, ScriptError
          # unknown or unloadable class: fall back to the default queue.
          # Signals and exit requests are deliberately no longer swallowed.
          {"queue"=>"default"}
        end
      else
        # neither a Class nor a String (e.g. a Symbol): use the default queue
        # so that #valid? can report the bad klass instead of raising here
        {"queue"=>"default"}
    end

    message_data = klass_data.merge(message_data)
    # an explicitly configured queue overrides the queue of the class
    if args['queue']
      @queue = message_data['queue'] = args['queue']
    else
      @queue = message_data['queue'] || "default"
    end

    # the message is kept as a Hash here; #to_hash serialises it to JSON
    @message = message_data
  end

  @queue_name_with_prefix = queue_name_with_prefix
end

Instance Attribute Details

#argsObject

Returns the value of attribute args.



256
257
258
# File 'lib/sidekiq/cron/job.rb', line 256

# Returns the value of @args, the argument list assigned in #initialize.
def args
  @args
end

#cronObject

Returns the value of attribute cron.



256
257
258
# File 'lib/sidekiq/cron/job.rb', line 256

# Returns the value of @cron, the cron specification given to #initialize.
def cron
  @cron
end

#descriptionObject

Returns the value of attribute description.



256
257
258
# File 'lib/sidekiq/cron/job.rb', line 256

# Returns the value of @description (nil when no description was given).
def description
  @description
end

#fetch_missing_argsObject (readonly)

Returns the value of attribute fetch_missing_args.



257
258
259
# File 'lib/sidekiq/cron/job.rb', line 257

# Returns the value of @fetch_missing_args; #initialize defaults it to true
# when the 'fetch_missing_args' key is absent.
def fetch_missing_args
  @fetch_missing_args
end

#klassObject

Returns the value of attribute klass.



256
257
258
# File 'lib/sidekiq/cron/job.rb', line 256

# Returns the value of @klass, taken from the 'klass' or 'class' key in
# #initialize.
def klass
  @klass
end

#last_enqueue_timeObject (readonly)

Returns the value of attribute last_enqueue_time.



257
258
259
# File 'lib/sidekiq/cron/job.rb', line 257

# Returns the value of @last_enqueue_time.
def last_enqueue_time
  @last_enqueue_time
end

#messageObject

Returns the value of attribute message.



256
257
258
# File 'lib/sidekiq/cron/job.rb', line 256

# Returns the value of @message; #initialize stores either the supplied
# message or a Hash built from the job class and arguments.
def message
  @message
end

#nameObject

Returns the value of attribute name.



256
257
258
# File 'lib/sidekiq/cron/job.rb', line 256

# Returns the value of @name, the job name given to #initialize.
def name
  @name
end

Class Method Details

.allObject

get all cron jobs



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/sidekiq/cron/job.rb', line 202

# Returns every stored cron job as a Sidekiq::Cron::Job instance.
#
# Reads the member keys of the set at jobs_key, fetches each job hash in a
# single pipelined round trip and skips members whose hash is nil or empty.
def self.all
  job_hashes = nil
  Sidekiq.redis do |conn|
    set_members = conn.smembers(jobs_key)
    # Send the commands to the object yielded by #pipelined. Older redis-rb
    # yields the connection itself, newer clients yield a dedicated pipeline
    # and deprecate or reject commands issued on the outer connection.
    job_hashes = conn.pipelined do |pipeline|
      set_members.each do |key|
        pipeline.hgetall(key)
      end
    end
  end
  job_hashes.compact.reject(&:empty?).map do |h|
    # no need to fetch missing args from redis since we just got this hash from there
    Sidekiq::Cron::Job.new(h.merge(fetch_missing_args: false))
  end
end

.countObject



218
219
220
221
222
223
224
# File 'lib/sidekiq/cron/job.rb', line 218

# Number of cron jobs, i.e. the cardinality of the set stored at jobs_key.
def self.count
  Sidekiq.redis { |conn| conn.scard(jobs_key) }
end

.create(hash) ⇒ Object

create new instance of cron job



240
241
242
# File 'lib/sidekiq/cron/job.rb', line 240

# Builds a job from +hash+ and saves it; returns whatever #save returns.
def self.create hash
  new(hash).save
end

.destroy(name) ⇒ Object

destroy job by name



245
246
247
248
249
250
251
252
253
254
# File 'lib/sidekiq/cron/job.rb', line 245

# Destroys the job with the given name.
#
# +name+ may also be a Hash, in which case the name is read from its :name
# or 'name' key. Returns the result of Job#destroy, or false when no such
# job is found.
def self.destroy(name)
  name = name[:name] || name['name'] if Hash === name

  job = find(name)
  return false unless job

  job.destroy
end

.destroy_all!Object

Remove all jobs from cron.



511
512
513
514
515
516
# File 'lib/sidekiq/cron/job.rb', line 511

# Destroys every job returned by .all, then logs that all jobs were deleted.
def self.destroy_all!
  all.each(&:destroy)
  logger.info { "Cron Jobs - deleted all jobs" }
end

.destroy_removed_jobs(new_job_names) ⇒ Object

remove “removed jobs” between current jobs and new jobs



519
520
521
522
523
524
# File 'lib/sidekiq/cron/job.rb', line 519

# Destroys every stored job whose name is not listed in +new_job_names+.
#
# Returns the array of names that were destroyed.
def self.destroy_removed_jobs(new_job_names)
  existing = Sidekiq::Cron::Job.all.map { |job| job.name }
  obsolete = existing - new_job_names
  # Array#each returns its receiver, so the obsolete names are the result
  obsolete.each do |job_name|
    Sidekiq::Cron::Job.destroy(job_name)
  end
end

.exists?(name) ⇒ Boolean

Returns:

  • (Boolean)


540
541
542
543
544
545
546
# File 'lib/sidekiq/cron/job.rb', line 540

# Tells whether a job hash is stored under the key for +name+.
#
# The raw reply of the EXISTS command is normalised to a strict boolean:
# depending on the client version it is either true/false or an Integer
# count, and an Integer 0 would otherwise be truthy in Ruby.
def self.exists? name
  reply = Sidekiq.redis { |conn| conn.exists(redis_key(name)) }
  reply == true || (reply.is_a?(Integer) && reply > 0)
end

.find(name) ⇒ Object



226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/sidekiq/cron/job.rb', line 226

# Looks up a job by name.
#
# +name+ may also be a Hash, in which case the name is read from its :name
# or 'name' key. Returns a Job built from the stored hash, or nil when
# exists? reports that nothing is stored for that name.
def self.find(name)
  name = name[:name] || name['name'] if Hash === name

  Sidekiq.redis do |conn|
    Job.new(conn.hgetall(redis_key(name))) if exists?(name)
  end
end

.load_from_array(array) ⇒ Object

load cron jobs from Array input structure should look like: [

{
  'name'        => 'name_of_job',
  'class'       => 'MyClass',
  'cron'        => '1 * * * *',
  'args'        => '(OPTIONAL) [Array or Hash]',
  'description' => '(OPTIONAL) Description of job'
},
{
  'name'  => 'Cool Job for Second Class',
  'class' => 'SecondClass',
  'cron'  => '*/5 * * * *'
}

]



184
185
186
187
188
189
190
191
# File 'lib/sidekiq/cron/job.rb', line 184

# Builds and saves one job per entry of +array+.
#
# Returns a Hash mapping the name of every job that failed to save to that
# job's error list; the Hash is empty when all jobs were saved.
def self.load_from_array(array)
  array.each_with_object({}) do |job_data, failures|
    job = new(job_data)
    failures[job.name] = job.errors unless job.save
  end
end

.load_from_array!(array) ⇒ Object

Like #load_from_array, but jobs that exist in Redis and are no longer present in the given array are destroyed first.



195
196
197
198
199
# File 'lib/sidekiq/cron/job.rb', line 195

# Same as .load_from_array, but first destroys the stored jobs whose names
# are not present in +array+.
#
# The name of each entry is read from its "name" key or, failing that, from
# its :name key. Symbol-keyed entries are accepted by .load_from_array, so
# they must not be treated here as nameless entries.
def self.load_from_array! array
  job_names = array.map { |job| job["name"] || job[:name] }
  destroy_removed_jobs(job_names)
  load_from_array(array)
end

.load_from_hash(hash) ⇒ Object

load cron jobs from Hash input structure should look like: {

'name_of_job' => {
  'class'       => 'MyClass',
  'cron'        => '1 * * * *',
  'args'        => '(OPTIONAL) [Array or Hash]',
  'description' => '(OPTIONAL) Description of job'
},
'My super iber cool job' => {
  'class' => 'SecondClass',
  'cron'  => '*/5 * * * *'
}

}



152
153
154
155
156
157
158
# File 'lib/sidekiq/cron/job.rb', line 152

# Loads jobs from a Hash that maps job names to job attribute hashes.
#
# Every attribute hash is copied with a 'name' entry set to its key and the
# resulting array is passed to .load_from_array, whose result is returned.
# The caller's hashes are not modified, so frozen configuration works too.
def self.load_from_hash hash
  array = hash.map { |key, job| job.merge('name' => key) }
  load_from_array array
end

.load_from_hash!(hash) ⇒ Object

Like #load_from_hash, but jobs that exist in Redis and are no longer present in the given hash are destroyed first.



162
163
164
165
# File 'lib/sidekiq/cron/job.rb', line 162

# Same as .load_from_hash, but first destroys the stored jobs whose names
# are not keys of +hash+.
#
# Keys are converted to Strings before the comparison so that a
# Symbol-keyed hash does not make every stored job look removed.
# NOTE(review): assumes the names returned for stored jobs are Strings.
def self.load_from_hash! hash
  destroy_removed_jobs(hash.keys.map(&:to_s))
  load_from_hash(hash)
end

Instance Method Details

#active_job_messageObject

Active Job loads its data from the Sidekiq queue using a different structure: it creates a wrapper around the job.



122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/sidekiq/cron/job.rb', line 122

# Builds the Sidekiq payload for an Active Job: the job wrapper class is the
# Sidekiq class and the real job class, a fresh job id, the prefixed queue
# name and the arguments travel inside the single wrapper argument.
def active_job_message
  wrapped_job = {
    'job_class'  => @klass,
    'job_id'     => SecureRandom.uuid,
    'queue_name' => @queue_name_with_prefix,
    'arguments'  => @args
  }

  {
    'class'       => 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper',
    'wrapped'     => @klass,
    'queue'       => @queue_name_with_prefix,
    'description' => @description,
    'args'        => [wrapped_job]
  }
end

#add_jid_history(jid) ⇒ Object



476
477
478
479
480
481
482
483
484
485
486
487
488
# File 'lib/sidekiq/cron/job.rb', line 476

# Pushes a { jid, enqueued } entry (serialised as JSON) onto the front of
# the jid history list and trims the list to the configured size.
#
# The size comes from Sidekiq.options[:cron_history_size], defaulting to 10.
def add_jid_history(jid)
  entry = { jid: jid, enqueued: @last_enqueue_time }
  serialized_entry = Sidekiq.dump_json(entry)

  # LTRIM takes an inclusive end index, hence the "- 1"
  @history_size ||= (Sidekiq.options[:cron_history_size] || 10).to_i - 1

  Sidekiq.redis do |conn|
    conn.lpush(jid_history_key, serialized_entry)
    # newest entries sit at the head; everything past the limit is dropped
    conn.ltrim(jid_history_key, 0, @history_size)
  end
end

#destroyObject

Remove this job from the cron jobs.

Takes no arguments: the job's own name (case sensitive) determines what is removed.


493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
# File 'lib/sidekiq/cron/job.rb', line 493

# Removes this job from Redis: its membership in the set of all jobs, its
# enqueue timestamps, its jid history and finally the job hash itself.
# Logs the deletion afterwards.
def destroy
  Sidekiq.redis do |conn|
    # unregister the job from the set of all jobs
    conn.srem(self.class.jobs_key, redis_key)

    # drop the recorded enqueue timestamps
    conn.del(job_enqueued_key)

    # drop the jid history
    conn.del(jid_history_key)

    # drop the job hash itself
    conn.del(redis_key)
  end
  logger.info { "Cron Jobs - deleted job with name: #{@name}" }
end

#disable!Object



333
334
335
336
# File 'lib/sidekiq/cron/job.rb', line 333

# Sets the status to "disabled" and saves the job; returns the result of
# #save.
def disable!
  @status = "disabled"
  save
end

#disabled?Boolean

Returns:

  • (Boolean)


347
348
349
# File 'lib/sidekiq/cron/job.rb', line 347

# True whenever #enabled? is false.
def disabled?
  !enabled?
end

#enable!Object



338
339
340
341
# File 'lib/sidekiq/cron/job.rb', line 338

# Sets the status to "enabled" and saves the job; returns the result of
# #save.
def enable!
  @status = "enabled"
  save
end

#enabled?Boolean

Returns:

  • (Boolean)


343
344
345
# File 'lib/sidekiq/cron/job.rb', line 343

# True when the status is exactly the string "enabled".
def enabled?
  @status == "enabled"
end

#enque!(time = Time.now.utc) ⇒ Object

Enqueue the cron job onto its queue.



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/sidekiq/cron/job.rb', line 49

# Enqueues the job and records the enqueue.
#
# +time+ (default: now, UTC) becomes the formatted last enqueue time. When
# the job class can be resolved it is enqueued through the class (as an
# Active Job if it inherits from ActiveJob::Base, as a Sidekiq worker
# otherwise); when it cannot, a raw message is pushed with Sidekiq::Client.
# Afterwards the last enqueue time is saved, the jid is added to the
# history and a debug line is logged.
def enque!(time = Time.now.utc)
  @last_enqueue_time = time.strftime(LAST_ENQUEUE_TIME_FORMAT)

  # nil when the class is not loadable in this process
  resolved_class =
    begin
      Sidekiq::Cron::Support.constantize(@klass.to_s)
    rescue NameError
      nil
    end

  jid =
    if resolved_class && defined?(ActiveJob::Base) && resolved_class < ActiveJob::Base
      enqueue_active_job(resolved_class).try :provider_job_id
    elsif resolved_class
      enqueue_sidekiq_worker(resolved_class)
    elsif @active_job
      Sidekiq::Client.push(active_job_message)
    else
      Sidekiq::Client.push(sidekiq_worker_message)
    end

  save_last_enqueue_time
  add_jid_history jid
  logger.debug { "enqueued #{@name}: #{@message}" }
end

#enqueue_active_job(klass_const) ⇒ Object



85
86
87
# File 'lib/sidekiq/cron/job.rb', line 85

# Enqueues +klass_const+ as an Active Job on @queue with the job arguments
# and returns the result of perform_later.
def enqueue_active_job(klass_const)
  configured_job = klass_const.set(queue: @queue)
  configured_job.perform_later(*@args)
end

#enqueue_sidekiq_worker(klass_const) ⇒ Object



89
90
91
# File 'lib/sidekiq/cron/job.rb', line 89

# Enqueues +klass_const+ as a Sidekiq worker on the queue returned by
# queue_name_with_prefix and returns the result of perform_async.
def enqueue_sidekiq_worker(klass_const)
  configured_worker = klass_const.set(queue: queue_name_with_prefix)
  configured_worker.perform_async(*@args)
end

#errorsObject



407
408
409
# File 'lib/sidekiq/cron/job.rb', line 407

# Returns the list of validation errors, creating an empty Array on first
# use.
def errors
  @errors ||= []
end

#exists?Boolean

Returns:

  • (Boolean)


548
549
550
# File 'lib/sidekiq/cron/job.rb', line 548

# Delegates to the class-level exists? with this job's name.
def exists?
  self.class.exists? @name
end

#formated_enqueue_time(now = Time.now.utc) ⇒ Object



532
533
534
# File 'lib/sidekiq/cron/job.rb', line 532

# The time returned by last_time(now), in UTC, as a float-seconds String.
def formated_enqueue_time(now = Time.now.utc)
  scheduled_at = last_time(now).getutc
  scheduled_at.to_f.to_s
end

#formated_last_time(now = Time.now.utc) ⇒ Object



536
537
538
# File 'lib/sidekiq/cron/job.rb', line 536

# The time returned by last_time(now), in UTC, as an ISO 8601 String.
def formated_last_time(now = Time.now.utc)
  scheduled_at = last_time(now).getutc
  scheduled_at.iso8601
end

#is_active_job?Boolean

Returns:

  • (Boolean)


79
80
81
82
83
# File 'lib/sidekiq/cron/job.rb', line 79

# Truthy when the job was flagged as an Active Job, or when ActiveJob is
# loaded and the job class inherits from ActiveJob::Base.
# Returns false when the job class cannot be resolved (NameError).
def is_active_job?
  return @active_job if @active_job

  defined?(ActiveJob::Base) && Sidekiq::Cron::Support.constantize(@klass.to_s) < ActiveJob::Base
rescue NameError
  false
end

#jid_history_from_redisObject



378
379
380
381
382
383
384
385
386
387
388
# File 'lib/sidekiq/cron/job.rb', line 378

# Reads the whole jid history list and parses every entry from JSON.
#
# Best effort: an error raised while reading the list is swallowed, and the
# method then returns nil.
def jid_history_from_redis
  raw_entries =
    Sidekiq.redis do |conn|
      begin
        conn.lrange(jid_history_key, 0, -1)
      rescue StandardError
        nil
      end
    end

  # nothing could be read: hand the falsy value back unchanged
  return raw_entries unless raw_entries

  raw_entries.map { |raw_entry| Sidekiq.load_json(raw_entry) }
end

#klass_validObject



431
432
433
434
435
436
437
438
439
# File 'lib/sidekiq/cron/job.rb', line 431

# Checks the job class: a Class is always valid, a String is valid when it
# is not empty, and anything else yields nil.
def klass_valid
  return true if @klass.is_a?(Class)
  return @klass.size > 0 if @klass.is_a?(String)

  nil
end

#last_enqueue_time_from_redisObject



368
369
370
371
372
373
374
375
376
# File 'lib/sidekiq/cron/job.rb', line 368

# Reads and parses the stored "last_enqueue_time" field of this job.
#
# Returns nil when fetch_missing_args is falsy, and also (best effort) when
# reading or parsing raises an error.
def last_enqueue_time_from_redis
  return nil unless fetch_missing_args

  Sidekiq.redis do |conn|
    begin
      parse_enqueue_time(conn.hget(redis_key, "last_enqueue_time"))
    rescue StandardError
      nil
    end
  end
end

#last_time(now = Time.now.utc) ⇒ Object

Parse cron specification ‘* * * * *’ and returns time when last run should be performed



528
529
530
# File 'lib/sidekiq/cron/job.rb', line 528

# Asks the parsed cron expression for the run time preceding +now+ and
# returns it in UTC.
def last_time(now = Time.now.utc)
  previous_run = parsed_cron.previous_time(now.utc)
  previous_run.utc
end

#pretty_messageObject



351
352
353
354
355
# File 'lib/sidekiq/cron/job.rb', line 351

# Returns the job message as pretty-printed JSON.
#
# The message may be a JSON String or an already-built Hash (#initialize
# stores a Hash when it assembles the message from the job class), so only
# Strings are parsed first. A String that is not valid JSON is returned
# unchanged.
def pretty_message
  payload = message.is_a?(String) ? Sidekiq.load_json(message) : message
  JSON.pretty_generate(payload)
rescue JSON::ParserError
  message
end

#queue_name_with_prefixObject



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/sidekiq/cron/job.rb', line 98

# Returns the queue name, prefixed for Active Jobs.
#
# Jobs that are not Active Jobs get @queue unchanged. Otherwise the prefix
# and delimiter configured on the job win over the ActiveJob::Base settings;
# the delimiter falls back to '_' and an empty prefix leaves @queue as is.
def queue_name_with_prefix
  return @queue unless is_active_job?

  delimiter =
    if !"#{@active_job_queue_name_delimiter}".empty?
      @active_job_queue_name_delimiter
    elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_delimiter) && !ActiveJob::Base.queue_name_delimiter.empty?
      ActiveJob::Base.queue_name_delimiter
    else
      '_'
    end

  # job-level prefix first, then the global Active Job prefix
  prefix = "#{@active_job_queue_name_prefix}"
  if prefix.empty? && defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_prefix)
    prefix = "#{ActiveJob::Base.queue_name_prefix}"
  end

  prefix.empty? ? @queue : "#{prefix}#{delimiter}#{@queue}"
end

#remove_previous_enques(time) ⇒ Object

Remove stored information about previous run times. This cleans up Redis and keeps its memory use from growing without bound.



32
33
34
35
36
# File 'lib/sidekiq/cron/job.rb', line 32

# Deletes the recorded enqueues whose score is below
# +time+ minus REMEMBER_THRESHOLD seconds.
def remove_previous_enques(time)
  cutoff = time.to_f - REMEMBER_THRESHOLD

  Sidekiq.redis do |conn|
    # the leading "(" makes the upper bound exclusive
    conn.zremrangebyscore(job_enqueued_key, 0, "(#{cutoff}")
  end
end

#saveObject

add job to cron jobs input:

name: (string) - name of job
cron: (string: '* * * * *' - cron specification when to run job
class: (string|class) - which class to perform

optional input:

queue: (string) - which queue to use for enqueuing (overrides the queue of the class)
args: (array|hash|nil) - arguments for the perform method


450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
# File 'lib/sidekiq/cron/job.rb', line 450

# Persists the job.
#
# Returns false without touching Redis when the job is invalid. Otherwise
# the job key is added to the set of all jobs, the job attributes are
# written to the job hash and, when no enqueue record exists yet, an initial
# one is created. Returns the result of the final logger call.
def save
  # an invalid job is never written
  return false unless valid?

  Sidekiq.redis do |conn|

    # register the job in the set of all jobs
    conn.sadd self.class.jobs_key, redis_key

    # store the attributes of this job
    conn.hmset redis_key, *hash_to_redis(to_hash)

    # record the last scheduled time so the job is not enqueued right after
    # the scheduler poller starts
    time = Time.now.utc
    # EXISTS replies with true/false or with an Integer count depending on
    # the client version; an Integer 0 is truthy in Ruby, so compare explicitly
    reply = conn.exists(job_enqueued_key)
    already_recorded = reply == true || (reply.is_a?(Integer) && reply > 0)
    conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless already_recorded
  end
  logger.info { "Cron Jobs - add job with name: #{@name}" }
end

#save_last_enqueue_timeObject



469
470
471
472
473
474
# File 'lib/sidekiq/cron/job.rb', line 469

# Writes @last_enqueue_time into the 'last_enqueue_time' field of the job
# hash.
def save_last_enqueue_time
  Sidekiq.redis { |conn| conn.hset(redis_key, 'last_enqueue_time', @last_enqueue_time) }
end

#should_enque?(time) ⇒ Boolean

The crucial part of the whole enqueuing process.

Returns:

  • (Boolean)


18
19
20
21
22
23
24
25
26
27
# File 'lib/sidekiq/cron/job.rb', line 18

# Decides whether the job has to be enqueued for +time+.
#
# The job must be enabled and pass not_past_scheduled_time? and
# not_enqueued_after?; finally the scheduled time is added to the sorted set
# of enqueues, and the job is only enqueued when that add created a new
# member. Always returns true or false.
def should_enque? time
  return false unless status == "enabled"
  return false unless not_past_scheduled_time?(time)
  return false unless not_enqueued_after?(time)

  added = Sidekiq.redis do |conn|
    conn.zadd(job_enqueued_key, formated_enqueue_time(time), formated_last_time(time))
  end
  # ZADD replies with true/false or with an Integer count depending on the
  # client; an Integer 0 (member already present) is truthy in Ruby and must
  # not trigger a second enqueue
  added == true || (added.is_a?(Integer) && added > 0)
end

#sidekiq_worker_messageObject

Sidekiq worker message.



94
95
96
# File 'lib/sidekiq/cron/job.rb', line 94

# Returns the job message as data: a String message is parsed from JSON,
# anything else is returned as is.
def sidekiq_worker_message
  return @message unless @message.is_a?(String)

  Sidekiq.load_json(@message)
end

#sort_nameObject



552
553
554
# File 'lib/sidekiq/cron/job.rb', line 552

# Sort key: "0_" for enabled jobs and "1_" for all others, followed by the
# job name, all downcased.
def sort_name
  rank = status == "enabled" ? 0 : 1
  "#{rank}_#{name}".downcase
end

#statusObject



329
330
331
# File 'lib/sidekiq/cron/job.rb', line 329

# Returns the value of @status ("enabled" or "disabled" as set by #enable!
# and #disable!, or whatever status #initialize assigned).
def status
  @status
end

#status_from_redisObject



357
358
359
360
361
362
363
364
365
366
# File 'lib/sidekiq/cron/job.rb', line 357

# Returns the stored "status" field of this job, or "enabled" when nothing
# is stored or fetch_missing_args is falsy.
def status_from_redis
  return "enabled" unless fetch_missing_args

  stored_status = Sidekiq.redis { |conn| conn.hget(redis_key, "status") }
  stored_status || "enabled"
end

#test_and_enque_for_time!(time) ⇒ Object

Test whether the job should be enqueued; if so, add it to the queue.



39
40
41
42
43
44
45
46
# File 'lib/sidekiq/cron/job.rb', line 39

# Enqueues the job when should_enque? approves +time+, then removes the old
# enqueue records. Returns nil when nothing was enqueued.
def test_and_enque_for_time!(time)
  return unless should_enque?(time)

  enque!
  remove_previous_enques(time)
end

#to_hashObject

export job data to hash



391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
# File 'lib/sidekiq/cron/job.rb', line 391

# Exports the job as a Symbol-keyed Hash.
#
# args and message are emitted as Strings: values that are already Strings
# pass through, everything else is serialised with Sidekiq.dump_json (nil
# becomes an empty Array / Hash respectively).
def to_hash
  serialized_args = @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || [])
  serialized_message = @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {})

  {
    name: @name,
    klass: @klass,
    cron: @cron,
    description: @description,
    args: serialized_args,
    message: serialized_message,
    status: @status,
    active_job: @active_job,
    queue_name_prefix: @active_job_queue_name_prefix,
    queue_name_delimiter: @active_job_queue_name_delimiter,
    last_enqueue_time: @last_enqueue_time,
  }
end

#valid?Boolean

Returns:

  • (Boolean)


411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
# File 'lib/sidekiq/cron/job.rb', line 411

# Validates name, cron and klass, collecting messages in #errors.
#
# A parsable cron expression is stored in @parsed_cron as a side effect.
# Returns true when no error was recorded.
def valid?
  # start every validation run from a clean slate
  @errors = []

  name_missing = @name.nil? || @name.size == 0
  errors << "'name' must be set" if name_missing

  cron_missing = @cron.nil? || @cron.size == 0
  if cron_missing
    errors << "'cron' must be set"
  else
    begin
      @parsed_cron = Fugit.do_parse_cron(@cron)
    rescue => parse_error
      errors << "'cron' -> #{@cron.inspect} -> #{parse_error.class}: #{parse_error.message}"
    end
  end

  errors << "'klass' (or class) must be set" unless klass_valid

  errors.empty?
end