Class: SimpleScheduler::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/simple_scheduler/task.rb

Overview

Class for parsing each task in the scheduler config YAML file and returning the values needed to schedule the task in the future.

Constant Summary collapse

DEFAULT_QUEUE_AHEAD_MINUTES =
360

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(params) ⇒ Task

Initializes a task by parsing the params so the task can be queued in the future.

Parameters:

  • params (Hash)

Options Hash (params):

  • :class (String)

    The class of the Active Job or Sidekiq Worker

  • :every (String)

    How frequently the job will be performed

  • :at (String)

    The starting time for the interval

  • :expires_after (String)

    The interval used to determine how late the job is allowed to run

  • :queue_ahead (Integer)

    The number of minutes that jobs should be queued in the future

  • :name (String)

    The name of the task as defined in the YAML config (defaults to the value of :class)

  • :tz (String)

    The time zone to use when parsing the `at` option



23
24
25
26
# File 'lib/simple_scheduler/task.rb', line 23

# Builds a task from one entry of the scheduler config.
# The params are validated first; they are only stored when validation does not raise.
# @param params [Hash] the task options
def initialize(params)
  @params = params.tap { |candidate| validate_params!(candidate) }
end

Instance Attribute Details

#job_class ⇒ Class (readonly)

Returns the class of the job or worker.

Returns:

  • (Class)

    The class of the job or worker.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/simple_scheduler/task.rb', line 9

class Task
  attr_reader :job_class, :params

  DEFAULT_QUEUE_AHEAD_MINUTES = 360

  # Unit names accepted after the dot in the `every` option (the `day` in "1.day").
  # The unit is invoked as a method on the parsed Integer, so only known names are allowed.
  FREQUENCY_UNITS = %w[
    second seconds minute minutes hour hours day days
    week weeks fortnight fortnights month months year years
  ].freeze
  private_constant :FREQUENCY_UNITS

  # Initializes a task by parsing the params so the task can be queued in the future.
  # The given hash is stored as-is; a missing `:name` is filled in on it with the value of `:class`.
  # @param params [Hash]
  # @option params [String] :class The class of the Active Job or Sidekiq Worker
  # @option params [String] :every How frequently the job will be performed
  # @option params [String] :at The starting time for the interval
  # @option params [String] :expires_after The interval used to determine how late the job is allowed to run
  # @option params [Integer] :queue_ahead The number of minutes that jobs should be queued in the future
  # @option params [String] :name The name of the task as defined in the YAML config
  # @option params [String] :tz The time zone to use when parsing the `at` option
  # @raise [ArgumentError] if the `:class` or `:every` key is missing
  def initialize(params)
    validate_params!(params)
    @params = params
  end

  # The task's first run time as a Time-like object.
  # @return [SimpleScheduler::At]
  def at
    @at ||= At.new(@params[:at], time_zone)
  end

  # The time between the scheduled and actual run time that should cause the job not to run.
  # @return [String]
  def expires_after
    @params[:expires_after]
  end

  # Returns an array of existing jobs matching the job class and name of the task.
  # Only entries whose display class is "SimpleScheduler::FutureJob" are considered.
  # @return [Array<Sidekiq::SortedEntry>]
  def existing_jobs
    @existing_jobs ||= SimpleScheduler::Task.scheduled_set.select do |job|
      next unless job.display_class == "SimpleScheduler::FutureJob"

      task_params = job.display_args[0].symbolize_keys
      task_params[:class] == job_class_name && task_params[:name] == name
    end.to_a
  end

  # Returns an array of existing future run times that have already been scheduled.
  # @return [Array<Time>]
  def existing_run_times
    @existing_run_times ||= existing_jobs.map(&:at)
  end

  # How often the job will be run.
  # @return [ActiveSupport::Duration]
  # @raise [ArgumentError] if the `every` option is not a positive count followed by a known unit
  def frequency
    @frequency ||= parse_frequency(@params[:every])
  end

  # Returns an array of Time objects for future run times based on
  # the current time and the given minutes to look ahead.
  # @return [Array<Time>]
  def future_run_times # rubocop:disable Metrics/AbcSize
    run_times = existing_run_times.dup
    last_run_time = run_times.last || at - frequency
    last_run_time = last_run_time.in_time_zone(time_zone)

    # Ensure there are at least two future jobs scheduled and that the queue ahead time is filled
    while run_times.length < 2 || minutes_queued_ahead(last_run_time) < queue_ahead
      last_run_time = frequency.from_now(last_run_time)
      # The hour may not match because of a shift caused by DST in previous run times,
      # so we need to ensure that the hour matches the specified hour if given.
      last_run_time = last_run_time.change(hour: at.hour, min: at.min) if at.hour?
      run_times << last_run_time
    end

    run_times
  end

  # The class name of the job or worker.
  # @return [String]
  def job_class_name
    @params[:class]
  end

  # The name of the task as defined in the YAML config (the class name when none was given).
  # @return [String]
  def name
    @params[:name]
  end

  # The number of minutes that jobs should be queued in the future.
  # @return [Integer]
  def queue_ahead
    @queue_ahead ||= @params[:queue_ahead] || DEFAULT_QUEUE_AHEAD_MINUTES
  end

  # The time zone to use when parsing the `at` option.
  # Falls back to `Time.zone` when no `:tz` param was given.
  # @return [ActiveSupport::TimeZone]
  def time_zone
    @time_zone ||= params[:tz] ? ActiveSupport::TimeZone.new(params[:tz]) : Time.zone
  end

  # Loads the scheduled jobs from Sidekiq once to avoid loading from
  # Redis for each task when looking up existing scheduled jobs.
  # @return [Sidekiq::ScheduledSet]
  def self.scheduled_set
    @scheduled_set ||= Sidekiq::ScheduledSet.new
  end

  private

  # Minutes between now and the given run time (negative when the time is in the past).
  def minutes_queued_ahead(last_run_time)
    (last_run_time - Time.now) / 60
  end

  # Converts an `every` string such as "1.day" or "30.minutes" into a duration
  # by calling the unit method on the leading integer.
  # @param every_string [String]
  # @raise [ArgumentError] if the value is not `<positive integer>.<known unit>`
  def parse_frequency(every_string)
    match = /\A(\d+)\.([a-z]+)\z/.match(every_string.to_s.strip)
    count, units = match.captures if match

    # A zero count would never advance the run time, so `future_run_times` would loop forever.
    unless count.to_i > 0 && FREQUENCY_UNITS.include?(units)
      raise ArgumentError,
            "Invalid param `every` (#{every_string.inspect}); expected a format like `1.day` or `30.minutes`."
    end

    # `public_send` plus the unit list keeps private Kernel methods (e.g. `exit`) unreachable.
    count.to_i.public_send(units)
  end

  # Checks the required keys, resolves the job class, and defaults the task name.
  # Sets `@job_class` and writes `params[:name]` on the given hash when it is not already set.
  # @raise [ArgumentError] if the `:class` or `:every` key is missing
  def validate_params!(params)
    raise ArgumentError, "Missing param `class` specifying the class of the job to run." unless params.key?(:class)
    raise ArgumentError, "Missing param `every` specifying how often the job should run." unless params.key?(:every)

    @job_class = params[:class].constantize
    params[:name] ||= params[:class]
  end
end

#params ⇒ Hash (readonly)

Returns the params used to create the task.

Returns:

  • (Hash)

    The params used to create the task



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/simple_scheduler/task.rb', line 9

class Task
  attr_reader :job_class, :params

  DEFAULT_QUEUE_AHEAD_MINUTES = 360

  # Unit names accepted after the dot in the `every` option (the `day` in "1.day").
  # The unit is invoked as a method on the parsed Integer, so only known names are allowed.
  FREQUENCY_UNITS = %w[
    second seconds minute minutes hour hours day days
    week weeks fortnight fortnights month months year years
  ].freeze
  private_constant :FREQUENCY_UNITS

  # Initializes a task by parsing the params so the task can be queued in the future.
  # The given hash is stored as-is; a missing `:name` is filled in on it with the value of `:class`.
  # @param params [Hash]
  # @option params [String] :class The class of the Active Job or Sidekiq Worker
  # @option params [String] :every How frequently the job will be performed
  # @option params [String] :at The starting time for the interval
  # @option params [String] :expires_after The interval used to determine how late the job is allowed to run
  # @option params [Integer] :queue_ahead The number of minutes that jobs should be queued in the future
  # @option params [String] :name The name of the task as defined in the YAML config
  # @option params [String] :tz The time zone to use when parsing the `at` option
  # @raise [ArgumentError] if the `:class` or `:every` key is missing
  def initialize(params)
    validate_params!(params)
    @params = params
  end

  # The task's first run time as a Time-like object.
  # @return [SimpleScheduler::At]
  def at
    @at ||= At.new(@params[:at], time_zone)
  end

  # The time between the scheduled and actual run time that should cause the job not to run.
  # @return [String]
  def expires_after
    @params[:expires_after]
  end

  # Returns an array of existing jobs matching the job class and name of the task.
  # Only entries whose display class is "SimpleScheduler::FutureJob" are considered.
  # @return [Array<Sidekiq::SortedEntry>]
  def existing_jobs
    @existing_jobs ||= SimpleScheduler::Task.scheduled_set.select do |job|
      next unless job.display_class == "SimpleScheduler::FutureJob"

      task_params = job.display_args[0].symbolize_keys
      task_params[:class] == job_class_name && task_params[:name] == name
    end.to_a
  end

  # Returns an array of existing future run times that have already been scheduled.
  # @return [Array<Time>]
  def existing_run_times
    @existing_run_times ||= existing_jobs.map(&:at)
  end

  # How often the job will be run.
  # @return [ActiveSupport::Duration]
  # @raise [ArgumentError] if the `every` option is not a positive count followed by a known unit
  def frequency
    @frequency ||= parse_frequency(@params[:every])
  end

  # Returns an array of Time objects for future run times based on
  # the current time and the given minutes to look ahead.
  # @return [Array<Time>]
  def future_run_times # rubocop:disable Metrics/AbcSize
    run_times = existing_run_times.dup
    last_run_time = run_times.last || at - frequency
    last_run_time = last_run_time.in_time_zone(time_zone)

    # Ensure there are at least two future jobs scheduled and that the queue ahead time is filled
    while run_times.length < 2 || minutes_queued_ahead(last_run_time) < queue_ahead
      last_run_time = frequency.from_now(last_run_time)
      # The hour may not match because of a shift caused by DST in previous run times,
      # so we need to ensure that the hour matches the specified hour if given.
      last_run_time = last_run_time.change(hour: at.hour, min: at.min) if at.hour?
      run_times << last_run_time
    end

    run_times
  end

  # The class name of the job or worker.
  # @return [String]
  def job_class_name
    @params[:class]
  end

  # The name of the task as defined in the YAML config (the class name when none was given).
  # @return [String]
  def name
    @params[:name]
  end

  # The number of minutes that jobs should be queued in the future.
  # @return [Integer]
  def queue_ahead
    @queue_ahead ||= @params[:queue_ahead] || DEFAULT_QUEUE_AHEAD_MINUTES
  end

  # The time zone to use when parsing the `at` option.
  # Falls back to `Time.zone` when no `:tz` param was given.
  # @return [ActiveSupport::TimeZone]
  def time_zone
    @time_zone ||= params[:tz] ? ActiveSupport::TimeZone.new(params[:tz]) : Time.zone
  end

  # Loads the scheduled jobs from Sidekiq once to avoid loading from
  # Redis for each task when looking up existing scheduled jobs.
  # @return [Sidekiq::ScheduledSet]
  def self.scheduled_set
    @scheduled_set ||= Sidekiq::ScheduledSet.new
  end

  private

  # Minutes between now and the given run time (negative when the time is in the past).
  def minutes_queued_ahead(last_run_time)
    (last_run_time - Time.now) / 60
  end

  # Converts an `every` string such as "1.day" or "30.minutes" into a duration
  # by calling the unit method on the leading integer.
  # @param every_string [String]
  # @raise [ArgumentError] if the value is not `<positive integer>.<known unit>`
  def parse_frequency(every_string)
    match = /\A(\d+)\.([a-z]+)\z/.match(every_string.to_s.strip)
    count, units = match.captures if match

    # A zero count would never advance the run time, so `future_run_times` would loop forever.
    unless count.to_i > 0 && FREQUENCY_UNITS.include?(units)
      raise ArgumentError,
            "Invalid param `every` (#{every_string.inspect}); expected a format like `1.day` or `30.minutes`."
    end

    # `public_send` plus the unit list keeps private Kernel methods (e.g. `exit`) unreachable.
    count.to_i.public_send(units)
  end

  # Checks the required keys, resolves the job class, and defaults the task name.
  # Sets `@job_class` and writes `params[:name]` on the given hash when it is not already set.
  # @raise [ArgumentError] if the `:class` or `:every` key is missing
  def validate_params!(params)
    raise ArgumentError, "Missing param `class` specifying the class of the job to run." unless params.key?(:class)
    raise ArgumentError, "Missing param `every` specifying how often the job should run." unless params.key?(:every)

    @job_class = params[:class].constantize
    params[:name] ||= params[:class]
  end
end

Class Method Details

.scheduled_set ⇒ Sidekiq::ScheduledSet

Loads the scheduled jobs from Sidekiq once to avoid loading from Redis for each task when looking up existing scheduled jobs.

Returns:

  • (Sidekiq::ScheduledSet)


112
113
114
# File 'lib/simple_scheduler/task.rb', line 112

# Returns the memoized Sidekiq scheduled set, creating it on the first call
# so later lookups reuse the same object.
# @return [Sidekiq::ScheduledSet]
def self.scheduled_set
  return @scheduled_set if @scheduled_set

  @scheduled_set = Sidekiq::ScheduledSet.new
end

Instance Method Details

#at ⇒ SimpleScheduler::At

The task’s first run time as a Time-like object.

Returns:

  • (SimpleScheduler::At)

30
31
32
# File 'lib/simple_scheduler/task.rb', line 30

# The task's first run time, built once from the `:at` param and the task's time zone.
# @return [SimpleScheduler::At]
def at
  return @at if @at

  @at = At.new(@params[:at], time_zone)
end

#existing_jobs ⇒ Array<Sidekiq::SortedEntry>

Returns an array of existing jobs matching the job class of the task.

Returns:

  • (Array<Sidekiq::SortedEntry>)


42
43
44
45
46
47
48
49
# File 'lib/simple_scheduler/task.rb', line 42

# Scheduled entries that belong to this task. An entry matches when its display class is
# "SimpleScheduler::FutureJob" and its first argument carries this task's class name and name.
# The result is memoized.
# @return [Array<Sidekiq::SortedEntry>]
def existing_jobs
  return @existing_jobs if @existing_jobs

  matches = SimpleScheduler::Task.scheduled_set.select do |entry|
    if entry.display_class == "SimpleScheduler::FutureJob"
      job_params = entry.display_args[0].symbolize_keys
      job_params[:class] == job_class_name && job_params[:name] == name
    end
  end
  @existing_jobs = matches.to_a
end

#existing_run_times ⇒ Array<Time>

Returns an array of existing future run times that have already been scheduled.

Returns:

  • (Array<Time>)


53
54
55
# File 'lib/simple_scheduler/task.rb', line 53

# The `at` value of every existing job for this task, memoized after the first call.
# @return [Array<Time>]
def existing_run_times
  return @existing_run_times if @existing_run_times

  @existing_run_times = existing_jobs.map { |job| job.at }
end

#expires_after ⇒ String

The time between the scheduled and actual run time that should cause the job not to run.

Returns:

  • (String)


36
37
38
# File 'lib/simple_scheduler/task.rb', line 36

# The `:expires_after` param exactly as it was given (nil when it was not set).
# @return [String]
def expires_after
  params[:expires_after]
end

#frequency ⇒ ActiveSupport::Duration

How often the job will be run.

Returns:

  • (ActiveSupport::Duration)


59
60
61
# File 'lib/simple_scheduler/task.rb', line 59

# The parsed `:every` param, computed once through `parse_frequency`.
# @return [ActiveSupport::Duration]
def frequency
  return @frequency if @frequency

  @frequency = parse_frequency(@params[:every])
end

#future_run_times ⇒ Array<Time>

Returns an array of Time objects for future run times based on the current time and the given minutes to look ahead.

Returns:

  • (Array<Time>)


67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/simple_scheduler/task.rb', line 67

# Builds the list of run times for the task: the already scheduled times followed by
# newly computed ones. New times are appended until at least two entries exist and
# the latest one is at least `queue_ahead` minutes away.
# @return [Array<Time>]
def future_run_times
  run_times = existing_run_times.dup
  # Start from the latest scheduled time, or one interval before `at` so the first step lands on `at`.
  cursor = (run_times.last || at - frequency).in_time_zone(time_zone)

  until run_times.length >= 2 && minutes_queued_ahead(cursor) >= queue_ahead
    cursor = frequency.from_now(cursor)
    # A DST shift in earlier run times can move the hour, so pin it back to the
    # configured hour and minute whenever `at` specifies an hour.
    cursor = cursor.change(hour: at.hour, min: at.min) if at.hour?
    run_times << cursor
  end

  run_times
end

#job_class_name ⇒ String

The class name of the job or worker.

Returns:

  • (String)


87
88
89
# File 'lib/simple_scheduler/task.rb', line 87

# The `:class` param, i.e. the job or worker class name as a string.
# @return [String]
def job_class_name
  params[:class]
end

#name ⇒ String

The name of the task as defined in the YAML config.

Returns:

  • (String)


93
94
95
# File 'lib/simple_scheduler/task.rb', line 93

# The `:name` param of the task.
# @return [String]
def name
  params[:name]
end

#queue_ahead ⇒ Integer

The number of minutes that jobs should be queued in the future.

Returns:

  • (Integer)


99
100
101
# File 'lib/simple_scheduler/task.rb', line 99

# Minutes of run times to keep queued: the `:queue_ahead` param when given,
# otherwise DEFAULT_QUEUE_AHEAD_MINUTES. Memoized after the first call.
# @return [Integer]
def queue_ahead
  return @queue_ahead if @queue_ahead

  @queue_ahead = @params[:queue_ahead] || DEFAULT_QUEUE_AHEAD_MINUTES
end

#time_zone ⇒ ActiveSupport::TimeZone

The time zone to use when parsing the `at` option.

Returns:

  • (ActiveSupport::TimeZone)


105
106
107
# File 'lib/simple_scheduler/task.rb', line 105

# The zone used for the `at` option: one built from the `:tz` param when it is set,
# otherwise `Time.zone`. A truthy result is memoized.
# @return [ActiveSupport::TimeZone]
def time_zone
  return @time_zone if @time_zone

  @time_zone = if params[:tz]
                 ActiveSupport::TimeZone.new(params[:tz])
               else
                 Time.zone
               end
end