Class: Resque::Scheduler

Inherits:
Object
  • Object
show all
Extended by:
Helpers
Defined in:
lib/resque/scheduler.rb

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.muteObject

If set, produces no output



24
25
26
# File 'lib/resque/scheduler.rb', line 24

def mute
  @mute
end

.verboseObject

If true, logs more stuff…



21
22
23
# File 'lib/resque/scheduler.rb', line 21

def verbose
  @verbose
end

Class Method Details

.clear_schedule!Object

Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler



120
121
122
123
124
# File 'lib/resque/scheduler.rb', line 120

def clear_schedule!
  rufus_scheduler.stop
  @rufus_scheduler = nil
  rufus_scheduler
end

.enqueue_from_config(config) ⇒ Object

Enqueues a job based on a config hash



99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/resque/scheduler.rb', line 99

def enqueue_from_config(config)
  args = config['args'] || config[:args]
  klass_name = config['class'] || config[:class]
  params = args.nil? ? [] : Array(args)
  queue = config['queue'] || config[:queue] || Resque.queue_from_class(constantize(klass_name))
  if (config[:just_once] || config['just_once'])
    Resque::Job.destroy(queue, klass_name, *params)
  end
  if klass_name.include?("WithStatus")
    Resque::JobWithStatus.enqueue_with_queue(queue, klass_name, *params)
  else
    Resque::Job.create(queue, klass_name, *params)
  end
end

.handle_delayed_itemsObject

Handles queueing delayed items



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/resque/scheduler.rb', line 72

def handle_delayed_items
  item = nil
  begin
    if timestamp = Resque.next_delayed_timestamp
      item = nil
      begin
        handle_shutdown do
          if item = Resque.next_item_for_timestamp(timestamp)
            log "queuing #{item['class']} [delayed]"
            queue = item['queue'] || Resque.queue_from_class(constantize(item['class']))
            Job.create(queue, item['class'], *item['args'])
          end
        end
      # continue processing until there are no more ready items in this timestamp
      end while !item.nil?
    end
  # continue processing until there are no more ready timestamps
  end while !timestamp.nil?
end

.handle_shutdownObject



92
93
94
95
96
# File 'lib/resque/scheduler.rb', line 92

def handle_shutdown
  exit if @shutdown
  yield
  exit if @shutdown
end

.load_schedule!Object

Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/resque/scheduler.rb', line 55

def load_schedule!
  log! "Schedule empty! Set Resque.schedule" if Resque.schedule.empty?

  Resque.schedule.each do |name, config|
    log! "Scheduling #{name} "
    if !config['cron'].nil? && config['cron'].length > 0
      rufus_scheduler.cron config['cron'] do
        log! "queuing #{config['class']} (#{name})"
        enqueue_from_config(config)
      end
    else
      log! "no cron found for #{config['class']} (#{name}) - skipping"
    end
  end
end

.log(msg) ⇒ Object



144
145
146
147
# File 'lib/resque/scheduler.rb', line 144

def log(msg)
  # add "verbose" logic later
  log!(msg) if verbose
end

.log!(msg) ⇒ Object



140
141
142
# File 'lib/resque/scheduler.rb', line 140

def log!(msg)
  puts "#{Time.now.strftime("%Y-%m-%d %H:%M:%S")} #{msg}" unless mute
end

.poll_sleepObject

Sleeps and returns true



127
128
129
130
131
132
# File 'lib/resque/scheduler.rb', line 127

def poll_sleep
  @sleeping = true
  handle_shutdown { sleep 5 }
  @sleeping = false
  true
end

.register_signal_handlersObject

For all signals, set the shutdown flag and wait for current poll/enqueing to finish (should be almost istant). In the case of sleeping, exit immediately.



47
48
49
50
51
# File 'lib/resque/scheduler.rb', line 47

def register_signal_handlers
  trap("TERM") { shutdown }
  trap("INT") { shutdown }
  trap('QUIT') { shutdown } unless defined? JRUBY_VERSION
end

.rufus_schedulerObject



114
115
116
# File 'lib/resque/scheduler.rb', line 114

def rufus_scheduler
  @rufus_scheduler ||= Rufus::Scheduler.start_new
end

.runObject

Schedule all jobs and continually look for delayed jobs (never returns)



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/resque/scheduler.rb', line 27

def run

  # trap signals
  register_signal_handlers

  # Load the schedule into rufus
  load_schedule!

  # Now start the scheduling part of the loop.
  loop do
    handle_delayed_items
    poll_sleep
  end

  # never gets here.
end

.shutdownObject

Sets the shutdown flag, exits if sleeping



135
136
137
138
# File 'lib/resque/scheduler.rb', line 135

def shutdown
  @shutdown = true
  exit if @sleeping
end