Class: RestFtpDaemon::JobQueue

Inherits:
Object
  • Object
show all
Includes:
BmcDaemonLib::LoggerHelper, NewRelic::Agent::Instrumentation::ControllerInstrumentation, CommonHelpers, Singleton
Defined in:
lib/rest-ftp-daemon/job_queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from CommonHelpers

#dashboard_url, #exception_to_error, #format_bytes, #identifier, #underscore

Constructor Details

#initializeJobQueue

Returns a new instance of JobQueue.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/rest-ftp-daemon/job_queue.rb', line 16

def initialize
  # Initialize values
  @queues = {}
  @waitings = {}
  @jobs = []
  @last_id = 0

  @queues.taint          # enable tainted communication
  @waitings.taint
  taint

  # Create mutex
  @mutex = Mutex.new

  # Logger
  @logger = BmcDaemonLib::LoggerPool.instance.get :queue

  # Identifiers generator
  @prefix = identifier JOB_IDENT_LEN
  log_info "JobQueue initialized (prefix: #{@prefix})"
end

Instance Attribute Details

#jobsObject (readonly)

Returns the value of attribute jobs.



14
15
16
# File 'lib/rest-ftp-daemon/job_queue.rb', line 14

def jobs
  @jobs
end

#loggerObject (readonly)

Class options



13
14
15
# File 'lib/rest-ftp-daemon/job_queue.rb', line 13

def logger
  @logger
end

Instance Method Details

#clearObject



208
209
210
# File 'lib/rest-ftp-daemon/job_queue.rb', line 208

def clear
  @queue.clear
end

#create_job(params) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/rest-ftp-daemon/job_queue.rb', line 38

def create_job params
  # Build class name and chock if it exists
  # klass = Kernel.const_get("Job#{params[:type].to_s.capitalize}") rescue nil
  klass_name = "Job#{params[:type].to_s.capitalize}"
  klass = Kernel.const_get(klass_name) rescue nil

  # If object not found, don't create a job !
  unless klass && klass < Job
    message = "can't create [#{klass_name}] for type [#{params[:type]}]"
    log_error "JobQueue.create_job: #{message}"
    raise QueueCantCreateJob, message
  end

  # Generate an ID and stack it
  @mutex.synchronize do
    @last_id += 1
  end
  job_id = prefixed_id(@last_id)

  # Instantiate it and return the now object
  log_info "JobQueue.create_job: creating [#{klass.name}] with ID [#{job_id}]"
  job = klass.new(job_id, params)

  # Push it on the queue
  push job

  return job
end

#empty?Boolean

def jobs_ids

@jobs.collect(&:id)

end

Returns:

  • (Boolean)


132
133
134
# File 'lib/rest-ftp-daemon/job_queue.rb', line 132

def empty?
  @queue.empty?
end

#expire(status, maxage, verbose = false) ⇒ Object

Jobs cleanup



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/rest-ftp-daemon/job_queue.rb', line 226

def expire status, maxage, verbose = false
# FIXME: clean both @jobs and @queue
  # Init
  return if status.nil? || maxage <= 0

  # Compute oldest limit
  time_limit = Time.now - maxage.to_i
  log_info "JobQueue.expire limit [#{time_limit}] status [#{status}]" if verbose

  @mutex.synchronize do
    # Delete jobs from the queue when they match status and age limits
    @jobs.delete_if do |job|
      # log_debug "testing job [#{job.id}] updated_at [#{job.updated_at}]"

      # Skip if wrong status, updated_at invalid, or updated since time_limit
      next unless job.status == status
      next if job.updated_at.nil?
      next if job.updated_at >= time_limit

      # Ok, we have to clean it up ..
      log_info "expire [#{status}]: job [#{job.id}] updated_at [#{job.updated_at}]"

      # From any queues, remove it
      @queues.each do |pool, jobs|
        log_debug "#{LOG_INDENT}unqueued from [#{pool}]" if jobs.delete(job)
      end

      # Remember we have to delete the original job !
      true
    end
  end

end

#find_by_id(id, prefixed = false) ⇒ Object

Queue access



141
142
143
144
145
146
147
148
# File 'lib/rest-ftp-daemon/job_queue.rb', line 141

def find_by_id id, prefixed = false
  # Build a prefixed id if expected
  id = prefixed_id(id) if prefixed
  log_info "find_by_id (#{id}, #{prefixed}) > #{id}"

  # Search in jobs queues
  @jobs.find { |item| item.id == id }
end

#generate_idObject



67
68
69
70
71
72
# File 'lib/rest-ftp-daemon/job_queue.rb', line 67

def generate_id
  @mutex.synchronize do
    @last_id += 1
  end
  prefixed_id @last_id
end

#jobs_by_statusObject



122
123
124
125
126
# File 'lib/rest-ftp-daemon/job_queue.rb', line 122

def jobs_by_status
  statuses = {}
  @jobs.group_by { |job| job.status }.map { |status, jobs| statuses[status] = jobs.size }
  statuses
end

#jobs_countObject

Queue infos



118
119
120
# File 'lib/rest-ftp-daemon/job_queue.rb', line 118

def jobs_count
  @jobs.length
end

#jobs_queuedObject



74
75
76
# File 'lib/rest-ftp-daemon/job_queue.rb', line 74

def jobs_queued
  @queues
end

#jobs_with_status(status) ⇒ Object

Jobs acess and searching



213
214
215
216
217
218
219
220
221
222
223
# File 'lib/rest-ftp-daemon/job_queue.rb', line 213

def jobs_with_status status
  # No status filter: return all execept queued
  if status.empty?
    @jobs.reject { |job| job.status == JOB_STATUS_QUEUED }

  # Status filtering: only those jobs
  else
    @jobs.select { |job| job.status == status.to_s }

  end
end

#pop(pool, non_block = false) ⇒ Object Also known as: shift, deq



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/rest-ftp-daemon/job_queue.rb', line 189

def pop pool, non_block = false
  @mutex.synchronize do
    myqueue = (@queues[pool] ||= [])
    @waitings[pool] ||= []
    loop do
      if myqueue.empty?
        #puts "JobQueue.pop(#{pool}): empty"
        raise ThreadError, "queue empty" if non_block
        @waitings[pool].push Thread.current
        @mutex.sleep
      else
        return myqueue.pop
      end
    end
  end
end

#push(job) ⇒ Object Also known as: <<, enq, requeue



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/rest-ftp-daemon/job_queue.rb', line 150

def push job
  # Check that item responds to "priorty" method
  raise "JobQueue.push: job should respond to: priority" unless job.respond_to? :priority
  raise "JobQueue.push: job should respond to: id" unless job.respond_to? :id
  raise "JobQueue.push: job should respond to: pool" unless job.respond_to? :pool
  raise "JobQueue.push: job should respond to: reset" unless job.respond_to? :reset

  @mutex.synchronize do
    # Get this job's pool & prepare queue of this pool
    pool = job.pool
    myqueue = (@queues[pool] ||= [])

    # Store the job into the global jobs list, if not already inside
    @jobs.push(job) unless @jobs.include?(job)

    # Push job into the queue, if not already inside
    myqueue.push(job) unless myqueue.include?(job)

    # Inform the job that it's been queued / reset it
    job.reset

    # Refresh queue order
    #sort_queue!(pool)
    myqueue.sort_by!(&:weight)

    # Try to wake a worker up
    begin
      @waitings[pool] ||= []
      t = @waitings[pool].shift
      t.wakeup if t
    rescue ThreadError
      retry
    end
  end
end

#queued_by_poolObject



78
79
80
81
82
83
84
# File 'lib/rest-ftp-daemon/job_queue.rb', line 78

def queued_by_pool
  result = {}
  @queues.each do |pool, jobs|
    result[pool] = jobs.count
  end
  result
end

#rate_by(method_name) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/rest-ftp-daemon/job_queue.rb', line 86

def rate_by method_name
  # Init
  result = {}
  #return unless Job.new(0, {}).respond_to? method_name

  # Select only running jobs
  @jobs.each do |job|

    # Compute jobs's group, next if empty
    group = job.send(method_name)
    next if group.nil?

    # Initialize rate entry
    result[group] ||= nil

    # If job is not uploading, next !
    next unless job.status == JOB_STATUS_UPLOADING

    # Extract current rate, next if not available
    rate = job.get_info INFO_BITRATE
    next if rate.nil?

    # Add its current rate
    result[group] ||= 0
    result[group] += rate
  end

  # Return the rate
  result
end