Class: Logical::Naf::JobCreator

Inherits:
Object
  • Object
show all
Defined in:
app/models/logical/naf/job_creator.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.test(*foo) ⇒ Object



141
142
143
144
145
146
# File 'app/models/logical/naf/job_creator.rb', line 141

def self.test(*foo)
  seconds = rand 120 + 15
  puts "TEST CALLED: #{Time.zone.now}: #{foo.inspect}: sleeping for #{seconds} seconds"
  sleep(seconds)
  puts "TEST DONE: #{Time.zone.now}: #{foo.inspect}"
end

Instance Method Details

#create_queue_job(historical_job) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
# File 'app/models/logical/naf/job_creator.rb', line 125

def create_queue_job(historical_job)
  queued_job = ::Naf::QueuedJob.new(application_id: historical_job.application_id,
                                    application_type_id: historical_job.application_type_id,
                                    command: historical_job.command,
                                    application_run_group_restriction_id: historical_job.application_run_group_restriction_id,
                                    application_run_group_name: historical_job.application_run_group_name,
                                    application_run_group_limit: historical_job.application_run_group_limit,
                                    priority: historical_job.priority)
  queued_job.id = historical_job.id
  queued_job.save!
end

#queue_application(application, application_run_group_restriction, application_run_group_name, application_run_group_limit = 1, priority = 0, affinities = [], prerequisites = [], enqueue = false) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'app/models/logical/naf/job_creator.rb', line 4

def queue_application(application,
                      application_run_group_restriction,
                      application_run_group_name,
                      application_run_group_limit = 1,
                      priority = 0,
                      affinities = [],
                      prerequisites = [],
                      enqueue = false)

  # Before adding a job to the queue, check whether the number of
  # jobs (running/queued) is equal to or greater than the application
  # run group limit, or if enqueue_backlogs is set to false. If so,
  # do not add the job to the queue
  running_jobs = retrieve_jobs(::Naf::RunningJob, application.command, application_run_group_name)
  queued_jobs = retrieve_jobs(::Naf::QueuedJob, application.command, application_run_group_name)

  if enqueue == false && (running_jobs.present? || queued_jobs.present?)
    group_limit = running_jobs.try(:application_run_group_limit).to_i + queued_jobs.try(:application_run_group_limit).to_i
    total_jobs = running_jobs.try(:count).to_i + queued_jobs.try(:count).to_i

    return if group_limit <= total_jobs
  end

  ::Naf::HistoricalJob.transaction do
    historical_job = ::Naf::HistoricalJob.create!(application_id: application.id,
                                                  application_type_id: application.application_type_id,
                                                  command: application.command,
                                                  application_run_group_restriction_id: application_run_group_restriction.id,
                                                  application_run_group_name: application_run_group_name,
                                                  application_run_group_limit: application_run_group_limit,
                                                  priority: priority,
                                                  log_level: application.log_level)

    # Create historical job affinity tabs for each affinity associated with the historical job
    affinities.each do |affinity|
      affinity_parameter = ::Naf::ApplicationScheduleAffinityTab.
        where(affinity_id: affinity.id,
              application_schedule_id: application.application_schedule.try(:id)).
        first.try(:affinity_parameter)
      ::Naf::HistoricalJobAffinityTab.create(historical_job_id: historical_job.id,
                                             affinity_id: affinity.id,
                                             affinity_parameter: affinity_parameter)
    end

    verify_and_create_prerequisites(historical_job, prerequisites)

    create_queue_job(historical_job)

    return historical_job
  end
end

#queue_application_schedule(application_schedule, schedules_queued_already = []) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'app/models/logical/naf/job_creator.rb', line 62

def queue_application_schedule(application_schedule, schedules_queued_already = [])
  prerequisite_jobs = []

  # Check if schedule has been queued
  if schedules_queued_already.include? application_schedule.id
    raise ::Naf::HistoricalJob::JobPrerequisiteLoop.new(application_schedule)
  end

  # Keep track of queued schedules
  schedules_queued_already << application_schedule.id
  # Queue application schedule prerequisites
  application_schedule.prerequisites.each do |application_schedule_prerequisite|
    prerequisite_jobs << queue_application_schedule(application_schedule_prerequisite, schedules_queued_already)
  end

  # Queue the application
  return queue_application(application_schedule.application,
                           application_schedule.application_run_group_restriction,
                           application_schedule.application_run_group_name,
                           application_schedule.application_run_group_limit,
                           application_schedule.priority,
                           application_schedule.affinities,
                           prerequisite_jobs,
                           application_schedule.enqueue_backlogs)
end

#queue_rails_job(command, application_run_group_restriction = ::Naf::ApplicationRunGroupRestriction.limited_per_all_machines, application_run_group_name = :command, application_run_group_limit = 1, priority = 0, affinities = [], prerequisites = []) ⇒ Object

This method act similar to queue_application but is used for testing purpose



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'app/models/logical/naf/job_creator.rb', line 89

def queue_rails_job(command,
                    application_run_group_restriction = ::Naf::ApplicationRunGroupRestriction.limited_per_all_machines,
                    application_run_group_name = :command,
                    application_run_group_limit = 1,
                    priority = 0,
                    affinities = [],
                    prerequisites = [])
  application_run_group_name = command if application_run_group_name == :command
  ::Naf::HistoricalJob.transaction do
    historical_job = ::Naf::HistoricalJob.create!(application_type_id: 1,
                                                  command: command,
                                                  application_run_group_restriction_id: application_run_group_restriction.id,
                                                  application_run_group_name: application_run_group_name,
                                                  application_run_group_limit: application_run_group_limit,
                                                  priority: priority)
    affinities.each do |affinity|
      ::Naf::HistoricalJobAffinityTab.create(historical_job_id: historical_job.id, affinity_id: affinity.id)
    end

    verify_and_create_prerequisites(historical_job, prerequisites)

    create_queue_job(historical_job)

    return historical_job
  end
end

#queue_testObject



137
138
139
# File 'app/models/logical/naf/job_creator.rb', line 137

def queue_test
  queue_rails_job("#{self.class.name}.test")
end

#retrieve_jobs(klass, command, application_run_group_name) ⇒ Object



56
57
58
59
60
# File 'app/models/logical/naf/job_creator.rb', line 56

def retrieve_jobs(klass, command, application_run_group_name)
  klass.select('application_run_group_limit, MAX(created_at) AS created_at, count(*)').
    where('command = ? AND application_run_group_name = ?', command, application_run_group_name).
    group('application_run_group_name, application_run_group_limit').first
end

#verify_and_create_prerequisites(job, prerequisites) ⇒ Object



116
117
118
119
120
121
122
123
# File 'app/models/logical/naf/job_creator.rb', line 116

def verify_and_create_prerequisites(job, prerequisites)
  job.verify_prerequisites(prerequisites)
  # Create historical job prerequisites for each prerequisite associated with the historical job
  prerequisites.each do |prerequisite|
    ::Naf::HistoricalJobPrerequisite.create(historical_job_id: job.id,
                                            prerequisite_historical_job_id: prerequisite.id)
  end
end