Class: Sidekiq::Cron::Job

Inherits:
Object
  • Object
show all
Extended by:
Util
Includes:
Util
Defined in:
lib/sidekiq/cron/job.rb

Constant Summary collapse

REMEMBER_THRESHOLD =

how long we would like to store informations about previous enqueues

24 * 60 * 60

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(input_args = {}) ⇒ Job

Returns a new instance of Job.



258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/sidekiq/cron/job.rb', line 258

def initialize input_args = {}
  args = input_args.stringify_keys
  @fetch_missing_args = args.delete('fetch_missing_args')
  @fetch_missing_args = true if @fetch_missing_args.nil?

  @name = args["name"]
  @cron = args["cron"]
  @description = args["description"] if args["description"]

  #get class from klass or class
  @klass = args["klass"] || args["class"]

  #set status of job
  @status = args['status'] || status_from_redis

  #set last enqueue time - from args or from existing job
  if args['last_enqueue_time'] && !args['last_enqueue_time'].empty?
    @last_enqueue_time = Time.parse(args['last_enqueue_time'])
  else
    @last_enqueue_time = last_enqueue_time_from_redis
  end

  #get right arguments for job
  @args = args["args"].nil? ? [] : parse_args( args["args"] )

  @active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
  @active_job_queue_name_prefix = args["queue_name_prefix"]
  @active_job_queue_name_delimiter = args["queue_name_delimiter"]

  if args["message"]
    @message = args["message"]
    message_data = Sidekiq.load_json(@message) || {}
    @queue = message_data['queue'] || default
  elsif @klass
    message_data = {
      "class" => @klass.to_s,
      "args"  => @args,
    }

    #get right data for message
    #only if message wasn't specified before
    message_data = case @klass
      when Class
        @klass.get_sidekiq_options.merge(message_data)
      when String
        begin
          @klass.constantize.get_sidekiq_options.merge(message_data)
        rescue
          #Unknown class
          message_data.merge("queue"=>"default")
        end

    end

    #override queue if setted in config
    #only if message is hash - can be string (dumped JSON)
    if args['queue']
      @queue = message_data['queue'] = args['queue']
    else
      @queue = message_data['queue'] || default
    end

    #dump message as json
    @message = message_data
  end

  @queue_name_with_prefix = queue_name_with_prefix
end

Instance Attribute Details

#argsObject

Returns the value of attribute args.



255
256
257
# File 'lib/sidekiq/cron/job.rb', line 255

def args
  @args
end

#cronObject

Returns the value of attribute cron.



255
256
257
# File 'lib/sidekiq/cron/job.rb', line 255

def cron
  @cron
end

#descriptionObject

Returns the value of attribute description.



255
256
257
# File 'lib/sidekiq/cron/job.rb', line 255

def description
  @description
end

#fetch_missing_argsObject (readonly)

Returns the value of attribute fetch_missing_args.



256
257
258
# File 'lib/sidekiq/cron/job.rb', line 256

def fetch_missing_args
  @fetch_missing_args
end

#klassObject

Returns the value of attribute klass.



255
256
257
# File 'lib/sidekiq/cron/job.rb', line 255

def klass
  @klass
end

#last_enqueue_timeObject (readonly)

Returns the value of attribute last_enqueue_time.



256
257
258
# File 'lib/sidekiq/cron/job.rb', line 256

def last_enqueue_time
  @last_enqueue_time
end

#messageObject

Returns the value of attribute message.



255
256
257
# File 'lib/sidekiq/cron/job.rb', line 255

def message
  @message
end

#nameObject

Returns the value of attribute name.



255
256
257
# File 'lib/sidekiq/cron/job.rb', line 255

def name
  @name
end

Class Method Details

.allObject

get all cron jobs



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/sidekiq/cron/job.rb', line 201

def self.all
  job_hashes = nil
  Sidekiq.redis do |conn|
    set_members = conn.smembers(jobs_key)
    job_hashes = conn.pipelined do
      set_members.each do |key|
        conn.hgetall(key)
      end
    end
  end
  job_hashes.compact.reject(&:empty?).collect do |h|
    # no need to fetch missing args from redis since we just got this hash from there
    Sidekiq::Cron::Job.new(h.merge(fetch_missing_args: false))
  end
end

.countObject



217
218
219
220
221
222
223
# File 'lib/sidekiq/cron/job.rb', line 217

def self.count
  out = 0
  Sidekiq.redis do |conn|
    out = conn.scard(jobs_key)
  end
  out
end

.create(hash) ⇒ Object

create new instance of cron job



239
240
241
# File 'lib/sidekiq/cron/job.rb', line 239

def self.create hash
  new(hash).save
end

.destroy(name) ⇒ Object

destroy job by name



244
245
246
247
248
249
250
251
252
253
# File 'lib/sidekiq/cron/job.rb', line 244

def self.destroy name
  #if name is hash try to get name from it
  name = name[:name] || name['name'] if name.is_a?(Hash)

  if job = find(name)
    job.destroy
  else
    false
  end
end

.destroy_all!Object

remove all job from cron



473
474
475
476
477
478
# File 'lib/sidekiq/cron/job.rb', line 473

def self.destroy_all!
  all.each do |job|
    job.destroy
  end
  logger.info { "Cron Jobs - deleted all jobs" }
end

.destroy_removed_jobs(new_job_names) ⇒ Object

remove “removed jobs” between current jobs and new jobs



481
482
483
484
485
486
# File 'lib/sidekiq/cron/job.rb', line 481

def self.destroy_removed_jobs new_job_names
  current_job_names = Sidekiq::Cron::Job.all.map(&:name)
  removed_job_names = current_job_names - new_job_names
  removed_job_names.each { |j| Sidekiq::Cron::Job.destroy(j) }
  removed_job_names
end

.exists?(name) ⇒ Boolean

Returns:

  • (Boolean)


502
503
504
505
506
507
508
# File 'lib/sidekiq/cron/job.rb', line 502

def self.exists? name
  out = false
  Sidekiq.redis do |conn|
    out = conn.exists redis_key name
  end
  out
end

.find(name) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/sidekiq/cron/job.rb', line 225

def self.find name
  #if name is hash try to get name from it
  name = name[:name] || name['name'] if name.is_a?(Hash)

  output = nil
  Sidekiq.redis do |conn|
    if exists? name
      output = Job.new conn.hgetall( redis_key(name) )
    end
  end
  output
end

.load_from_array(array) ⇒ Object

load cron jobs from Array input structure should look like: [

{
  'name'        => 'name_of_job',
  'class'       => 'MyClass',
  'cron'        => '1 * * * *',
  'args'        => '(OPTIONAL) [Array or Hash]',
  'description' => '(OPTIONAL) Description of job'
},
{
  'name'  => 'Cool Job for Second Class',
  'class' => 'SecondClass',
  'cron'  => '*/5 * * * *'
}

]



183
184
185
186
187
188
189
190
# File 'lib/sidekiq/cron/job.rb', line 183

def self.load_from_array array
  errors = {}
  array.each do |job_data|
    job = new(job_data)
    errors[job.name] = job.errors unless job.save
  end
  errors
end

.load_from_array!(array) ⇒ Object

like to #load_from_array If exists old jobs in redis but removed from args, destroy old jobs



194
195
196
197
198
# File 'lib/sidekiq/cron/job.rb', line 194

def self.load_from_array! array
  job_names = array.map { |job| job["name"] }
  destroy_removed_jobs(job_names)
  load_from_array(array)
end

.load_from_hash(hash) ⇒ Object

load cron jobs from Hash input structure should look like: {

'name_of_job' => {
  'class'       => 'MyClass',
  'cron'        => '1 * * * *',
  'args'        => '(OPTIONAL) [Array or Hash]',
  'description' => '(OPTIONAL) Description of job'
},
'My super iber cool job' => {
  'class' => 'SecondClass',
  'cron'  => '*/5 * * * *'
}

}



151
152
153
154
155
156
157
# File 'lib/sidekiq/cron/job.rb', line 151

def self.load_from_hash hash
  array = hash.inject([]) do |out,(key, job)|
    job['name'] = key
    out << job
  end
  load_from_array array
end

.load_from_hash!(hash) ⇒ Object

like to #load_from_hash If exists old jobs in redis but removed from args, destroy old jobs



161
162
163
164
# File 'lib/sidekiq/cron/job.rb', line 161

def self.load_from_hash! hash
  destroy_removed_jobs(hash.keys)
  load_from_hash(hash)
end

Instance Method Details

#active_job_messageObject

active job has different structure how it is loading data from sidekiq queue, it createaswrapper arround job



122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/sidekiq/cron/job.rb', line 122

def active_job_message
  {
    'class'        => 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper',
    'queue'        => @queue_name_with_prefix,
    'description'  => @description,
    'args'         => [{
      'job_class'  => @klass,
      'job_id'     => SecureRandom.uuid,
      'queue_name' => @queue_name_with_prefix,
      'arguments'  => @args
    }]
  }
end

#destroyObject

remove job from cron jobs by name input:

first arg: name (string) - name of job (must be same - case sensitive)


458
459
460
461
462
463
464
465
466
467
468
469
470
# File 'lib/sidekiq/cron/job.rb', line 458

def destroy
  Sidekiq.redis do |conn|
    #delete from set
    conn.srem self.class.jobs_key, redis_key

    #delete runned timestamps
    conn.del job_enqueued_key

    #delete main job
    conn.del redis_key
  end
  logger.info { "Cron Jobs - deleted job with name: #{@name}" }
end

#disable!Object



331
332
333
334
# File 'lib/sidekiq/cron/job.rb', line 331

def disable!
  @status = "disabled"
  save
end

#disabled?Boolean

Returns:

  • (Boolean)


345
346
347
# File 'lib/sidekiq/cron/job.rb', line 345

def disabled?
  !enabled?
end

#enable!Object



336
337
338
339
# File 'lib/sidekiq/cron/job.rb', line 336

def enable!
  @status = "enabled"
  save
end

#enabled?Boolean

Returns:

  • (Boolean)


341
342
343
# File 'lib/sidekiq/cron/job.rb', line 341

def enabled?
  @status == "enabled"
end

#enque!(time = Time.now) ⇒ Object

enque cron job to queue



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/sidekiq/cron/job.rb', line 47

def enque! time = Time.now
  @last_enqueue_time = time

  klass_const =
      begin
        @klass.to_s.constantize
      rescue NameError
        nil
      end

  if klass_const
    if defined?(ActiveJob::Base) && klass_const < ActiveJob::Base
      enqueue_active_job(klass_const)
    else
      enqueue_sidekiq_worker(klass_const)
    end
  else
    if @active_job
      Sidekiq::Client.push(active_job_message)
    else
      Sidekiq::Client.push(sidekiq_worker_message)
    end
  end

  save
  logger.debug { "enqueued #{@name}: #{@message}" }
end

#enqueue_active_job(klass_const) ⇒ Object



81
82
83
84
85
# File 'lib/sidekiq/cron/job.rb', line 81

def enqueue_active_job(klass_const)
  klass_const.set(queue: @queue_name_with_prefix).perform_later(*@args)

  true
end

#enqueue_sidekiq_worker(klass_const) ⇒ Object



87
88
89
90
91
# File 'lib/sidekiq/cron/job.rb', line 87

def enqueue_sidekiq_worker(klass_const)
  klass_const.set(queue: @queue_name_with_prefix).perform_async(*@args)

  true
end

#errorsObject



387
388
389
# File 'lib/sidekiq/cron/job.rb', line 387

def errors
  @errors ||= []
end

#exists?Boolean

Returns:

  • (Boolean)


510
511
512
# File 'lib/sidekiq/cron/job.rb', line 510

def exists?
  self.class.exists? @name
end

#formated_enqueue_time(now = Time.now) ⇒ Object



494
495
496
# File 'lib/sidekiq/cron/job.rb', line 494

def formated_enqueue_time now = Time.now
  last_time(now).getutc.to_f.to_s
end

#formated_last_time(now = Time.now) ⇒ Object



498
499
500
# File 'lib/sidekiq/cron/job.rb', line 498

def formated_last_time now = Time.now
  last_time(now).getutc.iso8601
end

#is_active_job?Boolean

Returns:

  • (Boolean)


75
76
77
78
79
# File 'lib/sidekiq/cron/job.rb', line 75

def is_active_job?
  @active_job || defined?(ActiveJob::Base) && @klass.to_s.constantize < ActiveJob::Base
rescue NameError
  false
end

#klass_validObject



417
418
419
420
421
422
423
424
425
# File 'lib/sidekiq/cron/job.rb', line 417

def klass_valid
  case @klass
    when Class
      true
    when String
      @klass.size > 0
    else
  end
end

#last_enqueue_time_from_redisObject



360
361
362
363
364
365
366
367
368
# File 'lib/sidekiq/cron/job.rb', line 360

def last_enqueue_time_from_redis
  out = nil
  if fetch_missing_args
    Sidekiq.redis do |conn|
      out = Time.parse(conn.hget(redis_key, "last_enqueue_time")) rescue nil
    end
  end
  out
end

#last_time(now = Time.now) ⇒ Object

Parse cron specification ‘* * * * *’ and returns time when last run should be performed



490
491
492
# File 'lib/sidekiq/cron/job.rb', line 490

def last_time now = Time.now
  Rufus::Scheduler::CronLine.new(@cron).previous_time(now)
end

#queue_name_with_prefixObject



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/sidekiq/cron/job.rb', line 98

def queue_name_with_prefix
  return @queue unless is_active_job?

  if !"#{@active_job_queue_name_delimiter}".empty?
    queue_name_delimiter = @active_job_queue_name_delimiter
  elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_delimiter) && !ActiveJob::Base.queue_name_delimiter.empty?
    queue_name_delimiter = ActiveJob::Base.queue_name_delimiter
  else
    queue_name_delimiter = '_'
  end

  if !"#{@active_job_queue_name_prefix}".empty?
    queue_name = "#{@active_job_queue_name_prefix}#{queue_name_delimiter}#{@queue}"
  elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_prefix) && !"#{ActiveJob::Base.queue_name_prefix}".empty?
    queue_name = "#{ActiveJob::Base.queue_name_prefix}#{queue_name_delimiter}#{@queue}"
  else
    queue_name = @queue
  end

  queue_name
end

#remove_previous_enques(time) ⇒ Object

remove previous informations about run times this will clear redis and make sure that redis will not overflow with memory



30
31
32
33
34
# File 'lib/sidekiq/cron/job.rb', line 30

def remove_previous_enques time
  Sidekiq.redis do |conn|
    conn.zremrangebyscore(job_enqueued_key, 0, "(#{(time.to_f - REMEMBER_THRESHOLD).to_s}")
  end
end

#saveObject

add job to cron jobs input:

name: (string) - name of job
cron: (string: '* * * * *' - cron specification when to run job
class: (string|class) - which class to perform

optional input:

queue: (string) - which queue to use for enquing (will override class queue)
args: (array|hash|nil) - arguments for permorm method


436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
# File 'lib/sidekiq/cron/job.rb', line 436

def save
  #if job is invalid return false
  return false unless valid?

  Sidekiq.redis do |conn|

    #add to set of all jobs
    conn.sadd self.class.jobs_key, redis_key

    #add informations for this job!
    conn.hmset redis_key, *hash_to_redis(to_hash)

    #add information about last time! - don't enque right after scheduler poller starts!
    time = Time.now
    conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless conn.exists(job_enqueued_key)
  end
  logger.info { "Cron Jobs - add job with name: #{@name}" }
end

#should_enque?(time) ⇒ Boolean

crucial part of whole enquing job

Returns:

  • (Boolean)


16
17
18
19
20
21
22
23
24
25
# File 'lib/sidekiq/cron/job.rb', line 16

def should_enque? time
  enqueue = false
  enqueue = Sidekiq.redis do |conn|
    status == "enabled" &&
      not_past_scheduled_time?(time) &&
      not_enqueued_after?(time) &&
      conn.zadd(job_enqueued_key, formated_enqueue_time(time), formated_last_time(time))
  end
  enqueue
end

#sidekiq_worker_messageObject

siodekiq worker message



94
95
96
# File 'lib/sidekiq/cron/job.rb', line 94

def sidekiq_worker_message
  @message.is_a?(String) ? Sidekiq.load_json(@message) : @message
end

#sort_nameObject



514
515
516
# File 'lib/sidekiq/cron/job.rb', line 514

def sort_name
  "#{status == "enabled" ? 0 : 1}_#{name}".downcase
end

#statusObject



327
328
329
# File 'lib/sidekiq/cron/job.rb', line 327

def status
  @status
end

#status_from_redisObject



349
350
351
352
353
354
355
356
357
358
# File 'lib/sidekiq/cron/job.rb', line 349

def status_from_redis
  out = "enabled"
  if fetch_missing_args
    Sidekiq.redis do |conn|
      status = conn.hget redis_key, "status"
      out = status if status
    end
  end
  out
end

#test_and_enque_for_time!(time) ⇒ Object

test if job should be enqued If yes add it to queue



37
38
39
40
41
42
43
44
# File 'lib/sidekiq/cron/job.rb', line 37

def test_and_enque_for_time! time
  #should this job be enqued?
  if should_enque?(time)
    enque!

    remove_previous_enques(time)
  end
end

#to_hashObject

export job data to hash



371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
# File 'lib/sidekiq/cron/job.rb', line 371

def to_hash
  {
    name: @name,
    klass: @klass,
    cron: @cron,
    description: @description,
    args: @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || []),
    message: @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {}),
    status: @status,
    active_job: @active_job,
    queue_name_prefix: @active_job_queue_name_prefix,
    queue_name_delimiter: @active_job_queue_name_delimiter,
    last_enqueue_time: @last_enqueue_time,
  }
end

#valid?Boolean

Returns:

  • (Boolean)


391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
# File 'lib/sidekiq/cron/job.rb', line 391

def valid?
  #clear previos errors
  @errors = []

  errors << "'name' must be set" if @name.nil? || @name.size == 0
  if @cron.nil? || @cron.size == 0
    errors << "'cron' must be set"
  else
    begin
      cron = Rufus::Scheduler::CronLine.new(@cron)
      cron.next_time(Time.now)
    rescue Exception => e
      #fix for different versions of cron-parser
      if e.message == "Bad Vixie-style specification bad"
        errors << "'cron' -> #{@cron}: not a valid cronline"
      else
        errors << "'cron' -> #{@cron}: #{e.message}"
      end
    end
  end

  errors << "'klass' (or class) must be set" unless klass_valid

  !errors.any?
end