Class: Rufus::Scheduler
- Inherits:
- Object
- Inheritance chain: Object, Rufus::Scheduler
- Defined in:
- lib/rufus/scheduler.rb,
lib/rufus/scheduler/jobs.rb,
lib/rufus/scheduler/util.rb,
lib/rufus/scheduler/locks.rb,
lib/rufus/scheduler/cronline.rb,
lib/rufus/scheduler/job_array.rb
Defined Under Namespace
Classes: AtJob, CronJob, CronLine, D, Error, EvInJob, EveryJob, FileLock, InJob, IntervalJob, Job, JobArray, NotRunningError, NullLock, OneTimeJob, RepeatJob, TimeoutError
Constant Summary
- VERSION =
'3.4.0'
- EoTime =
::EtOrbi::EoTime
- MAX_WORK_THREADS =
28 (the source carries a commented-out MIN_WORK_THREADS = 3 just above this constant)
- DURATIONS2M =
[ [ 'y', 365 * 24 * 3600 ], [ 'M', 30 * 24 * 3600 ], [ 'w', 7 * 24 * 3600 ], [ 'd', 24 * 3600 ], [ 'h', 3600 ], [ 'm', 60 ], [ 's', 1 ] ]
- DURATIONS2 =
DURATIONS2M.dup
- DURATIONS =
DURATIONS2M.inject({}) { |r, (k, v)| r[k] = v; r }
- DURATION_LETTERS =
DURATIONS.keys.join
- DU_KEYS =
DURATIONS2M.collect { |k, v| k.to_sym }
Instance Attribute Summary
-
#frequency ⇒ Object
Returns the value of attribute frequency.
-
#max_work_threads ⇒ Object
Returns the value of attribute max_work_threads.
-
#mutexes ⇒ Object
readonly
Returns the value of attribute mutexes.
-
#started_at ⇒ Object
readonly
Returns the value of attribute started_at.
-
#stderr ⇒ Object
Returns the value of attribute stderr.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
-
#thread_key ⇒ Object
readonly
Returns the value of attribute thread_key.
-
#work_queue ⇒ Object
readonly
Returns the value of attribute work_queue.
Class Method Summary
-
.h_to_s(t = Time.now) ⇒ Object
Produces an hour/min/sec/microsecond string representation of a Time instance.
-
.parse(o, opts = {}) ⇒ Object
Parses o as a cron line, a duration, or a point in time (tried in that order); raises ArgumentError if none matches.
- .parse_at(o, opts = {}) ⇒ Object
- .parse_cron(o, opts) ⇒ Object
-
.parse_duration(string, opts = {}) ⇒ Object
(also: parse_duration_string, parse_time_string)
Turns a string like ‘1m10s’ into a float like ‘70.0’; more formally, turns a time duration expressed as a string into a Float instance (a count of seconds).
- .parse_in(o, opts = {}) ⇒ Object
-
.s(opts = {}) ⇒ Object
Alias for Rufus::Scheduler.singleton.
-
.singleton(opts = {}) ⇒ Object
Returns a singleton Rufus::Scheduler instance.
-
.start_new ⇒ Object
Releasing the gem would probably require redirecting .start_new to .new and emitting a simple deprecation message.
-
.to_duration(seconds, options = {}) ⇒ Object
(also: to_duration_string, to_time_string)
Turns a number of seconds into a time string.
-
.to_duration_hash(seconds, options = {}) ⇒ Object
Turns a number of seconds (Integer or Float) into a hash of duration components.
-
.utc_to_s(t = Time.now) ⇒ Object
Produces the UTC string representation of a Time instance.
Instance Method Summary
-
#at(time, callable = nil, opts = {}, &block) ⇒ Object
Schedules a block or callable to run once at the given point in time.
- #at_jobs(opts = {}) ⇒ Object
-
#confirm_lock ⇒ Object
Callback called when a job is triggered.
- #cron(cronline, callable = nil, opts = {}, &block) ⇒ Object
- #cron_jobs(opts = {}) ⇒ Object
- #down? ⇒ Boolean
- #every(duration, callable = nil, opts = {}, &block) ⇒ Object
- #every_jobs(opts = {}) ⇒ Object
- #in(duration, callable = nil, opts = {}, &block) ⇒ Object
- #in_jobs(opts = {}) ⇒ Object
-
#initialize(opts = {}) ⇒ Scheduler
constructor
A new instance of Scheduler.
- #interval(duration, callable = nil, opts = {}, &block) ⇒ Object
- #interval_jobs(opts = {}) ⇒ Object
- #job(job_id) ⇒ Object
-
#jobs(opts = {}) ⇒ Object
Returns all the scheduled jobs (even those right before re-schedule).
- #join ⇒ Object
-
#lock ⇒ Object
Returns true if the scheduler has acquired the [exclusive] lock and thus may run.
- #occurrences(time0, time1, format = :per_job) ⇒ Object
- #on_error(job, err) ⇒ Object
- #pause ⇒ Object
- #paused? ⇒ Boolean
- #repeat(arg, callable = nil, opts = {}, &block) ⇒ Object
- #resume ⇒ Object
- #running_jobs(opts = {}) ⇒ Object
- #schedule(arg, callable = nil, opts = {}, &block) ⇒ Object
- #schedule_at(time, callable = nil, opts = {}, &block) ⇒ Object
- #schedule_cron(cronline, callable = nil, opts = {}, &block) ⇒ Object
- #schedule_every(duration, callable = nil, opts = {}, &block) ⇒ Object
- #schedule_in(duration, callable = nil, opts = {}, &block) ⇒ Object
- #schedule_interval(duration, callable = nil, opts = {}, &block) ⇒ Object
-
#scheduled?(job_or_job_id) ⇒ Boolean
Returns true if this job is currently scheduled.
- #shutdown(opt = nil) ⇒ Object (also: #stop)
-
#threads ⇒ Object
Lists all the threads associated with this scheduler.
- #timeline(time0, time1) ⇒ Object
-
#unlock ⇒ Object
Sister method to #lock, is called when the scheduler shuts down.
- #unschedule(job_or_job_id) ⇒ Object
- #up? ⇒ Boolean
- #uptime ⇒ Object
- #uptime_s ⇒ Object
-
#work_threads(query = :all) ⇒ Object
Lists all the work threads (the ones actually running the scheduled block code).
Constructor Details
#initialize(opts = {}) ⇒ Scheduler
Returns a new instance of Scheduler.
# File 'lib/rufus/scheduler.rb', line 56
def initialize(opts={})
  @opts = opts
  @started_at = nil
  @paused = false
  @jobs = JobArray.new
  @frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300)
  @mutexes = {}
  @work_queue = Queue.new
  #@min_work_threads = opts[:min_work_threads] || MIN_WORK_THREADS
  @max_work_threads = opts[:max_work_threads] || MAX_WORK_THREADS
  @stderr = $stderr
  @thread_key = "rufus_scheduler_#{self.object_id}"
  @scheduler_lock =
    if lockfile = opts[:lockfile]
      Rufus::Scheduler::FileLock.new(lockfile)
    else
      opts[:scheduler_lock] || Rufus::Scheduler::NullLock.new
    end
  @trigger_lock = opts[:trigger_lock] || Rufus::Scheduler::NullLock.new
  # If we can't grab the @scheduler_lock, don't run.
  lock || return
  start
end
Instance Attribute Details
#frequency ⇒ Object
Returns the value of attribute frequency.
# File 'lib/rufus/scheduler.rb', line 43
def frequency
  @frequency
end
#max_work_threads ⇒ Object
Returns the value of attribute max_work_threads.
# File 'lib/rufus/scheduler.rb', line 50
def max_work_threads
  @max_work_threads
end
#mutexes ⇒ Object (readonly)
Returns the value of attribute mutexes.
# File 'lib/rufus/scheduler.rb', line 47
def mutexes
  @mutexes
end
#started_at ⇒ Object (readonly)
Returns the value of attribute started_at.
# File 'lib/rufus/scheduler.rb', line 44
def started_at
  @started_at
end
#stderr ⇒ Object
Returns the value of attribute stderr.
# File 'lib/rufus/scheduler.rb', line 52
def stderr
  @stderr
end
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
# File 'lib/rufus/scheduler.rb', line 45
def thread
  @thread
end
#thread_key ⇒ Object (readonly)
Returns the value of attribute thread_key.
# File 'lib/rufus/scheduler.rb', line 46
def thread_key
  @thread_key
end
#work_queue ⇒ Object (readonly)
Returns the value of attribute work_queue.
# File 'lib/rufus/scheduler.rb', line 54
def work_queue
  @work_queue
end
Class Method Details
.h_to_s(t = Time.now) ⇒ Object
Produces an hour/min/sec/microsecond string representation of a Time instance, like “11:11:50.947109”
# File 'lib/rufus/scheduler/util.rb', line 252
def self.h_to_s(t=Time.now)
  "#{t.strftime('%H:%M:%S')}.#{sprintf('%06d', t.usec)}"
end
.parse(o, opts = {}) ⇒ Object
Parses o as a cron line, a duration, or a point in time (tried in that order); raises ArgumentError if none matches.
# File 'lib/rufus/scheduler/util.rb', line 10
def self.parse(o, opts={})
  opts[:no_error] = true
  parse_cron(o, opts) ||
  parse_in(o, opts) || # covers 'every' schedule strings
  parse_at(o, opts) ||
  fail(ArgumentError.new("couldn't parse #{o.inspect} (#{o.class})"))
end
.parse_at(o, opts = {}) ⇒ Object
# File 'lib/rufus/scheduler/util.rb', line 45
def self.parse_at(o, opts={})
  return o if o.is_a?(EoTime)
  return EoTime.make(o) if o.is_a?(Time)
  EoTime.parse(o, opts)
rescue StandardError => se
  return nil if opts[:no_error]
  fail se
end
.parse_cron(o, opts) ⇒ Object
# File 'lib/rufus/scheduler/util.rb', line 20
def self.parse_cron(o, opts)
  CronLine.new(o)
rescue ArgumentError => ae
  return nil if opts[:no_error]
  fail ae
end
.parse_duration(string, opts = {}) ⇒ Object Also known as: parse_duration_string, parse_time_string
Turns a string like ‘1m10s’ into a float like ‘70.0’; more formally, turns a time duration expressed as a string into a Float instance (a count of seconds).
Unit letters:
w -> week
d -> day
h -> hour
m -> minute
s -> second
M -> month (30 days)
y -> year (365 days)
no unit -> second
Some examples:
Rufus::Scheduler.parse_duration "0.5" # => 0.5
Rufus::Scheduler.parse_duration "500" # => 500.0
Rufus::Scheduler.parse_duration "1000" # => 1000.0
Rufus::Scheduler.parse_duration "1h" # => 3600.0
Rufus::Scheduler.parse_duration "1h10s" # => 3610.0
Rufus::Scheduler.parse_duration "1w2d" # => 777600.0
Negative time strings are OK (Thanks Danny Fullerton):
Rufus::Scheduler.parse_duration "-0.5" # => -0.5
Rufus::Scheduler.parse_duration "-1h" # => -3600.0
# File 'lib/rufus/scheduler/util.rb', line 101
def self.parse_duration(string, opts={})
  s = string.to_s.strip
  mod = s[0, 1] == '-' ? -1 : 1
  s = s[1..-1] if mod == -1
  ss = mod < 0 ? '-' : ''
  r = 0.0
  s.scan(/(\d*\.\d+|\d+\.?)([#{DURATION_LETTERS}]?)/) do |f, d|
    ss += "#{f}#{d}"
    r += f.to_f * (DURATIONS[d] || 1.0)
  end
  if ss == '-' || ss != string.to_s.strip
    return nil if opts[:no_error]
    fail ArgumentError.new("invalid time duration #{string.inspect}")
  end
  mod * r
end
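To make the unit handling concrete, here is a minimal standalone sketch that mirrors the parsing logic of the listing above without requiring the gem. DemoDuration, UNITS and LETTERS are hypothetical names chosen for illustration; they are not part of rufus-scheduler.

```ruby
# Standalone sketch; DemoDuration is a hypothetical name, not part of rufus-scheduler.
module DemoDuration
  # Unit letter => length in seconds (same values as DURATIONS2M above).
  UNITS = {
    'y' => 365 * 24 * 3600, 'M' => 30 * 24 * 3600, 'w' => 7 * 24 * 3600,
    'd' => 24 * 3600, 'h' => 3600, 'm' => 60, 's' => 1
  }.freeze
  LETTERS = UNITS.keys.join

  def self.parse(string)
    stripped = string.to_s.strip
    sign = stripped.start_with?('-') ? -1 : 1
    body = sign == -1 ? stripped[1..-1] : stripped

    consumed = sign == -1 ? '-' : ''
    total = 0.0
    body.scan(/(\d*\.\d+|\d+\.?)([#{LETTERS}]?)/) do |number, unit|
      consumed += "#{number}#{unit}"
      # A number without a unit letter counts as seconds.
      total += number.to_f * (UNITS[unit] || 1.0)
    end

    # If the scan skipped any character, the input is not a valid duration.
    if consumed == '-' || consumed != stripped
      raise ArgumentError, "invalid time duration #{string.inspect}"
    end

    sign * total
  end
end

puts DemoDuration.parse('1m10s')
puts DemoDuration.parse('-1h')
```

The validity check works by rebuilding the input from the matched pieces: any character the regular expression skipped makes the rebuilt string differ from the original, which is how inputs such as "1x" are rejected.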
.parse_in(o, opts = {}) ⇒ Object
# File 'lib/rufus/scheduler/util.rb', line 30
def self.parse_in(o, opts={})
  #o.is_a?(String) ? parse_duration(o, opts) : o
  return parse_duration(o, opts) if o.is_a?(String)
  return o if o.is_a?(Numeric)
  fail ArgumentError.new("couldn't parse time point in #{o.inspect}")
rescue ArgumentError => ae
  return nil if opts[:no_error]
  fail ae
end
.s(opts = {}) ⇒ Object
Alias for Rufus::Scheduler.singleton
# File 'lib/rufus/scheduler.rb', line 101
def self.s(opts={}); singleton(opts); end
.singleton(opts = {}) ⇒ Object
Returns a singleton Rufus::Scheduler instance
# File 'lib/rufus/scheduler.rb', line 94
def self.singleton(opts={})
  @singleton ||= Rufus::Scheduler.new(opts)
end
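Because the listing above memoizes with `||=`, options passed on later calls are ignored once the instance exists. The following sketch shows that effect with a hypothetical stand-in class (DemoScheduler is not part of rufus-scheduler and does no scheduling).

```ruby
# Hypothetical stand-in for a scheduler class; it only illustrates the memoization.
class DemoScheduler
  attr_reader :opts

  def initialize(opts = {})
    @opts = opts
  end

  # Same pattern as the listing above: build once, then always return that instance.
  def self.singleton(opts = {})
    @singleton ||= new(opts)
  end
end

first = DemoScheduler.singleton({ :frequency => 5 })
second = DemoScheduler.singleton({ :frequency => 10 }) # opts ignored, instance already exists

puts first.equal?(second)
puts second.opts.inspect
```

If different options are needed, creating a separate instance with .new is probably the safer route.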
.start_new ⇒ Object
Releasing the gem would probably require redirecting .start_new to .new and emitting a simple deprecation message.
For now, let’s assume that the people pointing at rufus-scheduler/master on GitHub know what they are doing…
# File 'lib/rufus/scheduler.rb', line 109
def self.start_new
  fail "this is rufus-scheduler 3.x, use .new instead of .start_new"
end
.to_duration(seconds, options = {}) ⇒ Object Also known as: to_duration_string, to_time_string
Turns a number of seconds into a time string
Rufus::Scheduler.to_duration 0 # => '0s'
Rufus::Scheduler.to_duration 60 # => '1m'
Rufus::Scheduler.to_duration 3661 # => '1h1m1s'
Rufus::Scheduler.to_duration 7 * 24 * 3600 # => '1w'
Rufus::Scheduler.to_duration 30 * 24 * 3600 + 1 # => "4w2d1s"
It goes from seconds to the year. Months are not counted (as they are of variable length). Weeks are counted.
For 30-day months to be counted, set the :months option to true.
Rufus::Scheduler.to_duration 30 * 24 * 3600 + 1, :months => true # => "1M1s"
If a Float value is passed, milliseconds will be displayed without ‘marker’
Rufus::Scheduler.to_duration 0.051 # => "51"
Rufus::Scheduler.to_duration 7.051 # => "7s51"
Rufus::Scheduler.to_duration 0.120 + 30 * 24 * 3600 + 1 # => "4w2d1s120"
(this behaviour mirrors the one found for parse_time_string()).
Options are:
- :months, if set to true, months (M) of 30 days will be taken into account when building up the result
- :drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
# File 'lib/rufus/scheduler/util.rb', line 164
def self.to_duration(seconds, options={})
  h = to_duration_hash(seconds, options)
  return (options[:drop_seconds] ? '0m' : '0s') if h.empty?
  s =
    DU_KEYS.inject('') { |r, key|
      count = h[key]
      count = nil if count == 0
      r << "#{count}#{key}" if count
      r
    }
  ms = h[:ms]
  s << ms.to_s if ms
  s
end
.to_duration_hash(seconds, options = {}) ⇒ Object
Turns a number of seconds (Integer or Float) into a hash, as in:
Rufus::Scheduler.to_duration_hash 0.051
# => { :ms => 51 }
Rufus::Scheduler.to_duration_hash 7.051
# => { :s => 7, :ms => 51 }
Rufus::Scheduler.to_duration_hash 0.120 + 30 * 24 * 3600 + 1
# => { :w => 4, :d => 2, :s => 1, :ms => 120 }
This method is used by to_duration behind the scenes.
Options are:
- :months, if set to true, months (M) of 30 days will be taken into account when building up the result
- :drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
# File 'lib/rufus/scheduler/util.rb', line 210
def self.to_duration_hash(seconds, options={})
  h = {}
  if seconds.is_a?(Float)
    h[:ms] = (seconds % 1 * 1000).to_i
    seconds = seconds.to_i
  end
  if options[:drop_seconds]
    h.delete(:ms)
    seconds = (seconds - seconds % 60)
  end
  durations = options[:months] ? DURATIONS2M : DURATIONS2
  durations.each do |key, duration|
    count = seconds / duration
    seconds = seconds % duration
    h[key.to_sym] = count if count > 0
  end
  h
end
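The decomposition behind to_duration and to_duration_hash can be sketched on its own for whole seconds. This is a simplified illustration, not the library code: DemoDurationString and its methods are hypothetical names, millisecond handling and :drop_seconds are left out, and it assumes (as the prose above states) that months are absent from the default unit list.

```ruby
# Standalone sketch; DemoDurationString is a hypothetical name, not part of rufus-scheduler.
module DemoDurationString
  UNITS_WITH_MONTHS = [
    ['y', 365 * 24 * 3600], ['M', 30 * 24 * 3600], ['w', 7 * 24 * 3600],
    ['d', 24 * 3600], ['h', 3600], ['m', 60], ['s', 1]
  ].freeze
  # Assumption: months are skipped unless explicitly requested.
  UNITS = UNITS_WITH_MONTHS.reject { |letter, _| letter == 'M' }.freeze

  # Splits whole seconds into unit counts, largest unit first.
  def self.to_hash(seconds, months: false)
    remaining = seconds.to_i
    (months ? UNITS_WITH_MONTHS : UNITS).each_with_object({}) do |(letter, size), h|
      count, remaining = remaining.divmod(size)
      h[letter.to_sym] = count if count > 0
    end
  end

  # Joins the non-zero counts into a string such as "1h1m1s".
  def self.to_string(seconds, months: false)
    h = to_hash(seconds, months: months)
    return '0s' if h.empty?
    h.map { |unit, count| "#{count}#{unit}" }.join
  end
end

puts DemoDurationString.to_string(3661)
puts DemoDurationString.to_string(30 * 24 * 3600 + 1, months: true)
```

The hash keeps insertion order, so iterating the units from largest to smallest is enough to produce the string in the expected order.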
.utc_to_s(t = Time.now) ⇒ Object
Produces the UTC string representation of a Time instance
like “2009-11-23 11:11:50.947109 UTC”
# File 'lib/rufus/scheduler/util.rb', line 245
def self.utc_to_s(t=Time.now)
  "#{t.utc.strftime('%Y-%m-%d %H:%M:%S')}.#{sprintf('%06d', t.usec)} UTC"
end
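As a quick check of the two string formats, the sketch below applies the same strftime and sprintf patterns to a fixed Time. The helper names demo_utc_to_s and demo_h_to_s are hypothetical; unlike the listing above, the sketch uses getutc so that the argument is not converted in place.

```ruby
# Hypothetical helpers reproducing the two format patterns shown above.
def demo_utc_to_s(t)
  # getutc returns a new UTC Time and leaves the argument untouched
  "#{t.getutc.strftime('%Y-%m-%d %H:%M:%S')}.#{sprintf('%06d', t.usec)} UTC"
end

def demo_h_to_s(t)
  "#{t.strftime('%H:%M:%S')}.#{sprintf('%06d', t.usec)}"
end

# Seventh argument of Time.utc is the microsecond part.
t = Time.utc(2009, 11, 23, 11, 11, 50, 947109)

puts demo_utc_to_s(t) # => 2009-11-23 11:11:50.947109 UTC
puts demo_h_to_s(t)   # => 11:11:50.947109
```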
Instance Method Details
#at(time, callable = nil, opts = {}, &block) ⇒ Object
Schedules a block or callable to run once at the given point in time.
# File 'lib/rufus/scheduler.rb', line 183
def at(time, callable=nil, opts={}, &block)
  do_schedule(:once, time, callable, opts, opts[:job], block)
end
#at_jobs(opts = {}) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 294
def at_jobs(opts={})
  jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::AtJob) }
end
#confirm_lock ⇒ Object
Callback called when a job is triggered. If the lock cannot be acquired, the job won’t run (though it’ll still be scheduled to run again if necessary).
# File 'lib/rufus/scheduler.rb', line 362
def confirm_lock
  @trigger_lock.lock
end
#cron(cronline, callable = nil, opts = {}, &block) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 223
def cron(cronline, callable=nil, opts={}, &block)
  do_schedule(:cron, cronline, callable, opts, opts[:job], block)
end
#cron_jobs(opts = {}) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 314
def cron_jobs(opts={})
  jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::CronJob) }
end
#down? ⇒ Boolean
# File 'lib/rufus/scheduler.rb', line 154
def down?
  ! @started_at
end
#every(duration, callable = nil, opts = {}, &block) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 203
def every(duration, callable=nil, opts={}, &block)
  do_schedule(:every, duration, callable, opts, opts[:job], block)
end
#every_jobs(opts = {}) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 304
def every_jobs(opts={})
  jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::EveryJob) }
end
#in(duration, callable = nil, opts = {}, &block) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 193
def in(duration, callable=nil, opts={}, &block)
  do_schedule(:once, duration, callable, opts, opts[:job], block)
end
#in_jobs(opts = {}) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 299
def in_jobs(opts={})
  jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::InJob) }
end
#interval(duration, callable = nil, opts = {}, &block) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 213
def interval(duration, callable=nil, opts={}, &block)
  do_schedule(:interval, duration, callable, opts, opts[:job], block)
end
#interval_jobs(opts = {}) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 309
def interval_jobs(opts={})
  jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::IntervalJob) }
end
#job(job_id) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 319
def job(job_id)
  @jobs[job_id]
end
#jobs(opts = {}) ⇒ Object
Returns all the scheduled jobs (even those right before re-schedule).
# File 'lib/rufus/scheduler.rb', line 276
def jobs(opts={})
  opts = { opts => true } if opts.is_a?(Symbol)
  jobs = @jobs.to_a
  if opts[:running]
    jobs = jobs.select { |j| j.running? }
  elsif ! opts[:all]
    jobs = jobs.reject { |j| j.next_time.nil? || j.unscheduled_at }
  end
  tags = Array(opts[:tag] || opts[:tags]).collect(&:to_s)
  jobs = jobs.reject { |j| tags.find { |t| ! j.tags.include?(t) } }
  jobs
end
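The tag filter at the end of the listing above keeps only the jobs that carry every requested tag. A minimal standalone sketch of that step follows; DemoJob and demo_filter_by_tags are hypothetical names, and the jobs here are plain structs rather than scheduler jobs.

```ruby
# Hypothetical job stand-in: an id plus a list of tag strings.
DemoJob = Struct.new(:id, :tags)

# Keeps only the jobs whose tags include every requested tag.
def demo_filter_by_tags(jobs, opts = {})
  wanted = Array(opts[:tag] || opts[:tags]).collect(&:to_s)
  # A job is rejected as soon as one wanted tag is missing from it.
  jobs.reject { |j| wanted.find { |t| !j.tags.include?(t) } }
end

demo_jobs = [
  DemoJob.new('a', %w[mail nightly]),
  DemoJob.new('b', %w[mail]),
  DemoJob.new('c', [])
]

puts demo_filter_by_tags(demo_jobs, :tags => [:mail, :nightly]).map(&:id).inspect
puts demo_filter_by_tags(demo_jobs, :tag => 'mail').map(&:id).inspect
```

With no :tag or :tags option the wanted list is empty, so nothing is rejected and every job passes through.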
#join ⇒ Object
# File 'lib/rufus/scheduler.rb', line 145
def join
  fail NotRunningError.new(
    'cannot join scheduler that is not running'
  ) unless @thread
  @thread.join
end
#lock ⇒ Object
Returns true if the scheduler has acquired the [exclusive] lock and thus may run.
Most of the time, a scheduler is run alone and this method should return true. It is useful in cases where among a group of applications only one of them should run the scheduler. For schedulers that should not run, the method should return false.
Out of the box, rufus-scheduler proposes the :lockfile => ‘path/to/lock/file’ scheduler start option. It makes it easy for schedulers on the same machine to determine which should run (the first to write the lockfile and lock it). It uses “man 2 flock” so it probably won’t work reliably on distributed file systems.
If one needs to use a special/different locking mechanism, the scheduler accepts :scheduler_lock => lock_object. lock_object only needs to respond to #lock and #unlock, and both of these methods should be idempotent.
Look at rufus/scheduler/locks.rb for an example.
# File 'lib/rufus/scheduler.rb', line 345
def lock
  @scheduler_lock.lock
end
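The contract described above (an object responding to #lock and #unlock, both idempotent) can be sketched as follows. DemoFlockLock is a hypothetical class written for illustration, not the FileLock shipped in rufus/scheduler/locks.rb; it relies only on Ruby's standard File#flock.

```ruby
require 'tmpdir'

# Hypothetical lock object (not part of rufus-scheduler): it only has to
# respond to #lock and #unlock, and both calls are safe to repeat.
class DemoFlockLock
  def initialize(path)
    @path = path
    @file = nil
  end

  # Returns true when this object holds the exclusive lock, false otherwise.
  def lock
    return true if @file # already held, so repeating the call is harmless

    file = File.new(@path, File::RDWR | File::CREAT, 0o644)
    if file.flock(File::LOCK_EX | File::LOCK_NB)
      @file = file
      true
    else
      file.close
      false
    end
  end

  # Releases the lock if held; calling it again is a no-op.
  def unlock
    return true unless @file

    @file.flock(File::LOCK_UN)
    @file.close
    @file = nil
    true
  end
end

path = File.join(Dir.mktmpdir, 'scheduler.lock')
demo_lock = DemoFlockLock.new(path)
puts demo_lock.lock
demo_lock.unlock
```

Following the option described above, such an object would be handed over as :scheduler_lock => DemoFlockLock.new(path) when creating the scheduler. Like the :lockfile option, it depends on flock and so is unlikely to be reliable on distributed file systems.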
#occurrences(time0, time1, format = :per_job) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 418
def occurrences(time0, time1, format=:per_job)
  h = {}
  jobs.each do |j|
    os = j.occurrences(time0, time1)
    h[j] = os if os.any?
  end
  if format == :timeline
    a = []
    h.each { |j, ts| ts.each { |t| a << [ t, j ] } }
    a.sort_by { |(t, _)| t }
  else
    h
  end
end
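The two result shapes in the listing above differ only in layout: :per_job is a hash from job to occurrence times, while :timeline is a flat list of [time, job] pairs sorted by time. A small sketch of the conversion, using string labels in place of job objects (demo_timeline is a hypothetical helper):

```ruby
# Flattens a { job => [times] } hash into [[time, job], ...] sorted by time.
def demo_timeline(per_job)
  per_job.flat_map { |job, times| times.map { |t| [t, job] } }.sort_by { |t, _| t }
end

# Labels stand in for job objects in this sketch.
per_job = {
  'backup' => [Time.utc(2024, 1, 1, 3), Time.utc(2024, 1, 2, 3)],
  'report' => [Time.utc(2024, 1, 1, 12)]
}

timeline = demo_timeline(per_job)
timeline.each { |t, job| puts "#{t} #{job}" }
```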
#on_error(job, err) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 441
def on_error(job, err)
  pre = err.object_id.to_s
  ms = {}; mutexes.each { |k, v| ms[k] = v.locked? }
  stderr.puts("{ #{pre} rufus-scheduler intercepted an error:")
  stderr.puts(" #{pre} job:")
  stderr.puts(" #{pre} #{job.class} #{job.original.inspect} #{job.opts.inspect}")
  # TODO: eventually use a Job#detail or something like that
  stderr.puts(" #{pre} error:")
  stderr.puts(" #{pre} #{err.object_id}")
  stderr.puts(" #{pre} #{err.class}")
  stderr.puts(" #{pre} #{err}")
  err.backtrace.each do |l|
    stderr.puts(" #{pre} #{l}")
  end
  stderr.puts(" #{pre} tz:")
  stderr.puts(" #{pre} ENV['TZ']: #{ENV['TZ']}")
  stderr.puts(" #{pre} Time.now: #{Time.now}")
  stderr.puts(" #{pre} local_tzone: #{EoTime.local_tzone.inspect}")
  stderr.puts(" #{pre} et-orbi:")
  stderr.puts(" #{pre} #{EoTime.platform_info}")
  stderr.puts(" #{pre} scheduler:")
  stderr.puts(" #{pre} object_id: #{object_id}")
  stderr.puts(" #{pre} opts:")
  stderr.puts(" #{pre} #{@opts.inspect}")
  stderr.puts(" #{pre} frequency: #{self.frequency}")
  stderr.puts(" #{pre} scheduler_lock: #{@scheduler_lock.inspect}")
  stderr.puts(" #{pre} trigger_lock: #{@trigger_lock.inspect}")
  stderr.puts(" #{pre} uptime: #{uptime} (#{uptime_s})")
  stderr.puts(" #{pre} down?: #{down?}")
  stderr.puts(" #{pre} threads: #{self.threads.size}")
  stderr.puts(" #{pre} thread: #{self.thread}")
  stderr.puts(" #{pre} thread_key: #{self.thread_key}")
  stderr.puts(" #{pre} work_threads: #{work_threads.size}")
  stderr.puts(" #{pre} active: #{work_threads(:active).size}")
  stderr.puts(" #{pre} vacant: #{work_threads(:vacant).size}")
  stderr.puts(" #{pre} max_work_threads: #{max_work_threads}")
  stderr.puts(" #{pre} mutexes: #{ms.inspect}")
  stderr.puts(" #{pre} jobs: #{jobs.size}")
  stderr.puts(" #{pre} at_jobs: #{at_jobs.size}")
  stderr.puts(" #{pre} in_jobs: #{in_jobs.size}")
  stderr.puts(" #{pre} every_jobs: #{every_jobs.size}")
  stderr.puts(" #{pre} interval_jobs: #{interval_jobs.size}")
  stderr.puts(" #{pre} cron_jobs: #{cron_jobs.size}")
  stderr.puts(" #{pre} running_jobs: #{running_jobs.size}")
  stderr.puts(" #{pre} work_queue: #{work_queue.size}")
  stderr.puts("} #{pre} .")
rescue => e
  stderr.puts("failure in #on_error itself:")
  stderr.puts(e.inspect)
  stderr.puts(e.backtrace)
ensure
  stderr.flush
end
#pause ⇒ Object
# File 'lib/rufus/scheduler.rb', line 169
def pause
  @paused = true
end
#paused? ⇒ Boolean
# File 'lib/rufus/scheduler.rb', line 164
def paused?
  @paused
end
#repeat(arg, callable = nil, opts = {}, &block) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 247
def repeat(arg, callable=nil, opts={}, &block)
  callable, opts = nil, callable if callable.is_a?(Hash)
  opts = opts.dup
  opts[:_t] = Scheduler.parse(arg, opts)
  case opts[:_t]
    when CronLine then schedule_cron(arg, callable, opts, &block)
    else schedule_every(arg, callable, opts, &block)
  end
end
#resume ⇒ Object
# File 'lib/rufus/scheduler.rb', line 174
def resume
  @paused = false
end
#running_jobs(opts = {}) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 413
def running_jobs(opts={})
  jobs(opts.merge(:running => true))
end
#schedule(arg, callable = nil, opts = {}, &block) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 233
def schedule(arg, callable=nil, opts={}, &block)
  callable, opts = nil, callable if callable.is_a?(Hash)
  opts = opts.dup
  opts[:_t] = Scheduler.parse(arg, opts)
  case opts[:_t]
    when CronLine then schedule_cron(arg, callable, opts, &block)
    when Time then schedule_at(arg, callable, opts, &block)
    else schedule_in(arg, callable, opts, &block)
  end
end
#schedule_at(time, callable = nil, opts = {}, &block) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 188
def schedule_at(time, callable=nil, opts={}, &block)
  do_schedule(:once, time, callable, opts, true, block)
end
#schedule_cron(cronline, callable = nil, opts = {}, &block) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 228
def schedule_cron(cronline, callable=nil, opts={}, &block)
  do_schedule(:cron, cronline, callable, opts, true, block)
end
#schedule_every(duration, callable = nil, opts = {}, &block) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 208
def schedule_every(duration, callable=nil, opts={}, &block)
  do_schedule(:every, duration, callable, opts, true, block)
end
#schedule_in(duration, callable = nil, opts = {}, &block) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 198
def schedule_in(duration, callable=nil, opts={}, &block)
  do_schedule(:once, duration, callable, opts, true, block)
end
#schedule_interval(duration, callable = nil, opts = {}, &block) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 218
def schedule_interval(duration, callable=nil, opts={}, &block)
  do_schedule(:interval, duration, callable, opts, true, block)
end
#scheduled?(job_or_job_id) ⇒ Boolean
Returns true if this job is currently scheduled.
Takes extra care to answer true if the job is a repeat job currently firing.
# File 'lib/rufus/scheduler.rb', line 372
def scheduled?(job_or_job_id)
  job, _ = fetch(job_or_job_id)
  !! (job && job.unscheduled_at.nil? && job.next_time != nil)
end
#shutdown(opt = nil) ⇒ Object Also known as: stop
# File 'lib/rufus/scheduler.rb', line 114
def shutdown(opt=nil)
  @started_at = nil
  #jobs.each { |j| j.unschedule }
    # provokes https://github.com/jmettraux/rufus-scheduler/issue/98
  @jobs.array.each { |j| j.unschedule }
  @work_queue.clear
  if opt == :wait
    join_all_work_threads
  elsif opt == :kill
    kill_all_work_threads
  end
  unlock
end
#threads ⇒ Object
Lists all the threads associated with this scheduler.
# File 'lib/rufus/scheduler.rb', line 381
def threads
  Thread.list.select { |t| t[thread_key] }
end
#timeline(time0, time1) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 436
def timeline(time0, time1)
  occurrences(time0, time1, :timeline)
end
#unlock ⇒ Object
Sister method to #lock, is called when the scheduler shuts down.
# File 'lib/rufus/scheduler.rb', line 352
def unlock
  @trigger_lock.unlock
  @scheduler_lock.unlock
end
#unschedule(job_or_job_id) ⇒ Object
# File 'lib/rufus/scheduler.rb', line 260
def unschedule(job_or_job_id)
  job, job_id = fetch(job_or_job_id)
  fail ArgumentError.new("no job found with id '#{job_id}'") unless job
  job.unschedule if job
end
#up? ⇒ Boolean
# File 'lib/rufus/scheduler.rb', line 159
def up?
  !! @started_at
end
#uptime ⇒ Object
# File 'lib/rufus/scheduler.rb', line 135
def uptime
  @started_at ? EoTime.now - @started_at : nil
end
#uptime_s ⇒ Object
# File 'lib/rufus/scheduler.rb', line 140
def uptime_s
  uptime ? self.class.to_duration(uptime) : ''
end
#work_threads(query = :all) ⇒ Object
Lists all the work threads (the ones actually running the scheduled block code).
Accepts a query option, which can be set to:
- :all (default), returns all the threads that are work threads or are currently running a job
- :active, returns all threads that are currently running a job
- :vacant, returns the threads that are not running a job
If, thanks to :blocking => true, a job is scheduled to monopolize the main scheduler thread, that thread is returned for the :active and :all queries.
# File 'lib/rufus/scheduler.rb', line 399
def work_threads(query=:all)
  ts =
    threads.select { |t|
      t[:rufus_scheduler_job] || t[:rufus_scheduler_work_thread]
    }
  case query
    when :active then ts.select { |t| t[:rufus_scheduler_job] }
    when :vacant then ts.reject { |t| t[:rufus_scheduler_job] }
    else ts
  end
end
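The selection above relies on thread-local values that each thread sets on itself. The sketch below reproduces the three queries with plain Ruby threads; the :demo_job and :demo_work_thread keys and the demo_work_threads helper are hypothetical, and only Thread.list and Thread#[] are standard Ruby.

```ruby
# Hypothetical helper: filters live threads by the thread-local markers they carry.
def demo_work_threads(query = :all)
  ts = Thread.list.select { |t| t[:demo_job] || t[:demo_work_thread] }
  case query
  when :active then ts.select { |t| t[:demo_job] }
  when :vacant then ts.reject { |t| t[:demo_job] }
  else ts
  end
end

gate = Queue.new # keeps both threads alive until released

busy = Thread.new do
  Thread.current[:demo_work_thread] = true
  Thread.current[:demo_job] = 'job-1' # marks the thread as running a job
  gate.pop
end

idle = Thread.new do
  Thread.current[:demo_work_thread] = true
  gate.pop
end

# Wait until both threads have set their markers.
sleep 0.01 until busy[:demo_job] && idle[:demo_work_thread]

active = demo_work_threads(:active)
vacant = demo_work_threads(:vacant)
all = demo_work_threads(:all)

puts "active=#{active.size} vacant=#{vacant.size} all=#{all.size}"

# Release and wait for both threads.
2.times { gate << nil }
[busy, idle].each(&:join)
```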