Class: Sidekiq::Cron::Job
- Inherits:
-
Object
- Object
- Sidekiq::Cron::Job
- Extended by:
- Util
- Includes:
- Util
- Defined in:
- lib/sidekiq/cron/job.rb
Constant Summary collapse
- REMEMBER_THRESHOLD =
how long (in seconds) we would like to store information about previous enqueues
24 * 60 * 60
Instance Attribute Summary collapse
-
#args ⇒ Object
Returns the value of attribute args.
-
#cron ⇒ Object
Returns the value of attribute cron.
-
#klass ⇒ Object
Returns the value of attribute klass.
-
#last_run_time ⇒ Object
readonly
Returns the value of attribute last_run_time.
-
#message ⇒ Object
Returns the value of attribute message.
-
#name ⇒ Object
Returns the value of attribute name.
Class Method Summary collapse
-
.all ⇒ Object
get all cron jobs.
- .count ⇒ Object
-
.create(hash) ⇒ Object
create new instance of cron job.
-
.destroy(name) ⇒ Object
destroy job by name.
-
.destroy_all! ⇒ Object
remove all jobs from cron.
- .exists?(name) ⇒ Boolean
- .find(name) ⇒ Object
-
.load_from_array(array) ⇒ Object
load cron jobs from Array input structure should look like: [ { ‘name’ => ‘name_of_job’, ‘class’ => ‘MyClass’, ‘cron’ => ‘1 * * * *’, ‘args’ => ‘(OPTIONAL) [Array or Hash]’ }, { ‘name’ => ‘Cool Job for Second Class’, ‘class’ => ‘SecondClass’, ‘cron’ => ‘*/5 * * * *’ } ].
-
.load_from_hash(hash) ⇒ Object
load cron jobs from Hash input structure should look like: { ‘name_of_job’ => { ‘class’ => ‘MyClass’, ‘cron’ => ‘1 * * * *’, ‘args’ => ‘(OPTIONAL) [Array or Hash]’ }, ‘My super iber cool job’ => { ‘class’ => ‘SecondClass’, ‘cron’ => ‘*/5 * * * *’ } }.
Instance Method Summary collapse
-
#destroy ⇒ Object
remove job from cron jobs by name input: first arg: name (string) - name of job (must be same - case sensitive).
- #disable! ⇒ Object
- #enable! ⇒ Object
-
#enque!(time = Time.now) ⇒ Object
enque cron job to queue.
- #errors ⇒ Object
- #exists? ⇒ Boolean
- #formated_last_time(now = Time.now) ⇒ Object
-
#initialize(input_args = {}) ⇒ Job
constructor
A new instance of Job.
-
#last_time(now = Time.now) ⇒ Object
Parse cron specification ‘* * * * *’ and returns time when last run should be performed.
-
#remove_previous_enques(time) ⇒ Object
remove previous information about run times; this clears old entries from redis so that redis does not run out of memory.
-
#save ⇒ Object
add job to cron jobs. input: name: (string) - name of job; cron: (string: ‘* * * * *’) - cron specification of when to run the job; class: (string|class) - which class to perform. optional input: queue: (string) - which queue to use for enqueuing (will override class queue); args: (array|hash|nil) - arguments for the perform method.
-
#should_enque?(time) ⇒ Boolean
crucial part of whole enquing job.
- #sort_name ⇒ Object
- #status ⇒ Object
- #status_from_redis ⇒ Object
-
#test_and_enque_for_time!(time) ⇒ Object
test if the job should be enqueued; if yes, add it to the queue.
-
#to_hash ⇒ Object
export job data to hash.
- #valid? ⇒ Boolean
Constructor Details
#initialize(input_args = {}) ⇒ Job
Returns a new instance of Job.
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/sidekiq/cron/job.rb', line 163
# Builds a job from a hash of attributes (string or symbol keys).
#
# Keys read: "name", "cron", "klass" / "class", "status", "last_run_time",
# "args", "message" and "queue".
#
# NOTE(review): the flattened listing lost several identifiers (the name of
# the local payload hash and the method called on the worker class). They are
# restored here as `message_data` and `get_sidekiq_options` -- confirm against
# lib/sidekiq/cron/job.rb.
def initialize(input_args = {})
  args = input_args.stringify_keys

  @name = args["name"]
  @cron = args["cron"]

  # the worker class may arrive under either "klass" or "class"
  @klass = args["klass"] || args["class"]

  # an explicit status wins, otherwise use whatever status_from_redis returns
  @status = args['status'] || status_from_redis

  # a missing or unparsable timestamp falls back to the current time
  @last_run_time = begin
    Time.parse(args['last_run_time'].to_s)
  rescue StandardError
    Time.now
  end

  # normalise job arguments to an Array (a lone value is wrapped)
  raw_args = args["args"]
  @args = if raw_args.nil?
    []
  elsif raw_args.is_a?(Array)
    raw_args
  else
    [raw_args]
  end

  if args["message"]
    # message was given explicitly (may be a Hash or a dumped JSON String)
    @message = args["message"]
  elsif @klass
    message_data = {
      "class" => @klass.to_s,
      "args"  => @args,
    }

    # merge the worker's own options underneath the class/args pair
    message_data = case @klass
      when Class
        @klass.get_sidekiq_options.merge(message_data)
      when String
        begin
          @klass.constantize.get_sidekiq_options.merge(message_data)
        rescue
          # unknown class - fall back to the default queue
          message_data.merge("queue" => "default")
        end
    end

    # override the queue if one was set in the input
    message_data['queue'] = args['queue'] if args['queue']

    @message = message_data
  end
end
Instance Attribute Details
#args ⇒ Object
Returns the value of attribute args.
160 161 162 |
# File 'lib/sidekiq/cron/job.rb', line 160
# Returns the value of the +args+ attribute (@args).
def args
  @args
end
#cron ⇒ Object
Returns the value of attribute cron.
160 161 162 |
# File 'lib/sidekiq/cron/job.rb', line 160
# Returns the value of the +cron+ attribute (@cron).
def cron
  @cron
end
#klass ⇒ Object
Returns the value of attribute klass.
160 161 162 |
# File 'lib/sidekiq/cron/job.rb', line 160
# Returns the value of the +klass+ attribute (@klass).
def klass
  @klass
end
#last_run_time ⇒ Object (readonly)
Returns the value of attribute last_run_time.
161 162 163 |
# File 'lib/sidekiq/cron/job.rb', line 161
# Returns the value of the read-only +last_run_time+ attribute (@last_run_time).
def last_run_time
  @last_run_time
end
#message ⇒ Object
Returns the value of attribute message.
160 161 162 |
# File 'lib/sidekiq/cron/job.rb', line 160
# Returns the value of the +message+ attribute (@message).
# The method name was missing from the flattened listing and is restored here.
def message
  @message
end
#name ⇒ Object
Returns the value of attribute name.
160 161 162 |
# File 'lib/sidekiq/cron/job.rb', line 160
# Returns the value of the +name+ attribute (@name).
def name
  @name
end
Class Method Details
.all ⇒ Object
get all cron jobs
108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/sidekiq/cron/job.rb', line 108
# Returns a Job for every member of the jobs set whose key still exists in
# redis; members whose key is gone are skipped.
def self.all
  jobs = []
  Sidekiq.redis do |conn|
    jobs = conn.smembers(jobs_key).map do |key|
      Job.new(conn.hgetall(key)) if conn.exists(key)
    end
  end
  jobs.compact
end
.count ⇒ Object
122 123 124 125 126 127 128 |
# File 'lib/sidekiq/cron/job.rb', line 122
# Returns the cardinality (SCARD) of the jobs set.
def self.count
  total = 0
  Sidekiq.redis { |conn| total = conn.scard(jobs_key) }
  total
end
.create(hash) ⇒ Object
create new instance of cron job
144 145 146 |
# File 'lib/sidekiq/cron/job.rb', line 144
# Builds a job from +hash+ and saves it; returns whatever #save returns.
def self.create(hash)
  job = new(hash)
  job.save
end
.destroy(name) ⇒ Object
destroy job by name
149 150 151 152 153 154 155 156 157 158 |
# File 'lib/sidekiq/cron/job.rb', line 149
# Destroys the job with the given name.
# +name+ may also be a Hash carrying the name under :name or 'name'.
# Returns false when no such job is found.
def self.destroy(name)
  name = name[:name] || name['name'] if name.is_a?(Hash)

  job = find(name)
  return false unless job

  job.destroy
end
.destroy_all! ⇒ Object
remove all jobs from cron
324 325 326 327 328 329 |
# File 'lib/sidekiq/cron/job.rb', line 324
# Destroys every job returned by .all, then logs an info message.
def self.destroy_all!
  all.each(&:destroy)
  logger.info { "Cron Jobs - deleted all jobs" }
end
.exists?(name) ⇒ Boolean
344 345 346 347 348 349 350 |
# File 'lib/sidekiq/cron/job.rb', line 344
# Returns the result of redis EXISTS for the key derived from +name+.
def self.exists?(name)
  found = false
  Sidekiq.redis { |conn| found = conn.exists(redis_key(name)) }
  found
end
.find(name) ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/sidekiq/cron/job.rb', line 130
# Looks a job up by name and returns it, or nil when it does not exist.
# +name+ may also be a Hash carrying the name under :name or 'name'.
def self.find(name)
  name = name[:name] || name['name'] if name.is_a?(Hash)

  job = nil
  Sidekiq.redis do |conn|
    job = Job.new(conn.hgetall(redis_key(name))) if exists?(name)
  end
  job
end
.load_from_array(array) ⇒ Object
load cron jobs from Array input structure should look like: [
{
'name' => 'name_of_job',
'class' => 'MyClass',
'cron' => '1 * * * *',
'args' => '(OPTIONAL) [Array or Hash]'
},
{
'name' => 'Cool Job for Second Class',
'class' => 'SecondClass',
'cron' => '*/5 * * * *'
}
]
97 98 99 100 101 102 103 104 |
# File 'lib/sidekiq/cron/job.rb', line 97
# Builds and saves one job per element of +array+.
# Returns a Hash mapping job name => errors for every job that failed to save
# (empty when all jobs were saved).
def self.load_from_array(array)
  array.each_with_object({}) do |job_data, failures|
    job = new(job_data)
    failures[job.name] = job.errors unless job.save
  end
end
.load_from_hash(hash) ⇒ Object
load cron jobs from Hash input structure should look like: {
'name_of_job' => {
'class' => 'MyClass',
'cron' => '1 * * * *',
'args' => '(OPTIONAL) [Array or Hash]'
},
'My super iber cool job' => {
'class' => 'SecondClass',
'cron' => '*/5 * * * *'
}
}
72 73 74 75 76 77 78 |
# File 'lib/sidekiq/cron/job.rb', line 72
# Converts a { job_name => job_attributes } Hash into the Array form accepted
# by load_from_array and delegates to it; returns load_from_array's result.
#
# The job name is added under the 'name' key of a *copy* of each attribute
# hash, so the caller's input (which may be frozen or reused) is not mutated.
def self.load_from_hash(hash)
  jobs = hash.map { |key, job| job.merge('name' => key) }
  load_from_array(jobs)
end
Instance Method Details
#destroy ⇒ Object
remove job from cron jobs by name input:
first arg: name (string) - name of job (must be same - case sensitive)
309 310 311 312 313 314 315 316 317 318 319 320 321 |
# File 'lib/sidekiq/cron/job.rb', line 309
# Removes this job from redis: its entry in the jobs set, its enqueue
# timestamps and its main hash. Logs an info message afterwards.
def destroy
  Sidekiq.redis do |conn|
    # drop the job from the set of all jobs
    conn.srem(self.class.jobs_key, redis_key)
    # drop the recorded enqueue timestamps
    conn.del(job_enqueued_key)
    # drop the job itself
    conn.del(redis_key)
  end

  logger.info { "Cron Jobs - deleted job with name: #{@name}" }
end
#disable! ⇒ Object
218 219 220 221 |
# File 'lib/sidekiq/cron/job.rb', line 218
# Marks the job as "disabled" and persists it; returns the result of #save.
def disable!
  @status = "disabled"
  save
end
#enable! ⇒ Object
223 224 225 226 |
# File 'lib/sidekiq/cron/job.rb', line 223
# Marks the job as "enabled" and persists it; returns the result of #save.
def enable!
  @status = "enabled"
  save
end
#enque!(time = Time.now) ⇒ Object
enque cron job to queue
49 50 51 52 53 54 55 56 |
# File 'lib/sidekiq/cron/job.rb', line 49
# Pushes the job's message to Sidekiq, records +time+ as the last run time,
# saves the job and logs a debug line.
def enque!(time = Time.now)
  @last_run_time = time

  # a String message is a dumped JSON payload and must be loaded first
  payload = @message.is_a?(String) ? Sidekiq.load_json(@message) : @message
  Sidekiq::Client.push(payload)

  save
  logger.debug { "enqueued #{@name}: #{@message}" }
end
#errors ⇒ Object
253 254 255 |
# File 'lib/sidekiq/cron/job.rb', line 253
# Returns the list of validation errors, creating an empty list on first use.
def errors
  @errors = [] unless @errors
  @errors
end
#exists? ⇒ Boolean
352 353 354 |
# File 'lib/sidekiq/cron/job.rb', line 352
# Delegates to the class-level exists? using this job's name.
def exists?
  self.class.exists?(@name)
end
#formated_last_time(now = Time.now) ⇒ Object
340 341 342 |
# File 'lib/sidekiq/cron/job.rb', line 340
# Returns last_time(now) converted to UTC.
def formated_last_time(now = Time.now)
  scheduled = last_time(now)
  scheduled.getutc
end
#last_time(now = Time.now) ⇒ Object
Parse cron specification ‘* * * * *’ and returns time when last run should be performed
333 334 335 336 337 338 |
# File 'lib/sidekiq/cron/job.rb', line 333
# Parses the cron specification and returns the time at which the last run
# should have been performed, relative to +now+.
def last_time(now = Time.now)
  # The cron parser reports the last time only after its minute has ended, so
  # look 60 seconds ahead to get the right time without any delay.
  parser = CronParser.new(@cron)
  parser.last(now + 60)
end
#remove_previous_enques(time) ⇒ Object
remove previous information about run times; this clears old entries from redis so that redis does not run out of memory
32 33 34 35 36 |
# File 'lib/sidekiq/cron/job.rb', line 32
# Removes entries from the job's enqueue sorted set whose score is between 0
# and (time - REMEMBER_THRESHOLD), the upper bound being exclusive.
def remove_previous_enques(time)
  cutoff = time.to_f - REMEMBER_THRESHOLD
  Sidekiq.redis do |conn|
    # the leading "(" makes the upper bound exclusive
    conn.zremrangebyscore(job_enqueued_key, 0, "(#{cutoff}")
  end
end
#save ⇒ Object
add job to cron jobs input:
name: (string) - name of job
cron: (string: '* * * * *') - cron specification of when to run the job
class: (string|class) - which class to perform
optional input:
queue: (string) - which queue to use for enquing (will override class queue)
args: (array|hash|nil) - arguments for the perform method
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 |
# File 'lib/sidekiq/cron/job.rb', line 287
# Persists the job in redis. Returns false without touching redis when the
# job is invalid; otherwise returns the result of the final logger call.
def save
  return false unless valid?

  Sidekiq.redis do |conn|
    # register the job in the set of all jobs
    conn.sadd(self.class.jobs_key, redis_key)

    # store the job's attributes
    conn.hmset(redis_key, *hash_to_redis(to_hash))

    # record the last scheduled time so the job is not enqueued right after
    # the scheduler poller starts
    now = Time.now
    conn.zadd(job_enqueued_key, now.to_f.to_s, formated_last_time(now).to_s)
  end

  logger.info { "Cron Jobs - add job with name: #{@name}" }
end
#should_enque?(time) ⇒ Boolean
crucial part of whole enquing job
17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/sidekiq/cron/job.rb', line 17
# Decides whether the job should be enqueued for +time+.
#
# The checks short-circuit in order: the job must be "enabled", its last run
# must be earlier than last_time(time), and only then is zadd issued; its
# result is the return value.
# NOTE(review): presumably zadd is falsy when this run time was already
# recorded, so each run is enqueued once -- confirm against the redis client.
def should_enque?(time)
  enqueue = false
  Sidekiq.redis do |conn|
    enqueue = status == "enabled" &&
      @last_run_time < last_time(time) &&
      conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time))
  end
  enqueue
end
#sort_name ⇒ Object
356 357 358 |
# File 'lib/sidekiq/cron/job.rb', line 356
# Returns a lower-cased sort key: "0_<name>" for enabled jobs, "1_<name>"
# for all others, so enabled jobs sort first.
def sort_name
  rank = status == "enabled" ? 0 : 1
  "#{rank}_#{name}".downcase
end
#status ⇒ Object
214 215 216 |
# File 'lib/sidekiq/cron/job.rb', line 214
# Returns the job's current status (@status).
def status
  @status
end
#status_from_redis ⇒ Object
228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/sidekiq/cron/job.rb', line 228
# Returns the status stored in redis for this job, defaulting to "enabled".
#
# The default applies both when the job does not exist yet and when the
# stored hash has no "status" field. Previously the second case returned nil,
# which left the job in a state that is never == "enabled".
def status_from_redis
  stored = nil
  if exists?
    Sidekiq.redis do |conn|
      stored = conn.hget(redis_key, "status")
    end
  end
  stored || "enabled"
end
#test_and_enque_for_time!(time) ⇒ Object
test if the job should be enqueued; if yes, add it to the queue
39 40 41 42 43 44 45 46 |
# File 'lib/sidekiq/cron/job.rb', line 39
# Enqueues the job when should_enque?(time) says so, then prunes old enqueue
# records. Returns nil when nothing was enqueued.
def test_and_enque_for_time!(time)
  return unless should_enque?(time)

  enque!
  remove_previous_enques(time)
end
#to_hash ⇒ Object
export job data to hash
241 242 243 244 245 246 247 248 249 250 251 |
# File 'lib/sidekiq/cron/job.rb', line 241
# Exports the job's data as a Hash with symbol keys.
# +args+ and +message+ are passed through when already Strings, otherwise
# dumped to JSON (nil becomes [] and {} respectively).
def to_hash
  args_json = @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || [])
  message_json = @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {})

  {
    name: @name,
    klass: @klass,
    cron: @cron,
    args: args_json,
    message: message_json,
    status: @status,
    last_run_time: @last_run_time,
  }
end
#valid? ⇒ Boolean
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 |
# File 'lib/sidekiq/cron/job.rb', line 257
# Validates the job and returns true when no errors were found.
# Collected messages are available through #errors afterwards.
#
# Changes from the original listing:
# * the cron error message interpolates e.message (the identifier was lost
#   in the flattened listing);
# * only StandardError is rescued, so signals and exits are not swallowed;
# * klass is checked through to_s, because a Class (which the constructor
#   accepts) does not respond to #size.
def valid?
  # clear previous errors
  @errors = []

  errors << "'name' must be set" if @name.nil? || @name.size == 0

  if @cron.nil? || @cron.size == 0
    errors << "'cron' must be set"
  else
    begin
      # asking for the next run time forces the specification to be parsed
      CronParser.new(@cron).next(Time.now)
    rescue StandardError => e
      errors << "'cron' -> #{@cron}: #{e.message}"
    end
  end

  errors << "'klass' (or class) must be set" if @klass.nil? || @klass.to_s.empty?

  errors.empty?
end