Class: RabbitJobs::Scheduler
- Inherits:
-
Object
- Object
- RabbitJobs::Scheduler
- Defined in:
- lib/rabbit_jobs/scheduler.rb
Instance Attribute Summary collapse
-
#background ⇒ Object
Returns the value of attribute background.
-
#pidfile ⇒ Object
Returns the value of attribute pidfile.
-
#process_name ⇒ Object
Returns the value of attribute process_name.
-
#schedule ⇒ Object
Returns the value of attribute schedule.
Instance Method Summary collapse
-
#clear_schedule! ⇒ Object
Stops old rufus scheduler and creates a new one.
- #load_default_schedule ⇒ Object
-
#load_schedule! ⇒ Object
Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance.
-
#publish_from_config(config) ⇒ Object
Publish a job based on a config hash.
-
#rails_env_matches?(config) ⇒ Boolean
Returns true if the given schedule config hash matches the current ENV.
- #rufus_scheduler ⇒ Object
- #shutdown ⇒ Object
- #shutdown! ⇒ Object
- #startup ⇒ Object
-
#work(time = 0) ⇒ Object
Subscribes to channel and working on jobs.
Instance Attribute Details
#background ⇒ Object
Returns the value of attribute background.
10 11 12 |
# File 'lib/rabbit_jobs/scheduler.rb', line 10 def background @background end |
#pidfile ⇒ Object
Returns the value of attribute pidfile.
10 11 12 |
# File 'lib/rabbit_jobs/scheduler.rb', line 10 def pidfile @pidfile end |
#process_name ⇒ Object
Returns the value of attribute process_name.
10 11 12 |
# File 'lib/rabbit_jobs/scheduler.rb', line 10 def process_name @process_name end |
#schedule ⇒ Object
Returns the value of attribute schedule.
10 11 12 |
# File 'lib/rabbit_jobs/scheduler.rb', line 10 def schedule @schedule end |
Instance Method Details
#clear_schedule! ⇒ Object
Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler
79 80 81 82 83 |
# File 'lib/rabbit_jobs/scheduler.rb', line 79 def clear_schedule! rufus_scheduler.stop @rufus_scheduler = nil rufus_scheduler end |
#load_default_schedule ⇒ Object
12 13 14 15 16 17 18 19 |
# File 'lib/rabbit_jobs/scheduler.rb', line 12 def load_default_schedule if defined?(Rails) file = Rails.root.join('config/schedule.yml') if file.file? @schedule = YAML.load_file(file) end end end |
#load_schedule! ⇒ Object
Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/rabbit_jobs/scheduler.rb', line 23 def load_schedule! @schedule ||= load_default_schedule raise "You should setup a schedule or place it in config/schedule.yml" unless schedule schedule.each do |name, config| # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as # required for the jobs to be scheduled. If rails_env is missing, the # job should be scheduled regardless of what ENV['RAILS_ENV'] is set # to. if config['rails_env'].nil? || rails_env_matches?(config) interval_defined = false interval_types = %w{cron every} interval_types.each do |interval_type| if !config[interval_type].nil? && config[interval_type].length > 0 RJ.logger.info "queueing #{config['class']} (#{name})" rufus_scheduler.send(interval_type, config[interval_type]) do publish_from_config(config) end interval_defined = true break end end unless interval_defined RJ.logger.warn "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping" end end end end |
#publish_from_config(config) ⇒ Object
Publish a job based on a config hash
59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/rabbit_jobs/scheduler.rb', line 59 def publish_from_config(config) args = config['args'] || config[:args] || [] klass_name = config['class'] || config[:class] params = args.is_a?(Hash) ? [args] : Array(args) queue = config['queue'] || config[:queue] || RJ.config.routing_keys.first RJ.logger.info "publishing #{config} at #{Time.now}" RJ.publish_to(queue, klass_name, *params) rescue RJ.logger.warn "Failed to publish #{klass_name}:\n #{$!}\n params = #{params.inspect}" RJ.logger.warn $!.inspect end |
#rails_env_matches?(config) ⇒ Boolean
Returns true if the given schedule config hash matches the current ENV
54 55 56 |
# File 'lib/rabbit_jobs/scheduler.rb', line 54 def rails_env_matches?(config) config['rails_env'] && ENV['RAILS_ENV'] && config['rails_env'].gsub(/\s/,'').split(',').include?(ENV['RAILS_ENV']) end |
#rufus_scheduler ⇒ Object
72 73 74 75 |
# File 'lib/rabbit_jobs/scheduler.rb', line 72 def rufus_scheduler raise "Cannot start without eventmachine running." unless EM.reactor_running? @rufus_scheduler ||= Rufus::Scheduler.start_new end |
#shutdown ⇒ Object
131 132 133 134 |
# File 'lib/rabbit_jobs/scheduler.rb', line 131 def shutdown RJ.logger.info "Stopping..." @shutdown = true end |
#shutdown! ⇒ Object
166 167 168 |
# File 'lib/rabbit_jobs/scheduler.rb', line 166 def shutdown! shutdown end |
#startup ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/rabbit_jobs/scheduler.rb', line 136 def startup # prune_dead_workers RabbitJobs::Util.check_pidfile(self.pidfile) if self.pidfile if self.background child_pid = fork if child_pid return false else # daemonize child process Process.daemon(true) end end # Fix buffering so we can `rake rj:work > resque.log` and # get output from the child in there. $stdout.sync = true if self.pidfile File.open(self.pidfile, 'w') { |f| f << Process.pid } end @shutdown = false Signal.trap('TERM') { shutdown } Signal.trap('INT') { shutdown! } true end |
#work(time = 0) ⇒ Object
Subscribes to channel and working on jobs
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/rabbit_jobs/scheduler.rb', line 86 def work(time = 0) begin return false unless startup $0 = self.process_name || "rj_scheduler" processed_count = 0 RJ.run do load_schedule! check_shutdown = Proc.new { if @shutdown RJ.stop RJ.logger.info "Stopped." File.delete(self.pidfile) if self.pidfile end } if time > 0 EM.add_timer(time) do self.shutdown end end EM.add_periodic_timer(1) do check_shutdown.call end RJ.logger.info "Started." end rescue => e error = $! if RJ.logger begin RJ.logger.error [error., error.backtrace].flatten.join("\n") ensure abort(error.) end end end true end |