Class: Cron::Server

Inherits:
Object
  • Object
show all
Includes:
StandardModel
Defined in:
lib/app/jobs/cron/server.rb

Overview

Handle the coordination of which server should be running the cron jobs

Constant Summary collapse

STATE_PRIMARY =

Constants

'primary'.freeze
STATE_SECONDARY =
'secondary'.freeze
ALL_STATES =
[STATE_PRIMARY, STATE_SECONDARY].freeze

Class Method Summary collapse

Instance Method Summary collapse

Methods included from StandardModel

#clear_cache, included, #remove_blank_secure_fields, #secure_fields, #update, #update!

Methods included from App47Logger

#clean_params, #delete_parameter_keys, #log_controller_error, #log_debug, log_debug, log_error, #log_error, log_exception, #log_message, log_message, #log_warn, log_warn, #mask_parameter_keys, #update_flash_messages

Class Method Details

.find_or_create_serverObject

Find a record for this server



76
77
78
# File 'lib/app/jobs/cron/server.rb', line 76

# Look up the Cron::Server record matching this machine's host name and the
# current process id, creating it when no such record exists.
def self.find_or_create_server
  identity = { host_name: Socket.gethostname, pid: Process.pid }
  Cron::Server.find_or_create_by!(**identity)
end

.primary_serverObject

Find the current primary server



83
84
85
# File 'lib/app/jobs/cron/server.rb', line 83

# Query for servers whose state is STATE_PRIMARY and return the first match.
def self.primary_server
  primaries = Cron::Server.where(state: STATE_PRIMARY)
  primaries.first
end

.warm_up_serverObject

Warm up a server on the next evaluation



90
91
92
93
94
# File 'lib/app/jobs/cron/server.rb', line 90

# Warm up a server on the next evaluation by asking the primary server to
# auto scale to one more than its desired server count, capped at 10.
#
# Does nothing when AWS auto scaling is not configured or when there is no
# primary server to ask.
def self.warm_up_server
  return unless SystemConfiguration.aws_auto_scaling_configured?

  # Look the primary up once: it may be missing, and a second query could return a different record
  primary = primary_server
  return if primary.nil?

  # to_i treats a desired count that has never been set as zero
  primary.auto_scale([primary.desired_server_count.to_i + 1, 10].min)
end

Instance Method Details

#active_countObject

Returns the count of active servers



277
278
279
# File 'lib/app/jobs/cron/server.rb', line 277

# Returns the value of current_server_count, the attribute that
# current_desired_capacity writes with the auto scaling group's desired capacity.
def active_count
  current_server_count
end

#alive?Boolean

Return true if I’ve checked in within the last 90 seconds

Returns:

  • (Boolean)


141
142
143
# File 'lib/app/jobs/cron/server.rb', line 141

# Has this server checked in within the last 90 seconds?
#
# A server that has never checked in (no last_check_in_at yet) is reported as
# not alive instead of raising on the comparison.
#
# @return [Boolean]
def alive?
  return false if last_check_in_at.nil?

  last_check_in_at >= 90.seconds.ago.utc
end

#auto_scale(desired_count = 0) ⇒ Object

Sets the desired and minimum number of EC2 instances to run



254
255
256
257
258
259
260
261
262
# File 'lib/app/jobs/cron/server.rb', line 254

# Records the desired server count and, unless the group already has at least
# that many instances desired, sets the auto scaling group's minimum size and
# desired capacity to desired_count.
#
# @param desired_count [Integer] number of instances wanted, defaults to 0
def auto_scale(desired_count = 0)
  set(desired_server_count: desired_count)
  # Make sure we don't remove any workers with assigned jobs by accident:
  # a positive request never lowers the current capacity, only zero scales down.
  return if desired_count.positive? && current_desired_capacity >= desired_count

  scaling_client = client
  request = { auto_scaling_group_name: sys_config.aws_auto_scaling_group_name,
              min_size: desired_count,
              desired_capacity: desired_count }
  scaling_client.update_auto_scaling_group(**request)
end

#auto_scaling_groupObject

Returns the AutoScalingGroup associated with the account



188
189
190
191
# File 'lib/app/jobs/cron/server.rb', line 188

# Returns the first AutoScalingGroup described by AWS for the configured group
# name, memoized for the lifetime of this object.
#
# The group name is read from aws_auto_scaling_group_name, the same setting
# auto_scale uses when updating the group.
def auto_scaling_group
  @auto_scaling_group ||= begin
    # Only build the filter and call AWS when nothing is memoized yet
    filter = { auto_scaling_group_names: [sys_config.aws_auto_scaling_group_name] }
    client.describe_auto_scaling_groups(filter).auto_scaling_groups.first
  end
end

#become_primaryObject

Become primary, making others secondary



99
100
101
102
103
104
105
106
107
108
109
# File 'lib/app/jobs/cron/server.rb', line 99

# Try to take over as the primary node.
#
# Every server is first switched to secondary, then this node waits a random
# 1 to 15 seconds. If a live primary exists after the wait, this node backs off;
# otherwise it marks itself primary and stamps its check-in time.
def become_primary
  Cron::Server.each { |server| server.become_secondary }
  # The random pause spreads competing nodes out so that one of them wins
  sleep(rand(1..15))
  # Another node may have claimed the role while we were sleeping
  current_primary = Cron::Server.primary_server
  return if current_primary.present? && current_primary.alive?

  # Nobody else claimed it, so take the role
  update!(state: STATE_PRIMARY, last_check_in_at: Time.now.utc)
end

#become_secondary(user = nil) ⇒ Object

Become secondary node TODO: [CMS] Update when auditing in place



115
116
117
118
119
120
121
122
# File 'lib/app/jobs/cron/server.rb', line 115

# Become a secondary node by persisting STATE_SECONDARY.
#
# @param user [Object, nil] accepted for interface compatibility; currently
#   unused because both the audited and unaudited paths perform the same update.
#
# TODO: [CMS] Update when auditing in place, e.g.
#   update_attributes_and_log! user, state: STATE_SECONDARY
def become_secondary(user = nil)
  update! state: STATE_SECONDARY
end

#check_auto_scaleObject

Auto scale environment



162
163
164
165
166
167
168
169
170
# File 'lib/app/jobs/cron/server.rb', line 162

# Auto scale the environment when AWS auto scaling is configured: an empty
# delayed job queue goes to handle_zero_job_count, anything else to
# handle_auto_scale_jobs.
def check_auto_scale
  return unless SystemConfiguration.aws_auto_scaling_configured?
  return handle_zero_job_count if delayed_jobs_count.eql?(0)

  handle_auto_scale_jobs
end

#check_inObject

Perform a check in for the server



155
156
157
# File 'lib/app/jobs/cron/server.rb', line 155

# Record a check in for this server by setting last_check_in_at to the
# current UTC time.
def check_in
  checked_in_at = Time.now.utc
  set(last_check_in_at: checked_in_at)
end

#clientObject

Returns the AWS AutoScaling Client



175
176
177
178
179
# File 'lib/app/jobs/cron/server.rb', line 175

# Returns the AWS AutoScaling client, built once from the access key id,
# secret access key and region in the system configuration, then reused.
def client
  return @client if @client

  credentials = { access_key_id: sys_config.aws_access_key_id,
                  secret_access_key: sys_config.aws_secret_access_key,
                  region: sys_config.aws_region }
  @client = Aws::AutoScaling::Client.new(**credentials)
end

#current_desired_capacityObject

Returns the current value of ‘desired capacity’ for the AutoScalingGroup



203
204
205
206
207
208
209
# File 'lib/app/jobs/cron/server.rb', line 203

# Returns the current 'desired capacity' of the AutoScalingGroup and stores it
# in current_server_count.
#
# Any error while reading the group is logged and 0 is returned, so callers
# keep their best-effort behaviour while the failure stays visible.
#
# @return [Integer] the desired capacity, or 0 when it could not be read
def current_desired_capacity
  current = auto_scaling_group.desired_capacity
  set current_server_count: current
  current
rescue StandardError => error
  # Report the failure instead of hiding it behind the 0 fallback
  App47Logger.log_error 'Unable to read the auto scaling desired capacity', error
  0
end

#dead?Boolean

Is the server dead, meaning it has not checked in within the last 90 seconds

Returns:

  • (Boolean)


148
149
150
# File 'lib/app/jobs/cron/server.rb', line 148

# Is the server dead? This is simply the negation of alive?.
#
# @return [Boolean]
def dead?
  !alive?
end

#delayed_jobs_countObject

Returns a count of the Delayed Jobs in queue that have not failed



196
197
198
# File 'lib/app/jobs/cron/server.rb', line 196

# Returns the number of Delayed Jobs whose failed_at is nil, read with
# mode :primary. The count is computed once and memoized on this object.
def delayed_jobs_count
  @delayed_jobs_count ||= begin
    pending_jobs = Delayed::Backend::Mongoid::Job.where(failed_at: nil)
    pending_jobs.read(mode: :primary).count
  end
end

#executeObject

Go through the logic once a minute



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/app/jobs/cron/server.rb', line 35

# Go through the logic once a minute.
#
# Runs the cron jobs when this node is the primary. Otherwise, when there is no
# primary or the primary is dead, this node tries to become primary and runs the
# jobs only if it actually holds the role afterwards.
#
# Errors are logged through App47Logger and do not propagate. The server always
# checks in before returning.
#
# @return the result of time_to_next_run
def execute
  if primary?
    run_cron_jobs
  else
    primary = Cron::Server.primary_server
    if primary.blank? || primary.dead?
      become_primary
      # become_primary backs off when another node claimed the role first, so
      # re-check before running; otherwise two nodes would run the same jobs.
      run_cron_jobs if primary?
    end
  end
  time_to_next_run
rescue StandardError => error
  App47Logger.log_error 'Unable to run cron server', error
  time_to_next_run
ensure
  check_in
end

#handle_auto_scale_jobsObject

Calls the ‘auto_scale’ method with a variable ‘desired_count’ based on how many delayed jobs are in the queue. We don’t need any more workers if the job count is less than 50.



224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/app/jobs/cron/server.rb', line 224

# Calls auto_scale with a desired count chosen from the delayed job count.
# Nothing is requested below 50 jobs; above that the tiers are:
#   50..250 => 1, 251..500 => 2, 501..1_000 => 3, 1_001..3_999 => 4,
#   4_000..10_999 => 5, 11_000..17_999 => 6, anything larger => 7
def handle_auto_scale_jobs
  pending = delayed_jobs_count
  return if pending < 50

  # Each pair is [highest job count in the tier, workers wanted]
  tiers = [[250, 1], [500, 2], [1_000, 3], [3_999, 4], [10_999, 5], [17_999, 6]]
  matched = tiers.find { |ceiling, _workers| pending <= ceiling }
  auto_scale(matched ? matched.last : 7)
end

#handle_zero_job_countObject

Calls the ‘auto_scale’ method with a ‘desired_count’ of 0 unless the capacity is already at 0



214
215
216
217
218
# File 'lib/app/jobs/cron/server.rb', line 214

# Calls auto_scale with its default desired count of 0, unless the current
# desired capacity is already 0.
def handle_zero_job_count
  auto_scale unless current_desired_capacity.eql?(0)
end

#high_landerObject

Look to make sure there is only one primary



267
268
269
270
271
272
# File 'lib/app/jobs/cron/server.rb', line 267

# Look to make sure there is only one primary: adds an error on :state when a
# primary record exists and it is not this server. Secondary servers are not
# checked.
def high_lander
  return if secondary? # Only a would-be primary needs the check

  existing = Cron::Server.where(state: STATE_PRIMARY).first
  return if existing.blank? || existing.eql?(self)

  errors.add(:state, 'there can only be one primary')
end

#inactive_countObject

Returns the count of inactive servers



284
285
286
# File 'lib/app/jobs/cron/server.rb', line 284

# Returns the value of desired_server_count, the attribute auto_scale writes.
# NOTE(review): the name suggests a count of inactive servers, but the code
# returns the desired server count — confirm the intent with the callers.
def inactive_count
  desired_server_count
end

#primary?Boolean

Am I the primary server

Returns:

  • (Boolean)


127
128
129
# File 'lib/app/jobs/cron/server.rb', line 127

# Am I the primary server? Requires both that this server is alive and that
# its state is STATE_PRIMARY.
#
# @return [Boolean]
def primary?
  alive? && state.eql?(STATE_PRIMARY)
end

#run_cron_jobsObject



53
54
55
56
# File 'lib/app/jobs/cron/server.rb', line 53

# Run the cron tab jobs, then evaluate auto scaling.
# check_auto_scale is only reached when run_jobs returns without raising.
def run_cron_jobs
  run_jobs
  check_auto_scale
end

#run_jobsObject

Run all cron tab jobs



61
62
63
64
# File 'lib/app/jobs/cron/server.rb', line 61

# Run all cron tab jobs that are due: every Cron::Tab is asked whether it is
# time to run against a single UTC timestamp, and those that answer yes are run.
def run_jobs
  now = Time.now.utc
  Cron::Tab.all.each do |tab|
    next unless tab.time_to_run?(now)

    tab.run
  end
end

#secondary?Boolean

Am I a secondary server

Returns:

  • (Boolean)


134
135
136
# File 'lib/app/jobs/cron/server.rb', line 134

# Am I a secondary server, i.e. is my state STATE_SECONDARY?
#
# @return [Boolean]
def secondary?
  state.eql?(STATE_SECONDARY)
end

#sys_configObject



181
182
183
# File 'lib/app/jobs/cron/server.rb', line 181

# Returns SystemConfiguration.configuration, fetched on first use and reused
# afterwards.
def sys_config
  return @sys_config if @sys_config

  @sys_config = SystemConfiguration.configuration
end

#time_to_next_runObject

Determine the number of seconds until the start of the next minute



69
70
71
# File 'lib/app/jobs/cron/server.rb', line 69

# Number of seconds from now until the start of the next minute.
#
# @return [Integer] a value from 1 to 60
def time_to_next_run
  seconds_into_minute = Time.now.utc.to_i % 60
  60 - seconds_into_minute
end