Class: SidekiqScheduler::Scheduler

Inherits:
Object
  • Object
show all
Extended by:
Sidekiq::Util
Defined in:
lib/sidekiq-scheduler/scheduler.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Scheduler

Returns a new instance of Scheduler.



45
46
47
48
49
50
# File 'lib/sidekiq-scheduler/scheduler.rb', line 45

def initialize(options = {})
  self.enabled = options[:enabled]
  self.dynamic = options[:dynamic]
  self.dynamic_every = options[:dynamic_every]
  self.listened_queues_only = options[:listened_queues_only]
end

Instance Attribute Details

#dynamicObject

Set to update the schedule in runtime in a given time period.



21
22
23
# File 'lib/sidekiq-scheduler/scheduler.rb', line 21

def dynamic
  @dynamic
end

#dynamic_everyObject

Set to update the schedule in runtime dynamically per this period.



24
25
26
# File 'lib/sidekiq-scheduler/scheduler.rb', line 24

def dynamic_every
  @dynamic_every
end

#enabledObject

Set to enable or disable the scheduler.



18
19
20
# File 'lib/sidekiq-scheduler/scheduler.rb', line 18

def enabled
  @enabled
end

#listened_queues_onlyObject

Set to schedule jobs only when will be pushed to queues listened by sidekiq



27
28
29
# File 'lib/sidekiq-scheduler/scheduler.rb', line 27

def listened_queues_only
  @listened_queues_only
end

Class Method Details

.instanceObject



31
32
33
34
# File 'lib/sidekiq-scheduler/scheduler.rb', line 31

def instance
  @instance = new unless @instance
  @instance
end

.instance=(value) ⇒ Object



36
37
38
# File 'lib/sidekiq-scheduler/scheduler.rb', line 36

def instance=(value)
  @instance = value
end

.method_missing(method, *arguments, &block) ⇒ Object



40
41
42
# File 'lib/sidekiq-scheduler/scheduler.rb', line 40

def method_missing(method, *arguments, &block)
  instance_methods.include?(method) ? instance.public_send(method, *arguments) : super
end

Instance Method Details

#clear_schedule!(stop_option = :wait) ⇒ Object

Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler

Parameters:

  • stop_option (Symbol) (defaults to: :wait)

    The option to be passed to Rufus::Scheduler#stop



187
188
189
190
191
192
193
194
195
196
# File 'lib/sidekiq-scheduler/scheduler.rb', line 187

def clear_schedule!(stop_option = :wait)
  if @rufus_scheduler
    @rufus_scheduler.stop(stop_option)
    @rufus_scheduler = nil
  end

  @@scheduled_jobs = {}

  rufus_scheduler
end

#enqueue_job(job_config, time = Time.now) ⇒ Object

Enqueue a job based on a config hash

Parameters:

  • job_config (Hash)

    the job configuration

  • time (Time) (defaults to: Time.now)

    time the job is enqueued



157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/sidekiq-scheduler/scheduler.rb', line 157

def enqueue_job(job_config, time = Time.now)
  config = prepare_arguments(job_config.dup)

  if config.delete('include_metadata')
    config['args'] = (config['args'], scheduled_at: time.to_f)
  end

  if active_job_enqueue?(config['class'])
    SidekiqScheduler::Utils.enqueue_with_active_job(config)
  else
    SidekiqScheduler::Utils.enqueue_with_sidekiq(config)
  end
end

#idempotent_job_enqueue(job_name, time, config) ⇒ Object

Pushes the job into Sidekiq if not already pushed for the given time

Parameters:

  • job_name (String)

    The job’s name

  • time (Time)

    The time when the job got cleared for triggering

  • config (Hash)

    Job’s config hash



139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/sidekiq-scheduler/scheduler.rb', line 139

def idempotent_job_enqueue(job_name, time, config)
  registered = SidekiqScheduler::RedisManager.register_job_instance(job_name, time)

  if registered
    Sidekiq.logger.info "queueing #{config['class']} (#{job_name})"

    handle_errors { enqueue_job(config, time) }

    SidekiqScheduler::RedisManager.remove_elder_job_instances(job_name)
  else
    Sidekiq.logger.debug { "Ignoring #{job_name} job as it has been already enqueued" }
  end
end

#job_enabled?(name) ⇒ Boolean

Returns:

  • (Boolean)


228
229
230
231
# File 'lib/sidekiq-scheduler/scheduler.rb', line 228

def job_enabled?(name)
  job = Sidekiq.schedule[name]
  schedule_state(name).fetch('enabled', job.fetch('enabled', true)) if job
end

#load_schedule!Object

Pulls the schedule from Sidekiq.schedule and loads it into the rufus scheduler instance



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/sidekiq-scheduler/scheduler.rb', line 69

def load_schedule!
  if enabled
    Sidekiq.logger.info 'Loading Schedule'

    # Load schedule from redis for the first time if dynamic
    if dynamic
      Sidekiq.reload_schedule!
      @current_changed_score = Time.now.to_f
      rufus_scheduler.every(dynamic_every) do
        update_schedule
      end
    end

    Sidekiq.logger.info 'Schedule empty! Set Sidekiq.schedule' if Sidekiq.schedule.empty?

    @scheduled_jobs = {}
    queues = sidekiq_queues

    Sidekiq.schedule.each do |name, config|
      if !listened_queues_only || enabled_queue?(config['queue'].to_s, queues)
        load_schedule_job(name, config)
      else
        Sidekiq.logger.info { "Ignoring #{name}, job's queue is not enabled." }
      end
    end

    Sidekiq.logger.info 'Schedules Loaded'
  else
    Sidekiq.logger.info 'SidekiqScheduler is disabled'
  end
end

#load_schedule_job(name, config) ⇒ Object

Loads a job schedule into the Rufus::Scheduler and stores it in @scheduled_jobs



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/sidekiq-scheduler/scheduler.rb', line 102

def load_schedule_job(name, config)
  # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
  # required for the jobs to be scheduled.  If rails_env is missing, the
  # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
  # to.
  if config['rails_env'].nil? || rails_env_matches?(config)
    Sidekiq.logger.info "Scheduling #{name} #{config}"
    interval_defined = false
    interval_types = %w(cron every at in interval)
    interval_types.each do |interval_type|
      config_interval_type = config[interval_type]

      if !config_interval_type.nil? && config_interval_type.length > 0

        schedule, options = SidekiqScheduler::RufusUtils.normalize_schedule_options(config_interval_type)

        rufus_job = new_job(name, interval_type, config, schedule, options)
        @scheduled_jobs[name] = rufus_job
        SidekiqScheduler::Utils.update_job_next_time(name, rufus_job.next_time)

        interval_defined = true

        break
      end
    end

    unless interval_defined
      Sidekiq.logger.info "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping"
    end
  end
end


57
58
59
60
61
62
63
64
65
# File 'lib/sidekiq-scheduler/scheduler.rb', line 57

def print_schedule
  if rufus_scheduler
    Sidekiq.logger.info "Scheduling Info\tLast Run"
    scheduler_jobs = rufus_scheduler.all_jobs
    scheduler_jobs.each_value do |v|
      Sidekiq.logger.info "#{v.t}\t#{v.last}\t"
    end
  end
end

#reload_schedule!Object



198
199
200
201
202
203
204
205
206
# File 'lib/sidekiq-scheduler/scheduler.rb', line 198

def reload_schedule!
  if enabled
    Sidekiq.logger.info 'Reloading Schedule'
    clear_schedule!
    load_schedule!
  else
    Sidekiq.logger.info 'SidekiqScheduler is disabled'
  end
end

#rufus_schedulerObject



179
180
181
# File 'lib/sidekiq-scheduler/scheduler.rb', line 179

def rufus_scheduler
  @rufus_scheduler ||= SidekiqScheduler::Utils.new_rufus_scheduler(rufus_scheduler_options)
end

#rufus_scheduler_optionsObject



171
172
173
# File 'lib/sidekiq-scheduler/scheduler.rb', line 171

def rufus_scheduler_options
  @rufus_scheduler_options ||= {}
end

#rufus_scheduler_options=(options) ⇒ Object



175
176
177
# File 'lib/sidekiq-scheduler/scheduler.rb', line 175

def rufus_scheduler_options=(options)
  @rufus_scheduler_options = options
end

#scheduled_jobsObject

the Rufus::Scheduler jobs that are scheduled



53
54
55
# File 'lib/sidekiq-scheduler/scheduler.rb', line 53

def scheduled_jobs
  @scheduled_jobs
end

#toggle_job_enabled(name) ⇒ Object



233
234
235
236
237
# File 'lib/sidekiq-scheduler/scheduler.rb', line 233

def toggle_job_enabled(name)
  state = schedule_state(name)
  state['enabled'] = !job_enabled?(name)
  set_schedule_state(name, state)
end

#update_scheduleObject



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/sidekiq-scheduler/scheduler.rb', line 208

def update_schedule
  last_changed_score, @current_changed_score = @current_changed_score, Time.now.to_f
  schedule_changes = SidekiqScheduler::RedisManager.get_schedule_changes(last_changed_score, @current_changed_score)

  if schedule_changes.size > 0
    Sidekiq.logger.info 'Updating schedule'

    Sidekiq.reload_schedule!
    schedule_changes.each do |schedule_name|
      if Sidekiq.schedule.keys.include?(schedule_name)
        unschedule_job(schedule_name)
        load_schedule_job(schedule_name, Sidekiq.schedule[schedule_name])
      else
        unschedule_job(schedule_name)
      end
    end
    Sidekiq.logger.info 'Schedule updated'
  end
end