Class: RestFtpDaemon::JobQueue
- Inherits:
-
Object
- Object
- RestFtpDaemon::JobQueue
- Includes:
- BmcDaemonLib::LoggerHelper, NewRelic::Agent::Instrumentation::ControllerInstrumentation, CommonHelpers, Singleton
- Defined in:
- lib/rest-ftp-daemon/job_queue.rb
Instance Attribute Summary collapse
-
#jobs ⇒ Object
readonly
Returns the value of attribute jobs.
-
#logger ⇒ Object
readonly
Class options.
Instance Method Summary collapse
- #clear ⇒ Object
- #create_job(params) ⇒ Object
-
#empty? ⇒ Boolean
def jobs_ids @jobs.collect(&:id) end.
-
#expire(status, maxage, verbose = false) ⇒ Object
Jobs cleanup.
-
#find_by_id(id, prefixed = false) ⇒ Object
Queue access.
- #generate_id ⇒ Object
-
#initialize ⇒ JobQueue
constructor
A new instance of JobQueue.
- #jobs_by_status ⇒ Object
-
#jobs_count ⇒ Object
Queue infos.
- #jobs_queued ⇒ Object
-
#jobs_with_status(status) ⇒ Object
Jobs acess and searching.
- #pop(pool, non_block = false) ⇒ Object (also: #shift, #deq)
- #push(job) ⇒ Object (also: #<<, #enq, #requeue)
- #queued_by_pool ⇒ Object
- #rate_by(method_name) ⇒ Object
Methods included from CommonHelpers
#dashboard_url, #exception_to_error, #format_bytes, #identifier, #underscore
Constructor Details
#initialize ⇒ JobQueue
Returns a new instance of JobQueue.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 16 def initialize # Initialize values @queues = {} @waitings = {} @jobs = [] @last_id = 0 @queues.taint # enable tainted communication @waitings.taint taint # Create mutex @mutex = Mutex.new # Logger @logger = BmcDaemonLib::LoggerPool.instance.get :queue # Identifiers generator @prefix = identifier JOB_IDENT_LEN log_info "JobQueue initialized (prefix: #{@prefix})" end |
Instance Attribute Details
#jobs ⇒ Object (readonly)
Returns the value of attribute jobs.
14 15 16 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 14 def jobs @jobs end |
#logger ⇒ Object (readonly)
Class options
13 14 15 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 13 def logger @logger end |
Instance Method Details
#clear ⇒ Object
208 209 210 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 208 def clear @queue.clear end |
#create_job(params) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 38 def create_job params # Build class name and chock if it exists # klass = Kernel.const_get("Job#{params[:type].to_s.capitalize}") rescue nil klass_name = "Job#{params[:type].to_s.capitalize}" klass = Kernel.const_get(klass_name) rescue nil # If object not found, don't create a job ! unless klass && klass < Job = "can't create [#{klass_name}] for type [#{params[:type]}]" log_error "JobQueue.create_job: #{}" raise QueueCantCreateJob, end # Generate an ID and stack it @mutex.synchronize do @last_id += 1 end job_id = prefixed_id(@last_id) # Instantiate it and return the now object log_info "JobQueue.create_job: creating [#{klass.name}] with ID [#{job_id}]" job = klass.new(job_id, params) # Push it on the queue push job return job end |
#empty? ⇒ Boolean
def jobs_ids
@jobs.collect(&:id)
end
132 133 134 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 132 def empty? @queue.empty? end |
#expire(status, maxage, verbose = false) ⇒ Object
Jobs cleanup
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 226 def expire status, maxage, verbose = false # FIXME: clean both @jobs and @queue # Init return if status.nil? || maxage <= 0 # Compute oldest limit time_limit = Time.now - maxage.to_i log_info "JobQueue.expire limit [#{time_limit}] status [#{status}]" if verbose @mutex.synchronize do # Delete jobs from the queue when they match status and age limits @jobs.delete_if do |job| # log_debug "testing job [#{job.id}] updated_at [#{job.updated_at}]" # Skip if wrong status, updated_at invalid, or updated since time_limit next unless job.status == status next if job.updated_at.nil? next if job.updated_at >= time_limit # Ok, we have to clean it up .. log_info "expire [#{status}]: job [#{job.id}] updated_at [#{job.updated_at}]" # From any queues, remove it @queues.each do |pool, jobs| log_debug "#{LOG_INDENT}unqueued from [#{pool}]" if jobs.delete(job) end # Remember we have to delete the original job ! true end end end |
#find_by_id(id, prefixed = false) ⇒ Object
Queue access
141 142 143 144 145 146 147 148 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 141 def find_by_id id, prefixed = false # Build a prefixed id if expected id = prefixed_id(id) if prefixed log_info "find_by_id (#{id}, #{prefixed}) > #{id}" # Search in jobs queues @jobs.find { |item| item.id == id } end |
#generate_id ⇒ Object
67 68 69 70 71 72 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 67 def generate_id @mutex.synchronize do @last_id += 1 end prefixed_id @last_id end |
#jobs_by_status ⇒ Object
122 123 124 125 126 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 122 def jobs_by_status statuses = {} @jobs.group_by { |job| job.status }.map { |status, jobs| statuses[status] = jobs.size } statuses end |
#jobs_count ⇒ Object
Queue infos
118 119 120 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 118 def jobs_count @jobs.length end |
#jobs_queued ⇒ Object
74 75 76 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 74 def jobs_queued @queues end |
#jobs_with_status(status) ⇒ Object
Jobs acess and searching
213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 213 def jobs_with_status status # No status filter: return all execept queued if status.empty? @jobs.reject { |job| job.status == JOB_STATUS_QUEUED } # Status filtering: only those jobs else @jobs.select { |job| job.status == status.to_s } end end |
#pop(pool, non_block = false) ⇒ Object Also known as: shift, deq
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 189 def pop pool, non_block = false @mutex.synchronize do myqueue = (@queues[pool] ||= []) @waitings[pool] ||= [] loop do if myqueue.empty? #puts "JobQueue.pop(#{pool}): empty" raise ThreadError, "queue empty" if non_block @waitings[pool].push Thread.current @mutex.sleep else return myqueue.pop end end end end |
#push(job) ⇒ Object Also known as: <<, enq, requeue
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 150 def push job # Check that item responds to "priorty" method raise "JobQueue.push: job should respond to: priority" unless job.respond_to? :priority raise "JobQueue.push: job should respond to: id" unless job.respond_to? :id raise "JobQueue.push: job should respond to: pool" unless job.respond_to? :pool raise "JobQueue.push: job should respond to: reset" unless job.respond_to? :reset @mutex.synchronize do # Get this job's pool & prepare queue of this pool pool = job.pool myqueue = (@queues[pool] ||= []) # Store the job into the global jobs list, if not already inside @jobs.push(job) unless @jobs.include?(job) # Push job into the queue, if not already inside myqueue.push(job) unless myqueue.include?(job) # Inform the job that it's been queued / reset it job.reset # Refresh queue order #sort_queue!(pool) myqueue.sort_by!(&:weight) # Try to wake a worker up begin @waitings[pool] ||= [] t = @waitings[pool].shift t.wakeup if t rescue ThreadError retry end end end |
#queued_by_pool ⇒ Object
78 79 80 81 82 83 84 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 78 def queued_by_pool result = {} @queues.each do |pool, jobs| result[pool] = jobs.count end result end |
#rate_by(method_name) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/rest-ftp-daemon/job_queue.rb', line 86 def rate_by method_name # Init result = {} #return unless Job.new(0, {}).respond_to? method_name # Select only running jobs @jobs.each do |job| # Compute jobs's group, next if empty group = job.send(method_name) next if group.nil? # Initialize rate entry result[group] ||= nil # If job is not uploading, next ! next unless job.status == JOB_STATUS_UPLOADING # Extract current rate, next if not available rate = job.get_info INFO_BITRATE next if rate.nil? # Add its current rate result[group] ||= 0 result[group] += rate end # Return the rate result end |