Class: Bookie::Database::JobSummary

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/bookie/database/job_summary.rb

Overview

A cached summary of Jobs in the database

Most summary operations should be performed through this class to improve efficiency.

Class Method Summary collapse

Class Method Details

.by_command_name(cmd) ⇒ Object

Filters by command name



85
86
87
# File 'lib/bookie/database/job_summary.rb', line 85

def self.by_command_name(cmd)
  where('job_summaries.command_name = ?', cmd)
end

.by_date(date) ⇒ Object

Filters by date



21
22
23
# File 'lib/bookie/database/job_summary.rb', line 21

def self.by_date(date)
  where('job_summaries.date = ?', date)
end

.by_date_range(range) ⇒ Object

Filters by a date range



27
28
29
30
31
32
33
34
# File 'lib/bookie/database/job_summary.rb', line 27

def self.by_date_range(range)
  range = range.normalized
  if range.exclude_end?
    where('? <= job_summaries.date AND job_summaries.date < ?', range.begin, range.end)
  else
    where('? <= job_summaries.date AND job_summaries.date <= ?', range.begin, range.end)
  end
end

.by_group(group) ⇒ Object

Filters by group



50
51
52
# File 'lib/bookie/database/job_summary.rb', line 50

def self.by_group(group)
  joins(:user).where('users.group_id = ?', group.id)
end

.by_group_name(name) ⇒ Object

Filters by group name



56
57
58
59
60
61
62
63
# File 'lib/bookie/database/job_summary.rb', line 56

def self.by_group_name(name)
  group = Group.where(:name => name).first
  if group
    by_group(group)
  else
    self.none
  end
end

.by_system(system) ⇒ Object

Filters by system



67
68
69
# File 'lib/bookie/database/job_summary.rb', line 67

def self.by_system(system)
  where('job_summaries.system_id = ?', system.id)
end

.by_system_name(name) ⇒ Object

Filters by system name



73
74
75
# File 'lib/bookie/database/job_summary.rb', line 73

def self.by_system_name(name)
  joins(:system).where('systems.name = ?', name)
end

.by_system_type(type) ⇒ Object

Filters by system type



79
80
81
# File 'lib/bookie/database/job_summary.rb', line 79

def self.by_system_type(type)
  joins(:system).where('systems.system_type_id = ?', type.id)
end

.by_user(user) ⇒ Object

Filters by user



38
39
40
# File 'lib/bookie/database/job_summary.rb', line 38

def self.by_user(user)
  where('job_summaries.user_id = ?', user.id)
end

.by_user_name(name) ⇒ Object

Filters by user name



44
45
46
# File 'lib/bookie/database/job_summary.rb', line 44

def self.by_user_name(name)
  joins(:user).where('users.name = ?', name)
end

.summarize(date) ⇒ Object

Create cached summaries for the given date

The date is interpreted as being in UTC.

If there is nothing to summarize, a dummy summary will be created.

Uses Lock::synchronize internally; should not be used in transaction blocks



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/bookie/database/job_summary.rb', line 97

def self.summarize(date)
  jobs = Job
  unscoped = self.unscoped
  time_min = date.to_utc_time
  time_range = time_min ... time_min + 1.days
  day_jobs = jobs.by_time_range(time_range)

  #Find the unique combinations of values for some of the jobs' attributes.
  value_sets = day_jobs.uniq.pluck(:user_id, :system_id, :command_name)
  if value_sets.empty?
    #There are no jobs, so create a dummy summary.
    user = User.select(:id).first
    system = System.select(:id).first
    #If there are no users or no systems, we can't create the dummy summary, so just return.
    #To consider: figure out how to create the dummy summary anyway?
    return unless user && system
    #Create a dummy summary so summary() doesn't keep trying to create one.
    Lock[:job_summaries].synchronize do
      sum = unscoped.find_or_initialize_by(
        :date => date,
        :user_id => user.id,
        :system_id => system.id,
        :command_name => ''
      )
      sum.cpu_time = 0
      sum.memory_time = 0
      sum.save!
    end
  else
    value_sets.each do |set|
      summary_jobs = jobs.where(
        :user_id => set[0],
        :system_id => set[1],
        :command_name => set[2]
      )
      summary = summary_jobs.summary(time_range)
      Lock[:job_summaries].synchronize do
        sum = unscoped.find_or_initialize_by(
          :date => date,
          :user_id => set[0],
          :system_id => set[1],
          :command_name => set[2]
        )
        sum.cpu_time = summary[:cpu_time]
        sum.memory_time = summary[:memory_time]
        sum.save!
      end
    end
  end
end

.summary(opts = {}) ⇒ Object

Returns a summary of jobs in the database

The following options are supported:

  • :range

    restricts the summary to a specific time interval (specified as a Range of Time objects)

  • :jobs

    the jobs on which the summary should operate

Internally, this may call JobSummary::summarize, which uses Lock#synchronize, so this should not be used inside a transaction block.

When filtering, the same filters must be applied to both the Jobs and the JobSummaries. For example: jobs = Bookie::Database::Job.by_user_name(‘root’) summaries = Bookie::Database::Job.by_user_name(‘root’) puts summaries.summary(:jobs => jobs)

TODO: test that summaries are created on UTC date boundaries?



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/bookie/database/job_summary.rb', line 163

def self.summary(opts = {})
  jobs = opts[:jobs] || Job
  time_range = opts[:range]

  unless time_range
    start_time = jobs.minimum(:start_time)
    end_time = jobs.maximum(:end_time)
    if start_time && end_time
      time_range = start_time .. end_time
    else
      time_range = Time.new ... Time.new
    end
  end

  time_range = time_range.normalized

  date_begin = time_range.begin.utc.to_date
  rounded_date_begin = false
  #Round date_begin up.
  if date_begin.to_utc_time < time_range.begin
    date_begin += 1
    rounded_date_begin = true
  end
  date_end = time_range.end.utc.to_date

  #Is the interval large enough to cover any cached summaries?
  if date_begin >= date_end
    #Nope; just return a regular summary.
    return jobs.summary(time_range)
  end

  jobs_in_range = jobs.by_time_range(time_range)
  num_jobs = jobs_in_range.count
  successful = jobs_in_range.where('jobs.exit_code = 0').count
  cpu_time = 0
  memory_time = 0

  #To consider: check if num_jobs is zero so we can skip all this?
  if rounded_date_begin
    #We need to get a summary for the chunk up to the first whole day.
    summary = jobs.summary(time_range.begin ... date_begin.to_utc_time)
    cpu_time += summary[:cpu_time]
    memory_time += summary[:memory_time]
  end

  date_end_time = date_end.to_utc_time
  if time_range.cover?(date_end_time)
    #We need to get a summary for the chunk after the last whole day.
    range = Range.new(date_end_time, time_range.end, time_range.exclude_end?)
    summary = jobs.summary(range)
    cpu_time += summary[:cpu_time]
    memory_time += summary[:memory_time]
  end

  date_range = date_begin ... date_end

  #Now we can process the cached summaries.
  unscoped = self.unscoped
  summaries = by_date_range(date_range).order(:date).to_a
  index = 0
  date_range.each do |date|
    new_index = index
    summary = summaries[new_index]
    while summary && summary.date == date do
      cpu_time += summary.cpu_time
      memory_time += summary.memory_time
      new_index += 1
      summary = summaries[new_index]
    end
    #Did we actually process any summaries?
    #If not, have _any_ summaries been created for this day?
    if new_index == index && !(unscoped.by_date(date).any?)
      #Nope. Create the summaries.
      unscoped.summarize(date)
      #TODO: what if a Sender deletes the summaries right before this?
      self.by_date(date).each do |sum|
        cpu_time += sum.cpu_time
        memory_time += sum.memory_time
      end
    end
    index = new_index
  end

  {
    :num_jobs => num_jobs,
    :successful => successful,
    :cpu_time => cpu_time,
    :memory_time => memory_time,
  }
end