Class: BackgroundQueue::ServerLib::Job

Inherits:
PriorityQueue show all
Defined in:
lib/background_queue/server_lib/job.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from PriorityQueue

#each_item, #empty?, #has_running_items?, #number_if_items_at_priority, #number_of_priorities, #peek, #pop, #priority, #push, #remove, #stalled=, #stalled?

Constructor Details

#initialize(id, owner) ⇒ Job

attr_reader :current_running_excluded_status



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/background_queue/server_lib/job.rb', line 26

def initialize(id, owner)
  @id = id
  @owner = owner
  @stalled = false
  @total_tasks = 0
  @total_counted_tasks = 0
  @completed_tasks = 0
  @completed_counted_tasks = 0
  @running_status = {}
  @running_ordered_status = []
  @running_percent = 0
  @current_running_status = nil
  @current_progress = {:percent=>0.0, :caption=>""}
  @current_caption = ""
  @synchronous_count = 0
  @total_weighted_tasks = 0
  @total_weighted_percent = 0.0
  @completed_weighted_percent = 0.0
  @completed_weighted_tasks = 0
  @running_percent_weighted = 0.0
  @status_meta = {}
  @mutex = Mutex.new
  #@current_running_excluded_status = nil
  super()
end

Instance Attribute Details

#completed_counted_tasksObject (readonly)

Returns the value of attribute completed_counted_tasks.



11
12
13
# File 'lib/background_queue/server_lib/job.rb', line 11

def completed_counted_tasks
  @completed_counted_tasks
end

#completed_tasksObject (readonly)

Returns the value of attribute completed_tasks.



10
11
12
# File 'lib/background_queue/server_lib/job.rb', line 10

def completed_tasks
  @completed_tasks
end

#completed_weighted_percentObject (readonly)

Returns the value of attribute completed_weighted_percent.



18
19
20
# File 'lib/background_queue/server_lib/job.rb', line 18

def completed_weighted_percent
  @completed_weighted_percent
end

#completed_weighted_tasksObject (readonly)

Returns the value of attribute completed_weighted_tasks.



19
20
21
# File 'lib/background_queue/server_lib/job.rb', line 19

def completed_weighted_tasks
  @completed_weighted_tasks
end

#current_running_statusObject (readonly)

Returns the value of attribute current_running_status.



14
15
16
# File 'lib/background_queue/server_lib/job.rb', line 14

def current_running_status
  @current_running_status
end

#idObject

Returns the value of attribute id.



5
6
7
# File 'lib/background_queue/server_lib/job.rb', line 5

def id
  @id
end

#running_ordered_statusObject (readonly)

Returns the value of attribute running_ordered_status.



7
8
9
# File 'lib/background_queue/server_lib/job.rb', line 7

def running_ordered_status
  @running_ordered_status
end

#running_percentObject (readonly)

Returns the value of attribute running_percent.



12
13
14
# File 'lib/background_queue/server_lib/job.rb', line 12

def running_percent
  @running_percent
end

#running_percent_countedObject (readonly)

Returns the value of attribute running_percent_counted.



13
14
15
# File 'lib/background_queue/server_lib/job.rb', line 13

def running_percent_counted
  @running_percent_counted
end

#running_percent_weightedObject (readonly)

Returns the value of attribute running_percent_weighted.



20
21
22
# File 'lib/background_queue/server_lib/job.rb', line 20

def running_percent_weighted
  @running_percent_weighted
end

#running_statusObject (readonly)

Returns the value of attribute running_status.



6
7
8
# File 'lib/background_queue/server_lib/job.rb', line 6

def running_status
  @running_status
end

#summaryObject (readonly)

Returns the value of attribute summary.



22
23
24
# File 'lib/background_queue/server_lib/job.rb', line 22

def summary
  @summary
end

#total_counted_tasksObject (readonly)

Returns the value of attribute total_counted_tasks.



9
10
11
# File 'lib/background_queue/server_lib/job.rb', line 9

def total_counted_tasks
  @total_counted_tasks
end

#total_tasksObject (readonly)

Returns the value of attribute total_tasks.



8
9
10
# File 'lib/background_queue/server_lib/job.rb', line 8

def total_tasks
  @total_tasks
end

#total_weighted_percentObject (readonly)

Returns the value of attribute total_weighted_percent.



17
18
19
# File 'lib/background_queue/server_lib/job.rb', line 17

def total_weighted_percent
  @total_weighted_percent
end

#total_weighted_tasksObject (readonly)

Returns the value of attribute total_weighted_tasks.



16
17
18
# File 'lib/background_queue/server_lib/job.rb', line 16

def total_weighted_tasks
  @total_weighted_tasks
end

Instance Method Details

#==(other) ⇒ Object



52
53
54
# File 'lib/background_queue/server_lib/job.rb', line 52

def ==(other)
  @id == other.id
end

#add_item(task) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/background_queue/server_lib/job.rb', line 64

def add_item(task)
  task.set_job(self)
  @total_tasks += 1
  unless task.is_excluded_from_count?
    @total_counted_tasks += 1
  end
  if task.weighted?
    @total_weighted_tasks += 1
    @total_weighted_percent += task.weighted_percent
  end
  #@synchronous_count+=1 if task.synchronous? #the queue only goes into sync mode once the task is running/about to run
  unless task.initial_progress_caption.nil? || task.initial_progress_caption.length == 0 || @current_progress[:percent] > 0
    @current_progress[:caption] = task.initial_progress_caption
  end
  push(task)
end

#deregister_running_status(task_id) ⇒ Object



134
135
136
137
138
139
140
# File 'lib/background_queue/server_lib/job.rb', line 134

def deregister_running_status(task_id)
  rstatus = @running_status.delete(task_id)
  unless rstatus.nil?
    @running_ordered_status.delete(rstatus) 
  end
  rstatus
end

#finish_item(item) ⇒ Object



92
93
94
95
# File 'lib/background_queue/server_lib/job.rb', line 92

def finish_item(item)
  @running_items -= 1
  @synchronous_count-=1 if item.synchronous?
end

#get_current_counted_tasksObject



274
275
276
277
278
279
280
# File 'lib/background_queue/server_lib/job.rb', line 274

def get_current_counted_tasks
  cnt = self.completed_counted_tasks + self.running_percent_counted.to_i 
  if cnt < self.total_counted_tasks
    cnt += 1
  end
  cnt
end

#get_current_progressObject



290
291
292
# File 'lib/background_queue/server_lib/job.rb', line 290

def get_current_progress
  @current_progress
end

#get_current_progress_captionObject



258
259
260
261
262
263
# File 'lib/background_queue/server_lib/job.rb', line 258

def get_current_progress_caption
  if self.current_running_status
    update_current_caption(self.current_running_status)
  end
  @current_caption
end

#get_current_progress_percentObject



240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/background_queue/server_lib/job.rb', line 240

def get_current_progress_percent
  unweighted_percent = (100.0 - self.total_weighted_percent) / 100.0

  total_unweighted_tasks = self.total_tasks - self.total_weighted_tasks
  completed_unweighted_tasks = self.completed_tasks - self.completed_weighted_tasks
  total_finished_percent = total_unweighted_tasks == 0 ? 0 : (completed_unweighted_tasks.to_f / total_unweighted_tasks.to_f) * 100.0 
  running_fraction = total_unweighted_tasks == 0 ?  1.0 : (1.0 / total_unweighted_tasks.to_f)
  
  
  total_running_percent = self.running_percent.to_f * running_fraction * 100.0 
  
  total_unweighted_percent = (total_finished_percent + total_running_percent) * unweighted_percent
  
  total_percent = total_unweighted_percent.to_f + (running_percent_weighted * 100.0) + completed_weighted_percent
  
  total_percent
end

#get_running_status(status) ⇒ Object



117
118
119
120
121
# File 'lib/background_queue/server_lib/job.rb', line 117

def get_running_status(status)
  rstatus = @running_status[status[:task_id]]
  rstatus = register_running_status(status) if rstatus.nil?
  rstatus
end

#inspectObject



56
57
58
# File 'lib/background_queue/server_lib/job.rb', line 56

def inspect
  "#{self.id}:#{@queues.inspect}"
end

#next_itemObject



81
82
83
84
85
86
# File 'lib/background_queue/server_lib/job.rb', line 81

def next_item
  item = pop
  @running_items += 1 if item
  @synchronous_count+=1 if item && item.synchronous?
  item
end

#register_running_status(status) ⇒ Object



123
124
125
126
127
128
129
130
131
132
# File 'lib/background_queue/server_lib/job.rb', line 123

def register_running_status(status)
  rstatus = {:task_id=>status[:task_id], :caption=>status[:caption], :percent=>0, :exclude=>status[:exclude], :weight=>status[:weight] }
  @running_status[status[:task_id]] = rstatus
  #if status[:exclude]
  #  @current_running_excluded_status = rstatus
  #else
    @running_ordered_status << rstatus 
  #end
  rstatus
end

#remove_item(item) ⇒ Object



88
89
90
# File 'lib/background_queue/server_lib/job.rb', line 88

def remove_item(item)
  remove(item)
end

#serverObject



60
61
62
# File 'lib/background_queue/server_lib/job.rb', line 60

def server
  @owner.server
end

#set_running_percent(pcent_counted, pcent, running_task_pcent, weighted_percent) ⇒ Object



228
229
230
231
232
233
234
235
236
237
238
# File 'lib/background_queue/server_lib/job.rb', line 228

def set_running_percent(pcent_counted, pcent, running_task_pcent, weighted_percent)
  @running_percent_counted = pcent_counted
  @running_percent = pcent
  @running_percent_weighted = weighted_percent
  idx = running_task_pcent.to_i
  if @running_ordered_status.length <= idx
    @current_running_status = @running_ordered_status.last
  else
    @current_running_status = @running_ordered_status[idx]
  end
end

#set_worker_status(status) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/background_queue/server_lib/job.rb', line 102

def set_worker_status(status)
  if status[:meta]
    update_status_meta(status[:meta])
  elsif status[:summary]
    update_summary_meta(status)
  else
    running_status = get_running_status(status)
    if status[:percent] >= 100
      update_finished_status(status)
    else
      update_running_status(running_status, status)
    end
  end
end

#synchronous?Boolean

Returns:

  • (Boolean)


97
98
99
100
# File 'lib/background_queue/server_lib/job.rb', line 97

def synchronous?
  next_item = peek
  @synchronous_count > 0 || (next_item && next_item.synchronous?)
end

#update_current_caption(status) ⇒ Object



265
266
267
268
269
270
271
272
# File 'lib/background_queue/server_lib/job.rb', line 265

def update_current_caption(status)
  caption = status[:caption]
  caption = "" if caption.nil?
  if total_counted_tasks > 1 && status[:exclude] != true
    caption = "#{caption} (#{self.get_current_counted_tasks}/#{self.total_counted_tasks})"
  end
  @current_caption = caption
end

#update_current_progressObject



282
283
284
285
286
287
288
# File 'lib/background_queue/server_lib/job.rb', line 282

def update_current_progress
  @current_progress = {
    :percent=>get_current_progress_percent,
    :caption=>get_current_progress_caption
  }.update(@status_meta)
  #puts "set status to #{@current_progress.inspect}"
end

#update_finished_status(status) ⇒ Object



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/background_queue/server_lib/job.rb', line 193

def update_finished_status(status)
  rstatus = deregister_running_status(status[:task_id])
  unless rstatus.nil?
    @completed_tasks += 1
    @completed_counted_tasks += 1 unless rstatus[:exclude]
    unless rstatus[:weight].nil?
      @completed_weighted_percent += rstatus[:weight] 
      @completed_weighted_tasks += 1
    end
    if self.current_running_status.nil? || @current_running_status == rstatus
      #sometimes the status is finished straight away...
      update_current_caption(status)
    end
    update_running_percent()
  end
end

#update_running_percentObject



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/background_queue/server_lib/job.rb', line 210

def update_running_percent
  total_percent = 0.0
  total_task_percent = 0.0
  total_percent_counted = 0.0
  total_weighted_percent = 0.0
  for status in @running_ordered_status
    if status[:weight] && status[:weight] > 0
      total_weighted_percent += (status[:percent] * status[:weight] / 100.0)
    else
      total_percent_counted += status[:percent] unless status[:exclude]
      total_percent += status[:percent]
    end
    total_task_percent += status[:percent]
  end
  set_running_percent(total_percent_counted.to_f / 100.0, total_percent.to_f / 100.0, total_task_percent / 100.0, total_weighted_percent.to_f / 100.0)
  self.update_current_progress
end

#update_running_status(running_status, status) ⇒ Object



142
143
144
145
146
# File 'lib/background_queue/server_lib/job.rb', line 142

def update_running_status(running_status, status)
  running_status[:percent] = status[:percent]
  running_status[:caption] = status[:caption]
  update_running_percent
end

#update_status_meta(meta) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/background_queue/server_lib/job.rb', line 149

def update_status_meta(meta)
  
  [:notice, :warning, :error].each { |key|
    val = BackgroundQueue::Utils.get_hash_entry(meta, key)
    unless val.nil?
      @status_meta[key] = [] if @status_meta[key].nil?
      @status_meta[key] << val
    end
  }
  val = BackgroundQueue::Utils.get_hash_entry(meta, :meta)
  unless val.nil?
    @status_meta[:meta] = {} if @status_meta[:meta].nil?
    @status_meta[:meta] = @status_meta[:meta].update(val)
  end
  update_current_progress
end

#update_summary_meta(status) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/background_queue/server_lib/job.rb', line 166

def update_summary_meta(status)
  @summary ||= {}
  type = status[:type].intern
  case status[:summary]
  when "app"
    @summary[type] ||= []
    @summary[type] << status[:data]
  when "set"
    @summary[type] ||= {}
    @summary[type][status[:key]] = status[:data]
  when "inc"
    @summary[type] ||= 0
    @summary[type] += status[:data].to_i
  when "dec"
    @summary[type] ||= 0
    @summary[type] -= status[:data].to_i
  when "res"
    if type == :all
      @summary = {}
    else
      @summary.delete(type)
    end
  else
    logger.error("Unknown summary action: #{status[:summary]}")
  end
end