Class: Resque::Scheduler

Inherits:
Object
  • Object
show all
Extended by:
SchedulerLocking
Defined in:
lib/resque/scheduler.rb,
lib/resque/scheduler/lock/base.rb,
lib/resque/scheduler/lock/basic.rb,
lib/resque/scheduler/lock/resilient.rb

Defined Under Namespace

Modules: Lock

Class Attribute Summary collapse

Class Method Summary collapse

Methods included from SchedulerLocking

is_master?, master_lock, release_master_lock!, supports_lua?

Class Attribute Details

.dynamicObject

If set, will try to update the schedule in the loop



23
24
25
# File 'lib/resque/scheduler.rb', line 23

# Reads the +dynamic+ setting (the value held in @dynamic); nil until assigned.
# If set, the scheduler will try to update the schedule in its loop.
def dynamic
  instance_variable_get(:@dynamic)
end

.logfileObject

If set, will write messages to the file



20
21
22
# File 'lib/resque/scheduler.rb', line 20

# Reads the +logfile+ setting (the value held in @logfile); nil until assigned.
# If set, messages are written to that file.
def logfile
  instance_variable_get(:@logfile)
end

.loggerObject



40
41
42
# File 'lib/resque/scheduler.rb', line 40

# Returns the memoized logger, building it on first use from the current
# mute, verbose and logfile settings.
def logger
  return @logger if @logger

  @logger = ResqueScheduler::LoggerBuilder.new(
    :mute => mute,
    :verbose => verbose,
    :log_dev => logfile
  ).build
end

.muteObject

If set, produces no output



17
18
19
# File 'lib/resque/scheduler.rb', line 17

# Reads the +mute+ setting (the value held in @mute); nil until assigned.
# If set, the scheduler produces no output.
def mute
  instance_variable_get(:@mute)
end

.poll_sleep_amountObject



36
37
38
# File 'lib/resque/scheduler.rb', line 36

# Number of seconds to sleep between polls. Falls back to (and stores) 5
# when no truthy value has been assigned.
def poll_sleep_amount
  @poll_sleep_amount = 5 unless @poll_sleep_amount # seconds
  @poll_sleep_amount
end

.verboseObject

If true, logs more stuff…



14
15
16
# File 'lib/resque/scheduler.rb', line 14

# Reads the +verbose+ setting (the value held in @verbose); nil until assigned.
# If true, more is logged.
def verbose
  instance_variable_get(:@verbose)
end

Class Method Details

.clear_schedule!Object

Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler



266
267
268
269
270
271
# File 'lib/resque/scheduler.rb', line 266

# Stops the current rufus scheduler, forgets it together with every tracked
# job, and returns the scheduler produced by the next rufus_scheduler call.
def clear_schedule!
  rufus_scheduler.stop

  # Reset the job registry and drop the memoized scheduler so the call
  # below has to create a replacement.
  @@scheduled_jobs = {}
  @rufus_scheduler = nil

  rufus_scheduler
end

.enqueue_delayed_items_for_timestamp(timestamp) ⇒ Object

Enqueues all delayed jobs for a timestamp



190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/resque/scheduler.rb', line 190

# Enqueues all delayed jobs for a timestamp.
#
# Items are pulled one at a time via Resque.next_item_for_timestamp and
# handed to enqueue_from_config until none are left, or until this process
# is no longer the master.
#
# timestamp - the delayed-queue timestamp to drain.
#
# Returns nil.
def enqueue_delayed_items_for_timestamp(timestamp)
  loop do
    # Reset on every pass: a stale item left over from the previous pass
    # would otherwise keep the loop spinning forever once is_master? turns
    # false, because nothing would overwrite it.
    item = nil

    handle_shutdown do
      # Continually check that it is still the master
      if is_master? && (item = Resque.next_item_for_timestamp(timestamp))
        log "queuing #{item['class']} [delayed]"
        handle_errors { enqueue_from_config(item) }
      end
    end

    # continue processing until there are no more ready items in this timestamp
    break if item.nil?
  end
end

.enqueue_from_config(job_config) ⇒ Object

Enqueues a job based on a config hash



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/resque/scheduler.rb', line 219

# Enqueues a job based on a config hash.
#
# job_config - Hash describing the job. Each of 'class', 'args', 'queue'
#              and 'custom_job_class' may be given with a String or a
#              Symbol key (the String key wins when both are present).
#
# Returns whatever the enqueueing call that was chosen returns.
def enqueue_from_config(job_config)
  args = job_config['args'] || job_config[:args]
  klass_name = job_config['class'] || job_config[:class]

  # Fall back to the raw name when the class cannot be resolved here (for
  # example when this scheduler runs in a different app than the workers).
  klass = begin
    ResqueScheduler::Util.constantize(klass_name)
  rescue StandardError
    klass_name
  end

  # A Hash is a single argument; anything else is coerced to an argument list.
  params = args.is_a?(Hash) ? [args] : Array(args)
  queue = job_config['queue'] || job_config[:queue] || Resque.queue_from_class(klass)
  job_klass = job_config['custom_job_class'] || job_config[:custom_job_class]

  # Support custom job classes like those that inherit from Resque::JobWithStatus (resque-status)
  if job_klass && job_klass != 'Resque::Job'
    # The custom job class API must offer a static "scheduled" method. If the custom
    # job class can not be constantized (via a requeue call from the web perhaps), fall
    # back to enqueueing normally via Resque::Job.create.
    begin
      ResqueScheduler::Util.constantize(job_klass).scheduled(queue, klass_name, *params)
    rescue NameError
      # Note that the custom job class (job_config['custom_job_class']) is the one enqueued
      Resque::Job.create(queue, job_klass, *params)
    end
  elsif Class === klass
    ResqueScheduler::Plugin.run_before_delayed_enqueue_hooks(klass, *params)

    # If the class is a custom job class, call self#scheduled on it. This allows you to do things like
    # Resque.enqueue_at(timestamp, CustomJobClass). Otherwise, pass off to Resque.
    if klass.respond_to?(:scheduled)
      klass.scheduled(queue, klass_name, *params)
    else
      Resque.enqueue_to(queue, klass, *params)
    end
  else
    # Hack to avoid havoc for people shoving stuff into queues for
    # non-existent classes (for example: running the scheduler in one app
    # that schedules for another). This will not run the before hooks, but
    # will at least queue the job.
    Resque::Job.create(queue, klass, *params)
  end
end

.handle_delayed_items(at_time = nil) ⇒ Object

Handles queueing delayed items. at_time - Time to start scheduling items (default: now).



179
180
181
182
183
184
185
186
187
# File 'lib/resque/scheduler.rb', line 179

# Enqueues the delayed items of every timestamp that
# Resque.next_delayed_timestamp reports for at_time, one timestamp after
# another, until it reports none.
#
# at_time - passed straight through to Resque.next_delayed_timestamp
#           (default: nil).
def handle_delayed_items(at_time=nil)
  timestamp = Resque.next_delayed_timestamp(at_time)
  return unless timestamp

  procline("Processing Delayed Items")
  until timestamp.nil?
    enqueue_delayed_items_for_timestamp(timestamp)
    timestamp = Resque.next_delayed_timestamp(at_time)
  end
end

.handle_errorsObject



210
211
212
213
214
215
216
# File 'lib/resque/scheduler.rb', line 210

# Runs the given block and logs, instead of propagating, any error it raises.
#
# Process-control exceptions (SystemExit, SignalException including
# Interrupt, NoMemoryError) are re-raised untouched so that an exit request
# or a signal is never silently swallowed here. Every other exception is
# still logged as "<class name>: <message>" via log!.
#
# Returns the block's value, or the result of log! when an error was logged.
def handle_errors
  yield
rescue SystemExit, SignalException, NoMemoryError
  raise
rescue Exception => e
  log! "#{e.class.name}: #{e.message}"
end

.handle_shutdownObject



204
205
206
207
208
# File 'lib/resque/scheduler.rb', line 204

# Runs the given block unless a shutdown has been requested, and exits the
# process if the @shutdown flag is set before or after the block runs.
def handle_shutdown
  # Do not start new work when a shutdown is already pending.
  if @shutdown
    exit
  end

  yield

  # The flag may have been raised while the block was running.
  if @shutdown
    exit
  end
end

.load_schedule!Object

Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/resque/scheduler.rb', line 110

# Pulls the schedule from Resque.schedule and loads each entry into the
# rufus scheduler via load_schedule_job.
def load_schedule!
  procline("Loading Schedule")

  # Need to load the schedule from redis for the first time if dynamic
  if dynamic
    Resque.reload_schedule!
  end

  if Resque.schedule.empty?
    log!("Schedule empty! Set Resque.schedule")
  end

  # Start from an empty registry before loading the jobs.
  @@scheduled_jobs = {}
  Resque.schedule.each { |job_name, job_config| load_schedule_job(job_name, job_config) }

  # Clear the :schedules_changed key now that the jobs above have been loaded.
  Resque.redis.del(:schedules_changed)
  procline("Schedules Loaded")
end

.load_schedule_job(name, config) ⇒ Object

Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/resque/scheduler.rb', line 143

# Loads a job schedule into the Rufus::Scheduler and stores it in
# @@scheduled_jobs.
#
# name   - the schedule entry's name, used as the @@scheduled_jobs key.
# config - the entry's config Hash; reads 'rails_env', 'cron', 'every'
#          and 'class'.
def load_schedule_job(name, config)
  # If rails_env is set in the config, ENV['RAILS_ENV'] has to match it for
  # the job to be scheduled. Without rails_env the job is scheduled
  # regardless of ENV['RAILS_ENV'].
  return unless config['rails_env'].nil? || rails_env_matches?(config)

  log! "Scheduling #{name} "
  interval_types = %w{cron every}

  # The first interval type with a non-empty value wins.
  interval_type = interval_types.find do |type|
    !config[type].nil? && config[type].length > 0
  end

  if interval_type
    args = optionizate_interval_value(config[interval_type])
    @@scheduled_jobs[name] = rufus_scheduler.send(interval_type, *args) do
      # Only the master enqueues when the schedule fires.
      next unless is_master?

      log! "queueing #{config['class']} (#{name})"
      handle_errors { enqueue_from_config(config) }
    end
    nil
  else
    log! "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping"
  end
end

.log(msg) ⇒ Object



329
330
331
# File 'lib/resque/scheduler.rb', line 329

# Sends msg to the logger at debug level.
def log(msg)
  logger.debug(msg)
end

.log!(msg) ⇒ Object



325
326
327
# File 'lib/resque/scheduler.rb', line 325

# Sends msg to the logger at info level.
def log!(msg)
  logger.info(msg)
end

.optionizate_interval_value(value) ⇒ Object

modify interval type value to value with options if options available



128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/resque/scheduler.rb', line 128

# Turns an interval config value into the argument(s) for the scheduler call.
#
# value - either a bare interval (returned unchanged) or an Array.
#
# An Array of exactly [interval, options_hash] yields a NEW two-element
# array whose options have their keys symbolized; the caller's array and
# hash are left untouched. Any other Array collapses to its first element
# (nil for an empty array), which also covers a one-element array holding
# a Hash.
def optionizate_interval_value(value)
  return value unless value.is_a?(::Array)
  return value.first unless value.size == 2 && value.last.is_a?(::Hash)

  # symbolize keys of hash for options; keys that cannot be turned into a
  # symbol are kept as they are
  options = value.last.inject({}) do |memo, (key, option_value)|
    symbolized = begin
      key.to_sym
    rescue StandardError
      nil
    end
    memo[symbolized || key] = option_value
    memo
  end

  [value.first, options]
end

.poll_sleepObject

Sleeps and returns true



304
305
306
307
308
309
# File 'lib/resque/scheduler.rb', line 304

# Sleeps for poll_sleep_amount seconds (inside handle_shutdown) and returns
# true. @sleeping is true only for the duration of the sleep.
def poll_sleep
  @sleeping = true
  handle_shutdown do
    sleep(poll_sleep_amount)
  end
  @sleeping = false

  true
end


98
99
100
101
102
103
104
105
106
# File 'lib/resque/scheduler.rb', line 98

# Logs a header line followed by one line per rufus scheduler job holding
# the job's +t+ and +last+ values, tab separated.
def print_schedule
  return unless rufus_scheduler

  log!("Scheduling Info\tLast Run")
  rufus_scheduler.all_jobs.each do |_job_id, job|
    log!("#{job.t}\t#{job.last}\t")
  end
end

.procline(string) ⇒ Object



333
334
335
336
# File 'lib/resque/scheduler.rb', line 333

# Logs string at info level and sets the process name ($0) to it, prefixed
# with "resque-scheduler-<version>: ".
def procline(string)
  log!(string)

  process_title = "resque-scheduler-#{ResqueScheduler::VERSION}: #{string}"
  $0 = process_title
end

.rails_env_matches?(config) ⇒ Boolean

Returns true if the given schedule config hash matches the current ENV

Returns:

  • (Boolean)


173
174
175
# File 'lib/resque/scheduler.rb', line 173

# Returns true if ENV['RAILS_ENV'] is one of the environments listed in
# config['rails_env'] (comma separated, whitespace ignored), false if it is
# not listed, and a falsy value when either of the two is missing.
def rails_env_matches?(config)
  allowed_envs = config['rails_env']
  return allowed_envs unless allowed_envs

  current_env = ENV['RAILS_ENV']
  return current_env unless current_env

  allowed_envs.gsub(/\s/,'').split(',').include?(current_env)
end

.register_signal_handlersObject

For all signals, set the shutdown flag and wait for the current poll/enqueuing to finish (should be almost instant). In the case of sleeping, exit immediately.



85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/resque/scheduler.rb', line 85

# Installs the signal handlers: TERM, INT and QUIT call shutdown, USR1
# calls print_schedule and USR2 calls reload_schedule!. When trapping QUIT,
# USR1 or USR2 raises ArgumentError, a warning is emitted instead.
def register_signal_handlers
  %w{TERM INT}.each do |signal_name|
    trap(signal_name) { shutdown }
  end

  begin
    trap('QUIT') { shutdown }
    trap('USR1') { print_schedule }
    trap('USR2') { reload_schedule! }
  rescue ArgumentError
    warn("Signals QUIT and USR1 and USR2 not supported.")
  end
end

.reload_schedule!Object



273
274
275
276
277
# File 'lib/resque/scheduler.rb', line 273

# Clears the current schedule (clear_schedule!) and loads it again
# (load_schedule!), announcing the reload through procline first.
def reload_schedule!
  procline("Reloading Schedule")
  clear_schedule!
  load_schedule!
end

.rufus_schedulerObject



260
261
262
# File 'lib/resque/scheduler.rb', line 260

# Returns the memoized rufus scheduler, starting a new one via
# Rufus::Scheduler.start_new when none is stored yet.
def rufus_scheduler
  return @rufus_scheduler if @rufus_scheduler

  @rufus_scheduler = Rufus::Scheduler.start_new
end

.runObject

Schedule all jobs and continually look for delayed jobs (never returns)



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/resque/scheduler.rb', line 45

# Schedule all jobs and continually look for delayed jobs (never returns).
#
# Sets the process name, installs the signal handlers, loads the schedule
# and then loops: while this process is the master it handles delayed
# items (and schedule updates when dynamic), then sleeps via poll_sleep.
def run
  $0 = "resque-scheduler: Starting"

  register_signal_handlers

  # Quote from the resque/worker.
  # Fix buffering so we can `rake resque:scheduler > scheduler.log` and
  # get output from the child in there.
  [$stdout, $stderr].each { |stream| stream.sync = true }

  # Load the schedule into rufus: a full reload when dynamic is set,
  # otherwise the normal load.
  dynamic ? reload_schedule! : load_schedule!

  # Now start the scheduling part of the loop.
  loop do
    if is_master?
      begin
        handle_delayed_items
        update_schedule if dynamic
      rescue Errno::EAGAIN, Errno::ECONNRESET => error
        # These two errors are only reported; the loop carries on with the next poll.
        warn(error.message)
      end
    end

    poll_sleep
  end

  # never gets here.
end

.scheduled_jobsObject

the Rufus::Scheduler jobs that are scheduled



32
33
34
# File 'lib/resque/scheduler.rb', line 32

# Returns the @@scheduled_jobs registry: the Rufus::Scheduler jobs that are
# scheduled. load_schedule_job stores each job in it under the schedule
# entry's name.
def scheduled_jobs
  @@scheduled_jobs
end

.shutdownObject

Sets the shutdown flag, clean schedules and exits if sleeping



312
313
314
315
316
317
318
319
320
321
322
323
# File 'lib/resque/scheduler.rb', line 312

# Sets the shutdown flag. When the scheduler is currently sleeping it also
# cleans the schedules, releases the master lock and exits right away.
def shutdown
  @shutdown = true
  return unless @sleeping

  # The cleanup runs on its own thread and is joined before exiting.
  # NOTE(review): presumably because this is called from signal handlers,
  # where some operations are not permitted - confirm.
  Thread.new do
    Resque.clean_schedules
    release_master_lock!
  end.join

  exit
end

.unschedule_job(name) ⇒ Object



295
296
297
298
299
300
301
# File 'lib/resque/scheduler.rb', line 295

# Unschedules the job registered under name and removes it from
# @@scheduled_jobs. Does nothing when no such job is registered.
#
# name - the schedule entry's name.
def unschedule_job(name)
  job = scheduled_jobs[name]
  return unless job

  log("Removing schedule #{name}")
  job.unschedule
  @@scheduled_jobs.delete(name)
end

.update_scheduleObject



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/resque/scheduler.rb', line 279

# Applies pending schedule changes. When the :schedules_changed redis set
# is non-empty, reloads Resque.schedule and pops the changed names one by
# one: each is unscheduled, and loaded again if it is still present in the
# schedule.
def update_schedule
  return unless Resque.redis.scard(:schedules_changed) > 0

  procline("Updating schedule")
  Resque.reload_schedule!

  while (schedule_name = Resque.redis.spop(:schedules_changed))
    still_defined = Resque.schedule.keys.include?(schedule_name)

    # The old job goes away in either case; it only comes back when the
    # entry still exists.
    unschedule_job(schedule_name)
    load_schedule_job(schedule_name, Resque.schedule[schedule_name]) if still_defined
  end

  procline("Schedules Loaded")
end