Class: Sidekiq::Cron::Job
- Inherits:
- Object
  - Object
  - Sidekiq::Cron::Job
- Extended by:
- Util
- Includes:
- Util
- Defined in:
- lib/sidekiq/cron/job.rb
Constant Summary
- REMEMBER_THRESHOLD =
How long (in seconds) to keep information about previous enqueues.
24 * 60 * 60
Instance Attribute Summary
- #args ⇒ Object
  Returns the value of attribute args.
- #cron ⇒ Object
  Returns the value of attribute cron.
- #description ⇒ Object
  Returns the value of attribute description.
- #fetch_missing_args ⇒ Object (readonly)
  Returns the value of attribute fetch_missing_args.
- #klass ⇒ Object
  Returns the value of attribute klass.
- #last_enqueue_time ⇒ Object (readonly)
  Returns the value of attribute last_enqueue_time.
- #message ⇒ Object
  Returns the value of attribute message.
- #name ⇒ Object
  Returns the value of attribute name.
Class Method Summary
- .all ⇒ Object
  Get all cron jobs.
- .count ⇒ Object
- .create(hash) ⇒ Object
  Create a new instance of a cron job.
- .destroy(name) ⇒ Object
  Destroy a job by name.
- .destroy_all! ⇒ Object
  Remove all jobs from cron.
- .destroy_removed_jobs(new_job_names) ⇒ Object
  Remove jobs that exist among the current jobs but are missing from the new job names.
- .exists?(name) ⇒ Boolean
- .find(name) ⇒ Object
- .load_from_array(array) ⇒ Object
  Load cron jobs from an Array. The input structure should look like: [ { 'name' => 'name_of_job', 'class' => 'MyClass', 'cron' => '1 * * * *', 'args' => '(OPTIONAL) [Array or Hash]', 'description' => '(OPTIONAL) Description of job' }, { 'name' => 'Cool Job for Second Class', 'class' => 'SecondClass', 'cron' => '*/5 * * * *' } ]
- .load_from_array!(array) ⇒ Object
  Like .load_from_array, but old jobs that exist in Redis and are absent from the input are destroyed.
- .load_from_hash(hash) ⇒ Object
  Load cron jobs from a Hash. The input structure should look like: { 'name_of_job' => { 'class' => 'MyClass', 'cron' => '1 * * * *', 'args' => '(OPTIONAL) [Array or Hash]', 'description' => '(OPTIONAL) Description of job' }, 'My super iber cool job' => { 'class' => 'SecondClass', 'cron' => '*/5 * * * *' } }
- .load_from_hash!(hash) ⇒ Object
  Like .load_from_hash, but old jobs that exist in Redis and are absent from the input are destroyed.
Instance Method Summary
- #active_job_message ⇒ Object
  Active Job loads data from the Sidekiq queue with a different structure: it creates a wrapper around the job.
- #destroy ⇒ Object
  Remove job from cron jobs by name. Input: first arg: name (string) - name of job (must match exactly, case sensitive).
- #disable! ⇒ Object
- #disabled? ⇒ Boolean
- #enable! ⇒ Object
- #enabled? ⇒ Boolean
- #enque!(time = Time.now) ⇒ Object
  Enqueue the cron job to its queue.
- #enqueue_active_job(klass_const) ⇒ Object
- #enqueue_sidekiq_worker(klass_const) ⇒ Object
- #errors ⇒ Object
- #exists? ⇒ Boolean
- #formated_enqueue_time(now = Time.now) ⇒ Object
- #formated_last_time(now = Time.now) ⇒ Object
- #initialize(input_args = {}) ⇒ Job (constructor)
  A new instance of Job.
- #is_active_job? ⇒ Boolean
- #klass_valid ⇒ Object
- #last_enqueue_time_from_redis ⇒ Object
- #last_time(now = Time.now) ⇒ Object
  Parses the cron specification '* * * * *' and returns the time when the last run should have been performed.
- #queue_name_with_prefix ⇒ Object
- #remove_previous_enques(time) ⇒ Object
  Remove previous information about run times. This clears Redis and makes sure Redis does not fill up with stale entries.
- #save ⇒ Object
  Add job to cron jobs. Input: name: (string) - name of job; cron: (string: '* * * * *') - cron specification of when to run the job; class: (string|class) - which class to perform. Optional input: queue: (string) - which queue to use for enqueuing (overrides the class queue); args: (array|hash|nil) - arguments for the perform method.
- #should_enque?(time) ⇒ Boolean
  Crucial part of the whole job-enqueuing process.
- #sidekiq_worker_message ⇒ Object
  Sidekiq worker message.
- #sort_name ⇒ Object
- #status ⇒ Object
- #status_from_redis ⇒ Object
- #test_and_enque_for_time!(time) ⇒ Object
  Test whether the job should be enqueued. If yes, add it to the queue.
- #to_hash ⇒ Object
  Export job data to a hash.
- #valid? ⇒ Boolean
Constructor Details
#initialize(input_args = {}) ⇒ Job
Returns a new instance of Job.
# File 'lib/sidekiq/cron/job.rb', line 258
def initialize input_args = {}
  args = input_args.stringify_keys
  @fetch_missing_args = args.delete('fetch_missing_args')
  @fetch_missing_args = true if @fetch_missing_args.nil?

  @name = args["name"]
  @cron = args["cron"]
  @description = args["description"] if args["description"]

  # get class from klass or class
  @klass = args["klass"] || args["class"]

  # set status of job
  @status = args['status'] || status_from_redis

  # set last enqueue time - from args or from existing job
  if args['last_enqueue_time'] && !args['last_enqueue_time'].empty?
    @last_enqueue_time = Time.parse(args['last_enqueue_time'])
  else
    @last_enqueue_time = last_enqueue_time_from_redis
  end

  # get right arguments for job
  @args = args["args"].nil? ? [] : parse_args( args["args"] )

  @active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
  @active_job_queue_name_prefix = args["queue_name_prefix"]
  @active_job_queue_name_delimiter = args["queue_name_delimiter"]

  if args["message"]
    @message = args["message"]
    message_data = Sidekiq.load_json(@message) || {}
    @queue = message_data['queue'] || default
  elsif @klass
    message_data = {
      "class" => @klass.to_s,
      "args"  => @args,
    }

    # get right data for message
    # only if message wasn't specified before
    message_data = case @klass
      when Class
        @klass.get_sidekiq_options.merge(message_data)
      when String
        begin
          @klass.constantize.get_sidekiq_options.merge(message_data)
        rescue
          # Unknown class
          message_data.merge("queue"=>"default")
        end
    end

    # override queue if set in config
    # only if message is hash - can be string (dumped JSON)
    if args['queue']
      @queue = message_data['queue'] = args['queue']
    else
      @queue = message_data['queue'] || default
    end

    # dump message as json
    @message = message_data
  end

  @queue_name_with_prefix = queue_name_with_prefix
end
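The constructor accepts several spellings for the active_job flag. A minimal standalone sketch of that coercion follows; the helper name truthy_flag? is hypothetical and is not part of Sidekiq::Cron::Job.

```ruby
# Hypothetical helper mirroring the `active_job` coercion in the constructor.
def truthy_flag?(value)
  # `true` passes directly; otherwise the stringified value must match one of
  # the accepted spellings (case-insensitive), with the match starting at index 0.
  value == true || ("#{value}" =~ /^(true|t|yes|y|1)$/i) == 0
end

[true, "true", "T", "yes", "Y", 1].each { |v| puts "#{v.inspect} => #{truthy_flag?(v)}" }
[nil, false, "no", "0", ""].each { |v| puts "#{v.inspect} => #{truthy_flag?(v)}" }
```

Anything outside the accepted spellings, including nil and an empty string, is treated as false.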
Instance Attribute Details
#args ⇒ Object
Returns the value of attribute args.
# File 'lib/sidekiq/cron/job.rb', line 255
def args
  @args
end
#cron ⇒ Object
Returns the value of attribute cron.
# File 'lib/sidekiq/cron/job.rb', line 255
def cron
  @cron
end
#description ⇒ Object
Returns the value of attribute description.
# File 'lib/sidekiq/cron/job.rb', line 255
def description
  @description
end
#fetch_missing_args ⇒ Object (readonly)
Returns the value of attribute fetch_missing_args.
# File 'lib/sidekiq/cron/job.rb', line 256
def fetch_missing_args
  @fetch_missing_args
end
#klass ⇒ Object
Returns the value of attribute klass.
# File 'lib/sidekiq/cron/job.rb', line 255
def klass
  @klass
end
#last_enqueue_time ⇒ Object (readonly)
Returns the value of attribute last_enqueue_time.
# File 'lib/sidekiq/cron/job.rb', line 256
def last_enqueue_time
  @last_enqueue_time
end
#message ⇒ Object
Returns the value of attribute message.
# File 'lib/sidekiq/cron/job.rb', line 255
def message
  @message
end
#name ⇒ Object
Returns the value of attribute name.
# File 'lib/sidekiq/cron/job.rb', line 255
def name
  @name
end
Class Method Details
.all ⇒ Object
Get all cron jobs.
# File 'lib/sidekiq/cron/job.rb', line 201
def self.all
  job_hashes = nil
  Sidekiq.redis do |conn|
    set_members = conn.smembers(jobs_key)
    job_hashes = conn.pipelined do
      set_members.each do |key|
        conn.hgetall(key)
      end
    end
  end
  job_hashes.compact.reject(&:empty?).collect do |h|
    # no need to fetch missing args from redis since we just got this hash from there
    Sidekiq::Cron::Job.new(h.merge(fetch_missing_args: false))
  end
end
.count ⇒ Object
# File 'lib/sidekiq/cron/job.rb', line 217
def self.count
  out = 0
  Sidekiq.redis do |conn|
    out = conn.scard(jobs_key)
  end
  out
end
.create(hash) ⇒ Object
Create a new instance of a cron job.
# File 'lib/sidekiq/cron/job.rb', line 239
def self.create hash
  new(hash).save
end
.destroy(name) ⇒ Object
Destroy a job by name.
# File 'lib/sidekiq/cron/job.rb', line 244
def self.destroy name
  # if name is a Hash, try to get the name from it
  name = name[:name] || name['name'] if name.is_a?(Hash)
  if job = find(name)
    job.destroy
  else
    false
  end
end
.destroy_all! ⇒ Object
Remove all jobs from cron.
# File 'lib/sidekiq/cron/job.rb', line 473
def self.destroy_all!
  all.each do |job|
    job.destroy
  end
  logger.info { "Cron Jobs - deleted all jobs" }
end
.destroy_removed_jobs(new_job_names) ⇒ Object
Remove jobs that exist among the current jobs but are missing from the new job names.
# File 'lib/sidekiq/cron/job.rb', line 481
def self.destroy_removed_jobs new_job_names
  current_job_names = Sidekiq::Cron::Job.all.map(&:name)
  removed_job_names = current_job_names - new_job_names
  removed_job_names.each { |j| Sidekiq::Cron::Job.destroy(j) }
  removed_job_names
end
.exists?(name) ⇒ Boolean
# File 'lib/sidekiq/cron/job.rb', line 502
def self.exists? name
  out = false
  Sidekiq.redis do |conn|
    out = conn.exists redis_key name
  end
  out
end
.find(name) ⇒ Object
# File 'lib/sidekiq/cron/job.rb', line 225
def self.find name
  # if name is a Hash, try to get the name from it
  name = name[:name] || name['name'] if name.is_a?(Hash)
  output = nil
  Sidekiq.redis do |conn|
    if exists? name
      output = Job.new conn.hgetall( redis_key(name) )
    end
  end
  output
end
.load_from_array(array) ⇒ Object
Load cron jobs from an Array. The input structure should look like: [
{
'name' => 'name_of_job',
'class' => 'MyClass',
'cron' => '1 * * * *',
'args' => '(OPTIONAL) [Array or Hash]',
'description' => '(OPTIONAL) Description of job'
},
{
'name' => 'Cool Job for Second Class',
'class' => 'SecondClass',
'cron' => '*/5 * * * *'
}
]
# File 'lib/sidekiq/cron/job.rb', line 183
def self.load_from_array array
  errors = {}
  array.each do |job_data|
    job = new(job_data)
    errors[job.name] = job.errors unless job.save
  end
  errors
end
.load_from_array!(array) ⇒ Object
Like .load_from_array, but old jobs that exist in Redis and are absent from the input are destroyed.
# File 'lib/sidekiq/cron/job.rb', line 194
def self.load_from_array! array
  job_names = array.map { |job| job["name"] }
  destroy_removed_jobs(job_names)
  load_from_array(array)
end
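The name bookkeeping in load_from_array! can be sketched without Redis. The helper names_to_destroy and the sample job names below are hypothetical; current_names stands in for the job names already stored. Note that the listing above reads job["name"] with a string key, so in this sketch a symbol-keyed input yields nil names and every current job counts as removed.

```ruby
# In-memory sketch of the name comparison done before reloading jobs.
# `current_names` stands in for the job names already stored in Redis.
def names_to_destroy(current_names, new_jobs)
  new_names = new_jobs.map { |job| job["name"] } # string keys only, as in the listing
  current_names - new_names
end

current = ["daily_report", "cleanup"]

string_keyed = [{ "name" => "daily_report", "class" => "ReportWorker", "cron" => "0 6 * * *" }]
symbol_keyed = [{ name: "daily_report", class: "ReportWorker", cron: "0 6 * * *" }]

p names_to_destroy(current, string_keyed) # => ["cleanup"]
p names_to_destroy(current, symbol_keyed) # => ["daily_report", "cleanup"]
```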
.load_from_hash(hash) ⇒ Object
Load cron jobs from a Hash. The input structure should look like: {
'name_of_job' => {
'class' => 'MyClass',
'cron' => '1 * * * *',
'args' => '(OPTIONAL) [Array or Hash]',
'description' => '(OPTIONAL) Description of job'
},
'My super iber cool job' => {
'class' => 'SecondClass',
'cron' => '*/5 * * * *'
}
}
# File 'lib/sidekiq/cron/job.rb', line 151
def self.load_from_hash hash
  array = hash.inject([]) do |out,(key, job)|
    job['name'] = key
    out << job
  end
  load_from_array array
end
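The Hash form is converted to the Array form by copying each key into the job's 'name' entry. A minimal sketch of that conversion follows; jobs_hash_to_array is a hypothetical name, and unlike the listing above it uses merge so the input hashes are left unmodified.

```ruby
# Hypothetical, non-mutating version of the hash-to-array conversion.
def jobs_hash_to_array(hash)
  hash.map { |name, job| job.merge("name" => name) }
end

schedule = {
  "name_of_job" => { "class" => "MyClass", "cron" => "1 * * * *" },
  "second_job"  => { "class" => "SecondClass", "cron" => "*/5 * * * *" }
}

jobs_hash_to_array(schedule).each { |job| p job }
```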
.load_from_hash!(hash) ⇒ Object
Like .load_from_hash, but old jobs that exist in Redis and are absent from the input are destroyed.
# File 'lib/sidekiq/cron/job.rb', line 161
def self.load_from_hash! hash
  destroy_removed_jobs(hash.keys)
  load_from_hash(hash)
end
Instance Method Details
#active_job_message ⇒ Object
Active Job loads data from the Sidekiq queue with a different structure: it creates a wrapper around the job.
# File 'lib/sidekiq/cron/job.rb', line 122
def active_job_message
  {
    'class'       => 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper',
    'queue'       => @queue_name_with_prefix,
    'description' => @description,
    'args'        => [{
      'job_class'  => @klass,
      'job_id'     => SecureRandom.uuid,
      'queue_name' => @queue_name_with_prefix,
      'arguments'  => @args
    }]
  }
end
#destroy ⇒ Object
Remove job from cron jobs by name. Input:
first arg: name (string) - name of job (must match exactly, case sensitive)
# File 'lib/sidekiq/cron/job.rb', line 458
def destroy
  Sidekiq.redis do |conn|
    # delete from set
    conn.srem self.class.jobs_key, redis_key
    # delete run timestamps
    conn.del job_enqueued_key
    # delete main job
    conn.del redis_key
  end
  logger.info { "Cron Jobs - deleted job with name: #{@name}" }
end
#disable! ⇒ Object
# File 'lib/sidekiq/cron/job.rb', line 331
def disable!
  @status = "disabled"
  save
end
#disabled? ⇒ Boolean
# File 'lib/sidekiq/cron/job.rb', line 345
def disabled?
  !enabled?
end
#enable! ⇒ Object
# File 'lib/sidekiq/cron/job.rb', line 336
def enable!
  @status = "enabled"
  save
end
#enabled? ⇒ Boolean
# File 'lib/sidekiq/cron/job.rb', line 341
def enabled?
  @status == "enabled"
end
#enque!(time = Time.now) ⇒ Object
Enqueue the cron job to its queue.
# File 'lib/sidekiq/cron/job.rb', line 47
def enque! time = Time.now
  @last_enqueue_time = time
  klass_const =
    begin
      @klass.to_s.constantize
    rescue NameError
      nil
    end
  if klass_const
    if defined?(ActiveJob::Base) && klass_const < ActiveJob::Base
      enqueue_active_job(klass_const)
    else
      enqueue_sidekiq_worker(klass_const)
    end
  else
    if @active_job
      Sidekiq::Client.push(active_job_message)
    else
      Sidekiq::Client.push(sidekiq_worker_message)
    end
  end
  save
  logger.debug { "enqueued #{@name}: #{@message}" }
end
#enqueue_active_job(klass_const) ⇒ Object
# File 'lib/sidekiq/cron/job.rb', line 81
def enqueue_active_job(klass_const)
  klass_const.set(queue: @queue_name_with_prefix).perform_later(*@args)
  true
end
#enqueue_sidekiq_worker(klass_const) ⇒ Object
# File 'lib/sidekiq/cron/job.rb', line 87
def enqueue_sidekiq_worker(klass_const)
  klass_const.set(queue: @queue_name_with_prefix).perform_async(*@args)
  true
end
#errors ⇒ Object
# File 'lib/sidekiq/cron/job.rb', line 387
def errors
  @errors ||= []
end
#exists? ⇒ Boolean
# File 'lib/sidekiq/cron/job.rb', line 510
def exists?
  self.class.exists? @name
end
#formated_enqueue_time(now = Time.now) ⇒ Object
# File 'lib/sidekiq/cron/job.rb', line 494
def formated_enqueue_time now = Time.now
  last_time(now).getutc.to_f.to_s
end
#formated_last_time(now = Time.now) ⇒ Object
# File 'lib/sidekiq/cron/job.rb', line 498
def formated_last_time now = Time.now
  last_time(now).getutc.iso8601
end
#is_active_job? ⇒ Boolean
# File 'lib/sidekiq/cron/job.rb', line 75
def is_active_job?
  @active_job || defined?(ActiveJob::Base) && @klass.to_s.constantize < ActiveJob::Base
rescue NameError
  false
end
#klass_valid ⇒ Object
# File 'lib/sidekiq/cron/job.rb', line 417
def klass_valid
  case @klass
  when Class
    true
  when String
    @klass.size > 0
  else
  end
end
#last_enqueue_time_from_redis ⇒ Object
# File 'lib/sidekiq/cron/job.rb', line 360
def last_enqueue_time_from_redis
  out = nil
  if fetch_missing_args
    Sidekiq.redis do |conn|
      out = Time.parse(conn.hget(redis_key, "last_enqueue_time")) rescue nil
    end
  end
  out
end
#last_time(now = Time.now) ⇒ Object
Parses the cron specification '* * * * *' and returns the time when the last run should have been performed.
# File 'lib/sidekiq/cron/job.rb', line 490
def last_time now = Time.now
  Rufus::Scheduler::CronLine.new(@cron).previous_time(now)
end
#queue_name_with_prefix ⇒ Object
# File 'lib/sidekiq/cron/job.rb', line 98
def queue_name_with_prefix
  return @queue unless is_active_job?

  if !"#{@active_job_queue_name_delimiter}".empty?
    queue_name_delimiter = @active_job_queue_name_delimiter
  elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_delimiter) && !ActiveJob::Base.queue_name_delimiter.empty?
    queue_name_delimiter = ActiveJob::Base.queue_name_delimiter
  else
    queue_name_delimiter = '_'
  end

  if !"#{@active_job_queue_name_prefix}".empty?
    queue_name = "#{@active_job_queue_name_prefix}#{queue_name_delimiter}#{@queue}"
  elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_prefix) && !"#{ActiveJob::Base.queue_name_prefix}".empty?
    queue_name = "#{ActiveJob::Base.queue_name_prefix}#{queue_name_delimiter}#{@queue}"
  else
    queue_name = @queue
  end

  queue_name
end
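The prefix and delimiter resolution can be sketched as a pure function. This is a simplification under stated assumptions: prefixed_queue is a hypothetical name, and the sketch omits both the is_active_job? guard and the fallbacks to ActiveJob::Base.queue_name_prefix and ActiveJob::Base.queue_name_delimiter.

```ruby
# Hypothetical simplification: only per-job settings are considered,
# with '_' as the delimiter when none is given.
def prefixed_queue(queue, prefix: nil, delimiter: nil)
  delimiter = "_" if delimiter.to_s.empty?
  return queue if prefix.to_s.empty?
  "#{prefix}#{delimiter}#{queue}"
end

puts prefixed_queue("mailers")
puts prefixed_queue("mailers", prefix: "prod")
puts prefixed_queue("mailers", prefix: "prod", delimiter: ".")
```

An empty prefix leaves the queue name untouched, and an empty delimiter falls back to the default rather than joining the parts directly.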
#remove_previous_enques(time) ⇒ Object
Remove previous information about run times. This clears Redis and makes sure Redis does not fill up with stale entries.
# File 'lib/sidekiq/cron/job.rb', line 30
def remove_previous_enques time
  Sidekiq.redis do |conn|
    conn.zremrangebyscore(job_enqueued_key, 0, "(#{(time.to_f - REMEMBER_THRESHOLD).to_s}")
  end
end
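The upper bound passed to zremrangebyscore is a string with a leading "(", which Redis reads as an exclusive bound. A small sketch of how that string is built follows; stale_score_upper_bound is a hypothetical helper and the timestamp is an arbitrary example.

```ruby
REMEMBER_THRESHOLD = 24 * 60 * 60 # seconds, as in the constant summary

# Hypothetical helper: scores strictly below (time - threshold) are removed.
def stale_score_upper_bound(time)
  "(#{time.to_f - REMEMBER_THRESHOLD}"
end

puts stale_score_upper_bound(Time.at(1_700_000_000)) # prints (1699913600.0
```

Entries enqueued exactly REMEMBER_THRESHOLD seconds before the given time are kept, because the bound is exclusive.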
#save ⇒ Object
Add job to cron jobs. Input:
name: (string) - name of job
cron: (string: '* * * * *') - cron specification of when to run the job
class: (string|class) - which class to perform
Optional input:
queue: (string) - which queue to use for enqueuing (overrides the class queue)
args: (array|hash|nil) - arguments for the perform method
# File 'lib/sidekiq/cron/job.rb', line 436
def save
  # if job is invalid return false
  return false unless valid?
  Sidekiq.redis do |conn|
    # add to set of all jobs
    conn.sadd self.class.jobs_key, redis_key
    # add information for this job
    conn.hmset redis_key, *hash_to_redis(to_hash)
    # add information about last time - don't enqueue right after scheduler poller starts
    time = Time.now
    conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless conn.exists(job_enqueued_key)
  end
  logger.info { "Cron Jobs - add job with name: #{@name}" }
end
#should_enque?(time) ⇒ Boolean
Crucial part of the whole job-enqueuing process.
# File 'lib/sidekiq/cron/job.rb', line 16
def should_enque? time
  enqueue = false
  enqueue = Sidekiq.redis do |conn|
    status == "enabled" &&
      not_past_scheduled_time?(time) &&
      not_enqueued_after?(time) &&
      conn.zadd(job_enqueued_key, formated_enqueue_time(time), formated_last_time(time))
  end
  enqueue
end
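The final zadd in the listing above appears to double as a guard: the sorted-set member is the scheduled run time, so a second attempt for the same run adds nothing new. The sketch below illustrates that idea with an in-memory stand-in. FakeEnqueueLog is hypothetical, and it assumes the client's zadd returns true only when a single member is newly added.

```ruby
# Minimal in-memory stand-in for the Redis sorted set used as an enqueue log.
class FakeEnqueueLog
  def initialize
    @scores = {}
  end

  # Mimics ZADD for a single member: true only when the member is new.
  def zadd(score, member)
    added = !@scores.key?(member)
    @scores[member] = score
    added
  end
end

log = FakeEnqueueLog.new
scheduled = "2024-01-01T06:00:00Z" # plays the role of formated_last_time(time)

first  = log.zadd("1704088800.0", scheduled)
second = log.zadd("1704088800.0", scheduled)
puts "first poller enqueues: #{first}, second poller enqueues: #{second}"
```

Under that assumption, only the first caller for a given scheduled time gets a truthy result and goes on to enqueue the job.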
#sidekiq_worker_message ⇒ Object
Sidekiq worker message.
# File 'lib/sidekiq/cron/job.rb', line 94
def sidekiq_worker_message
  @message.is_a?(String) ? Sidekiq.load_json(@message) : @message
end
#sort_name ⇒ Object
# File 'lib/sidekiq/cron/job.rb', line 514
def sort_name
  "#{status == "enabled" ? 0 : 1}_#{name}".downcase
end
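The sort key places enabled jobs before disabled ones and orders names case-insensitively within each group. A standalone sketch follows; sort_key and the sample jobs are hypothetical.

```ruby
# Hypothetical standalone version of the sort key built by #sort_name.
def sort_key(status, name)
  "#{status == "enabled" ? 0 : 1}_#{name}".downcase
end

jobs = [["disabled", "Alpha"], ["enabled", "zeta"], ["enabled", "Beta"]]
sorted = jobs.sort_by { |status, name| sort_key(status, name) }
p sorted.map(&:last) # => ["Beta", "zeta", "Alpha"]
```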
#status ⇒ Object
# File 'lib/sidekiq/cron/job.rb', line 327
def status
  @status
end
#status_from_redis ⇒ Object
# File 'lib/sidekiq/cron/job.rb', line 349
def status_from_redis
  out = "enabled"
  if fetch_missing_args
    Sidekiq.redis do |conn|
      status = conn.hget redis_key, "status"
      out = status if status
    end
  end
  out
end
#test_and_enque_for_time!(time) ⇒ Object
Test whether the job should be enqueued. If yes, add it to the queue.
# File 'lib/sidekiq/cron/job.rb', line 37
def test_and_enque_for_time! time
  # should this job be enqueued?
  if should_enque?(time)
    enque!
    remove_previous_enques(time)
  end
end
#to_hash ⇒ Object
Export job data to a hash.
# File 'lib/sidekiq/cron/job.rb', line 371
def to_hash
  {
    name: @name,
    klass: @klass,
    cron: @cron,
    description: @description,
    args: @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || []),
    message: @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {}),
    status: @status,
    active_job: @active_job,
    queue_name_prefix: @active_job_queue_name_prefix,
    queue_name_delimiter: @active_job_queue_name_delimiter,
    last_enqueue_time: @last_enqueue_time,
  }
end
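The args and message entries are stored as JSON strings unless they already are strings. A minimal sketch of that rule follows, assuming Sidekiq.dump_json behaves like JSON.generate from the standard library; serialize_args is a hypothetical name.

```ruby
require "json"

# Hypothetical helper: pass strings through, dump everything else as JSON,
# treating nil as an empty argument list.
def serialize_args(args)
  args.is_a?(String) ? args : JSON.generate(args || [])
end

puts serialize_args([1, "a"])
puts serialize_args(nil)
puts serialize_args('["already", "dumped"]')
```

Passing strings through unchanged avoids encoding an already dumped value a second time.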
#valid? ⇒ Boolean
# File 'lib/sidekiq/cron/job.rb', line 391
def valid?
  # clear previous errors
  @errors = []

  errors << "'name' must be set" if @name.nil? || @name.size == 0
  if @cron.nil? || @cron.size == 0
    errors << "'cron' must be set"
  else
    begin
      cron = Rufus::Scheduler::CronLine.new(@cron)
      cron.next_time(Time.now)
    rescue Exception => e
      # fix for different versions of cron-parser
      if e.message == "Bad Vixie-style specification bad"
        errors << "'cron' -> #{@cron}: not a valid cronline"
      else
        errors << "'cron' -> #{@cron}: #{e.message}"
      end
    end
  end

  errors << "'klass' (or class) must be set" unless klass_valid

  !errors.any?
end
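The presence checks in valid? can be sketched without Redis or a cron parser. This is a partial illustration only: validation_errors is a hypothetical function, and it deliberately omits the cronline parsing step that the real method performs through Rufus::Scheduler::CronLine.

```ruby
# Hypothetical, partial version of the validation: presence checks only.
def validation_errors(name:, cron:, klass:)
  errors = []
  errors << "'name' must be set" if name.nil? || name.size == 0
  errors << "'cron' must be set" if cron.nil? || cron.size == 0
  # Mirrors klass_valid: any Class, or a non-empty String.
  klass_ok = klass.is_a?(Class) || (klass.is_a?(String) && klass.size > 0)
  errors << "'klass' (or class) must be set" unless klass_ok
  errors
end

p validation_errors(name: "", cron: nil, klass: nil)
p validation_errors(name: "report", cron: "0 6 * * *", klass: "ReportWorker")
```

All failures are collected in one pass, so a caller sees every missing field at once.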