Class: Gizzard::Transformation::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/gizzard/transformation_scheduler.rb

Constant Summary collapse

# Defaults for the options hash accepted by #initialize; caller-supplied
# keys override these via Hash#merge.
DEFAULT_OPTIONS =
{
  # Upper bound used by #apply!: each pass may start
  # max_copies - busy_shards.length new jobs.
  :max_copies      => 30,
  # #busy_hosts treats a host as busy once the number of busy shards on it
  # (plus hosts reserved during the current scheduling pass) reaches this.
  :copies_per_host => 8,
  # Seconds to sleep between scheduler passes in #apply! (not used in dry runs).
  :poll_interval   => 10,
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(nameserver, base_name, transformations, options = {}) ⇒ Scheduler

Returns a new instance of Scheduler.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/gizzard/transformation_scheduler.rb', line 19

# Builds a scheduler for +transformations+ against +nameserver+.
#
# +options+ is merged over DEFAULT_OPTIONS. Recognised keys:
# :max_copies, :copies_per_host, :poll_interval, :quiet, :no_progress
# and :batch_finish. Each (transformation, forwardings_to_shards) pair is
# bound to +base_name+, and the flattened results seed the pending-job set.
def initialize(nameserver, base_name, transformations, options = {})
  settings = DEFAULT_OPTIONS.merge(options)

  @nameserver      = nameserver
  @transformations = transformations

  # Throttling knobs.
  @max_copies      = settings[:max_copies]
  @copies_per_host = settings[:copies_per_host]
  @poll_interval   = settings[:poll_interval]

  # Output controls; quiet mode also suppresses the progress spinner.
  @be_quiet           = settings[:quiet]
  @dont_show_progress = settings[:no_progress] || @be_quiet
  @batch_finish       = settings[:batch_finish]

  @jobs_in_progress = []
  @jobs_finished    = []

  bound_jobs = transformations.map do |transformation, forwardings_to_shards|
    transformation.bind(base_name, forwardings_to_shards)
  end
  @jobs_pending = Set.new(bound_jobs.flatten)
end

Instance Attribute Details

#copies_per_hostObject (readonly)

Returns the value of attribute copies_per_host.



11
12
13
# File 'lib/gizzard/transformation_scheduler.rb', line 11

# Per-host busy-shard limit taken from options[:copies_per_host].
def copies_per_host; @copies_per_host; end

#max_copiesObject (readonly)

Returns the value of attribute max_copies.



11
12
13
# File 'lib/gizzard/transformation_scheduler.rb', line 11

# Overall cap on busy shards, taken from options[:max_copies].
def max_copies; @max_copies; end

#nameserverObject (readonly)

Returns the value of attribute nameserver.



10
11
12
# File 'lib/gizzard/transformation_scheduler.rb', line 10

# The nameserver handle passed to the constructor.
def nameserver; @nameserver; end

#transformationsObject (readonly)

Returns the value of attribute transformations.



10
11
12
# File 'lib/gizzard/transformation_scheduler.rb', line 10

# The transformation => forwardings_to_shards collection passed to the constructor.
def transformations; @transformations; end

Instance Method Details

#apply!Object

on job completion:

  1. run cleanup ops

  2. remove from jobs_in_progress

  3. put in jobs_finished

  4. schedule a new job or reload app servers.



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/gizzard/transformation_scheduler.rb', line 51

# Runs every bound transformation job to completion.
#
# Each pass refreshes the busy-shard set, retires finished jobs, and starts
# up to max_copies - busy_shards.length new ones. With :batch_finish set,
# finished jobs are cleaned up only once nothing is pending and every
# in-progress job has completed. Between passes it sleeps poll_interval
# seconds (with a progress spinner unless disabled); dry runs do not sleep.
# Afterwards it reloads forwardings and logs a one-line summary.
def apply!
  @start_time = Time.now
  control_interrupts

  loop do
    reload_busy_shards
    cleanup_jobs unless @batch_finish
    schedule_jobs(max_copies - busy_shards.length)

    # Batch mode: retire everything at once, after the last job has finished.
    cleanup_jobs if @batch_finish && @jobs_pending.empty? && jobs_completed == @jobs_in_progress
    break if @jobs_pending.empty? && @jobs_in_progress.empty?

    unless nameserver.dryrun?
      if @dont_show_progress
        sleep(@poll_interval)
      else
        sleep_with_progress(@poll_interval)
      end
    end
  end

  nameserver.reload_updated_forwardings

  finished = @jobs_finished.length
  # Plural for every count except exactly one (e.g. "0 transformations").
  log "#{finished} transformation#{'s' unless finished == 1} applied. Total time elapsed: #{time_elapsed}"
end

#busy_hosts(extra_hosts = []) ⇒ Object



153
154
155
156
157
158
159
160
161
# File 'lib/gizzard/transformation_scheduler.rb', line 153

# Returns the Set of hostnames whose usage has reached @copies_per_host.
#
# Every busy shard contributes its hostname once. +extra_hosts+ lets the
# caller count hosts it is about to occupy (#schedule_jobs passes the hosts
# of jobs already picked in the current pass) toward the same limit.
def busy_hosts(extra_hosts = [])
  hosts_in_use = extra_hosts + busy_shards.map { |shard| shard.hostname }

  usage = Hash.new(0)
  hosts_in_use.each { |host| usage[host] += 1 }

  saturated = Set.new
  usage.each_pair { |host, uses| saturated.add(host) if uses >= @copies_per_host }
  saturated
end

#busy_shardsObject



144
145
146
147
148
149
150
151
# File 'lib/gizzard/transformation_scheduler.rb', line 144

# Memoized Set of ids of the shards the nameserver reports as busy.
# In dry-run mode the nameserver is not queried and the set is empty.
# Call #reload_busy_shards to refresh it.
def busy_shards
  return @busy_shards if @busy_shards

  @busy_shards =
    if nameserver.dryrun?
      Set.new
    else
      Set.new(nameserver.get_busy_shards.map { |shard| shard.id })
    end
end

#cleanup_jobsObject



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/gizzard/transformation_scheduler.rb', line 119

# Retires every in-progress job whose shards are no longer busy.
# Each such job is removed from the in-progress list and logged. Its
# cleanup! is then run against the nameserver, and it is recorded as finished.
def cleanup_jobs
  done = jobs_completed
  return if done.empty?

  @jobs_in_progress -= done

  log "FINISHING:"
  done.each do |job|
    log "  #{job.inspect}"
    job.cleanup!(nameserver)
  end

  @jobs_finished.concat(done)
end

#clear_progress_stringObject



171
172
173
174
175
176
# File 'lib/gizzard/transformation_scheduler.rb', line 171

# Erases the current progress line (if any) by overwriting it with spaces.
# Ten extra columns are blanked, and the cursor is returned to column 0.
def clear_progress_string
  return unless @progress_string

  blank = " " * (@progress_string.length + 10)
  print "\r#{blank}\r"
  @progress_string = nil
end

#control_interruptsObject

Trap interrupt (Ctrl+C) for better/safer handling



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/gizzard/transformation_scheduler.rb', line 226

# Installs a SIGINT (Ctrl+C) handler that escalates over repeated presses.
#
# Every press clears any jobs still pending; @jobs_in_progress is left
# untouched. The first two presses print how many more presses will force
# termination, and the second also prints an extra warning. The third press
# exits the process with status 1.
#
# The handler closes over the local +ints_left+ counter, so the count
# persists across signals. The method returns whatever Kernel#trap
# returns (the previously installed INT handler).
# NOTE(review): nothing visible here restores that previous handler after
# #apply! finishes — confirm whether callers depend on that.
def control_interrupts
  ints_left = 3
  trap("INT") do
    ints_left -= 1 
    if !@jobs_pending.empty?
      # get rid of scheduled jobs
      puts "\nINTERRUPT RECEIVED! Cancelling jobs not yet started. Finishing jobs in progress..."
      @jobs_pending.clear
    end
    # Presses 1 and 2: tell the user how many more presses force termination.
    if ints_left > 0
      puts "\nPress Ctrl+C #{ints_left} more time#{'s' if ints_left > 1} to terminate jobs in progress. This is dangerous."
    end
    # Press 2 adds a stronger warning; press 3 terminates immediately.
    if ints_left == 1
      puts "This could leave the database in a bad state. Make sure you know what you're doing."
    elsif ints_left == 0
      puts "\nTerminating on interrupt..."
      exit 1
    end
  end
end

#jobs_completedObject



135
136
137
# File 'lib/gizzard/transformation_scheduler.rb', line 135

# In-progress jobs that share no shard with the current busy-shard set.
def jobs_completed
  @jobs_in_progress.reject do |job|
    shared = busy_shards & job.involved_shards
    shared.size > 0
  end
end

#log(*args) ⇒ Object



178
179
180
181
182
183
# File 'lib/gizzard/transformation_scheduler.rb', line 178

# Prints +args+ (one per line, via puts) unless quiet mode is on.
# Any progress line is cleared first so output does not interleave with it.
def log(*args)
  return if @be_quiet

  clear_progress_string
  puts(*args)
end

#put_copy_progressObject



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/gizzard/transformation_scheduler.rb', line 185

# Redraws the single-line progress indicator (spinner plus counters).
# Nothing is drawn while there are no in-progress jobs or no busy shards.
# The spinner counter still advances in that case.
def put_copy_progress
  @i = (@i || 0) + 1
  return if @jobs_in_progress.empty? || busy_shards.empty?

  running = @jobs_in_progress.length
  copies  = busy_shards.length

  # Mention the copy count only when it differs from the job count.
  in_progress_txt =
    if copies == running
      "In progress: #{running}"
    else
      "In progress: #{running} (Copies: #{copies})"
    end

  segments = [
    ['-', '\\', '|', '/'][@i % 4],
    in_progress_txt,
    "Pending: #{@jobs_pending.length}",
    "Finished: #{@jobs_finished.length}",
    "Time elapsed: #{time_elapsed}"
  ]

  clear_progress_string

  @progress_string = segments.join(" ")
  print @progress_string
  $stdout.flush
end

#reload_busy_shardsObject



139
140
141
142
# File 'lib/gizzard/transformation_scheduler.rb', line 139

# Drops the memoized busy-shard set and returns a freshly fetched one.
def reload_busy_shards; @busy_shards = nil; busy_shards; end

#schedule_jobs(num_to_schedule) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/gizzard/transformation_scheduler.rb', line 77

# Starts up to +num_to_schedule+ pending jobs.
#
# A pending job is eligible only if none of its hosts are saturated
# according to #busy_hosts. Hosts of jobs already picked in this pass count
# toward each host's total. Picked jobs are removed from the pending set,
# sorted by forwarding, and prepared against the nameserver, after which
# forwardings are reloaded. Jobs that require a copy then have their copies
# started and the busy-shard set is refreshed. Finally all picked jobs are
# appended to the in-progress list.
#
# @param num_to_schedule [Integer] maximum number of jobs to start; zero or
#   a negative value starts nothing.
def schedule_jobs(num_to_schedule)
  # #apply! passes max_copies - busy_shards.length, which is <= 0 once copies
  # are saturated. Without this guard the length check below never fires,
  # and every eligible pending job would be started at once.
  return if num_to_schedule <= 0

  to_be_busy_hosts = []
  jobs             = []

  @jobs_pending.each do |j|
    next unless (busy_hosts(to_be_busy_hosts) & j.involved_hosts).empty?

    jobs << j
    # Reserve this job's hosts so later candidates see them as occupied.
    to_be_busy_hosts.concat j.involved_hosts_array
    break if jobs.length >= num_to_schedule
  end

  @jobs_pending.subtract(jobs)

  jobs = jobs.sort_by { |t| t.forwarding }

  unless jobs.empty?
    log "STARTING:"
    jobs.each do |j|
      log "  #{j.inspect}"
      j.prepare!(nameserver)
    end

    nameserver.reload_updated_forwardings

    copy_jobs = jobs.select { |j| j.copy_required? }

    unless copy_jobs.empty?
      log "COPIES:"
      copy_jobs.each do |j|
        j.copy_descs.each { |d| log "  #{d}" }
        j.copy!(nameserver)
      end

      reload_busy_shards
    end

    @jobs_in_progress.concat(jobs)
  end
end

#sleep_with_progress(interval) ⇒ Object



163
164
165
166
167
168
169
# File 'lib/gizzard/transformation_scheduler.rb', line 163

# Waits roughly +interval+ seconds, redrawing the progress line every 0.2s.
# A zero or negative interval returns immediately without drawing.
def sleep_with_progress(interval)
  started_at = Time.now
  until (Time.now - started_at) >= interval
    put_copy_progress
    sleep 0.2
  end
end

#time_elapsedObject



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/gizzard/transformation_scheduler.rb', line 208

# Human-readable time since @start_time.
# Under a minute it returns "N seconds" ("1 second" when N is 1). Otherwise
# it returns zero-padded colon-separated fields: MM:SS, HH:MM:SS, or
# DD:HH:MM:SS once a day has passed.
def time_elapsed
  total = (Time.now - @start_time).to_i

  return "1 second" if total == 1
  return "#{total} seconds" if total < 60

  mins, secs = total.divmod(60)
  hrs,  mins = mins.divmod(60)
  days, hrs  = hrs.divmod(24)

  fields = [secs, mins]
  fields << hrs  if total >= 60 * 60
  fields << days if total >= 60 * 60 * 24

  fields.reverse.map { |v| "%0.2i" % v }.join(":")
end