Class: SidekiqScheduler::Scheduler

Inherits:
Object
  • Object
show all
Extended by:
Sidekiq::Util
Defined in:
lib/sidekiq-scheduler/scheduler.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Scheduler

Returns a new instance of Scheduler.



46
47
48
49
50
51
# File 'lib/sidekiq-scheduler/scheduler.rb', line 46

def initialize(options = {})
  self.enabled = options[:enabled]
  self.dynamic = options[:dynamic]
  self.dynamic_every = options[:dynamic_every]
  self.listened_queues_only = options[:listened_queues_only]
end

Instance Attribute Details

#dynamicObject

Set to update the schedule in runtime in a given time period.



22
23
24
# File 'lib/sidekiq-scheduler/scheduler.rb', line 22

def dynamic
  @dynamic
end

#dynamic_everyObject

Set to update the schedule in runtime dynamically per this period.



25
26
27
# File 'lib/sidekiq-scheduler/scheduler.rb', line 25

def dynamic_every
  @dynamic_every
end

#enabledObject

Set to enable or disable the scheduler.



19
20
21
# File 'lib/sidekiq-scheduler/scheduler.rb', line 19

def enabled
  @enabled
end

#listened_queues_onlyObject

Set to schedule jobs only when will be pushed to queues listened by sidekiq



28
29
30
# File 'lib/sidekiq-scheduler/scheduler.rb', line 28

def listened_queues_only
  @listened_queues_only
end

Class Method Details

.instanceObject



32
33
34
35
# File 'lib/sidekiq-scheduler/scheduler.rb', line 32

def instance
  @instance = new unless @instance
  @instance
end

.instance=(value) ⇒ Object



37
38
39
# File 'lib/sidekiq-scheduler/scheduler.rb', line 37

def instance=(value)
  @instance = value
end

.method_missing(method, *arguments, &block) ⇒ Object



41
42
43
# File 'lib/sidekiq-scheduler/scheduler.rb', line 41

def method_missing(method, *arguments, &block)
  instance_methods.include?(method) ? instance.public_send(method, *arguments) : super
end

Instance Method Details

#clear_schedule!(stop_option = :wait) ⇒ Object

Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler

Parameters:

  • stop_option (Symbol) (defaults to: :wait)

    The option to be passed to Rufus::Scheduler#stop



188
189
190
191
192
193
194
195
196
197
# File 'lib/sidekiq-scheduler/scheduler.rb', line 188

def clear_schedule!(stop_option = :wait)
  if @rufus_scheduler
    @rufus_scheduler.stop(stop_option)
    @rufus_scheduler = nil
  end

  @@scheduled_jobs = {}

  rufus_scheduler
end

#enqueue_job(job_config, time = Time.now) ⇒ Object

Enqueue a job based on a config hash

Parameters:

  • job_config (Hash)

    the job configuration

  • time (Time) (defaults to: Time.now)

    time the job is enqueued



158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/sidekiq-scheduler/scheduler.rb', line 158

def enqueue_job(job_config, time = Time.now)
  config = prepare_arguments(job_config.dup)

  if config.delete('include_metadata')
    config['args'] = (config['args'], scheduled_at: time.to_f)
  end

  if active_job_enqueue?(config['class'])
    SidekiqScheduler::Utils.enqueue_with_active_job(config)
  else
    SidekiqScheduler::Utils.enqueue_with_sidekiq(config)
  end
end

#idempotent_job_enqueue(job_name, time, config) ⇒ Object

Pushes the job into Sidekiq if not already pushed for the given time

Parameters:

  • job_name (String)

    The job’s name

  • time (Time)

    The time when the job got cleared for triggering

  • config (Hash)

    Job’s config hash



140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/sidekiq-scheduler/scheduler.rb', line 140

def idempotent_job_enqueue(job_name, time, config)
  registered = SidekiqScheduler::RedisManager.register_job_instance(job_name, time)

  if registered
    Sidekiq.logger.info "queueing #{config['class']} (#{job_name})"

    handle_errors { enqueue_job(config, time) }

    SidekiqScheduler::RedisManager.remove_elder_job_instances(job_name)
  else
    Sidekiq.logger.debug { "Ignoring #{job_name} job as it has been already enqueued" }
  end
end

#job_enabled?(name) ⇒ Boolean

Returns:

  • (Boolean)


229
230
231
232
# File 'lib/sidekiq-scheduler/scheduler.rb', line 229

def job_enabled?(name)
  job = Sidekiq.schedule[name]
  schedule_state(name).fetch('enabled', job.fetch('enabled', true)) if job
end

#load_schedule!Object

Pulls the schedule from Sidekiq.schedule and loads it into the rufus scheduler instance



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/sidekiq-scheduler/scheduler.rb', line 70

def load_schedule!
  if enabled
    Sidekiq.logger.info 'Loading Schedule'

    # Load schedule from redis for the first time if dynamic
    if dynamic
      Sidekiq.reload_schedule!
      @current_changed_score = Time.now.to_f
      rufus_scheduler.every(dynamic_every) do
        update_schedule
      end
    end

    Sidekiq.logger.info 'Schedule empty! Set Sidekiq.schedule' if Sidekiq.schedule.empty?

    @scheduled_jobs = {}
    queues = sidekiq_queues

    Sidekiq.schedule.each do |name, config|
      if !listened_queues_only || enabled_queue?(config['queue'].to_s, queues)
        load_schedule_job(name, config)
      else
        Sidekiq.logger.info { "Ignoring #{name}, job's queue is not enabled." }
      end
    end

    Sidekiq.logger.info 'Schedules Loaded'
  else
    Sidekiq.logger.info 'SidekiqScheduler is disabled'
  end
end

#load_schedule_job(name, config) ⇒ Object

Loads a job schedule into the Rufus::Scheduler and stores it in @scheduled_jobs



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/sidekiq-scheduler/scheduler.rb', line 103

def load_schedule_job(name, config)
  # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
  # required for the jobs to be scheduled.  If rails_env is missing, the
  # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
  # to.
  if config['rails_env'].nil? || rails_env_matches?(config)
    Sidekiq.logger.info "Scheduling #{name} #{config}"
    interval_defined = false
    interval_types = %w(cron every at in interval)
    interval_types.each do |interval_type|
      config_interval_type = config[interval_type]

      if !config_interval_type.nil? && config_interval_type.length > 0

        schedule, options = SidekiqScheduler::RufusUtils.normalize_schedule_options(config_interval_type)

        rufus_job = new_job(name, interval_type, config, schedule, options)
        @scheduled_jobs[name] = rufus_job
        SidekiqScheduler::Utils.update_job_next_time(name, rufus_job.next_time)

        interval_defined = true

        break
      end
    end

    unless interval_defined
      Sidekiq.logger.info "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping"
    end
  end
end


58
59
60
61
62
63
64
65
66
# File 'lib/sidekiq-scheduler/scheduler.rb', line 58

def print_schedule
  if rufus_scheduler
    Sidekiq.logger.info "Scheduling Info\tLast Run"
    scheduler_jobs = rufus_scheduler.all_jobs
    scheduler_jobs.each_value do |v|
      Sidekiq.logger.info "#{v.t}\t#{v.last}\t"
    end
  end
end

#reload_schedule!Object



199
200
201
202
203
204
205
206
207
# File 'lib/sidekiq-scheduler/scheduler.rb', line 199

def reload_schedule!
  if enabled
    Sidekiq.logger.info 'Reloading Schedule'
    clear_schedule!
    load_schedule!
  else
    Sidekiq.logger.info 'SidekiqScheduler is disabled'
  end
end

#rufus_schedulerObject



180
181
182
# File 'lib/sidekiq-scheduler/scheduler.rb', line 180

def rufus_scheduler
  @rufus_scheduler ||= SidekiqScheduler::Utils.new_rufus_scheduler(rufus_scheduler_options)
end

#rufus_scheduler_optionsObject



172
173
174
# File 'lib/sidekiq-scheduler/scheduler.rb', line 172

def rufus_scheduler_options
  @rufus_scheduler_options ||= {}
end

#rufus_scheduler_options=(options) ⇒ Object



176
177
178
# File 'lib/sidekiq-scheduler/scheduler.rb', line 176

def rufus_scheduler_options=(options)
  @rufus_scheduler_options = options
end

#scheduled_jobsObject

the Rufus::Scheduler jobs that are scheduled



54
55
56
# File 'lib/sidekiq-scheduler/scheduler.rb', line 54

def scheduled_jobs
  @scheduled_jobs
end

#toggle_job_enabled(name) ⇒ Object



234
235
236
237
238
# File 'lib/sidekiq-scheduler/scheduler.rb', line 234

def toggle_job_enabled(name)
  state = schedule_state(name)
  state['enabled'] = !job_enabled?(name)
  set_schedule_state(name, state)
end

#update_scheduleObject



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/sidekiq-scheduler/scheduler.rb', line 209

def update_schedule
  last_changed_score, @current_changed_score = @current_changed_score, Time.now.to_f
  schedule_changes = SidekiqScheduler::RedisManager.get_schedule_changes(last_changed_score, @current_changed_score)

  if schedule_changes.size > 0
    Sidekiq.logger.info 'Updating schedule'

    Sidekiq.reload_schedule!
    schedule_changes.each do |schedule_name|
      if Sidekiq.schedule.keys.include?(schedule_name)
        unschedule_job(schedule_name)
        load_schedule_job(schedule_name, Sidekiq.schedule[schedule_name])
      else
        unschedule_job(schedule_name)
      end
    end
    Sidekiq.logger.info 'Schedule updated'
  end
end