Class: RestFtpDaemon::JobQueue

Inherits:
Object
  • Object
show all
Includes:
BmcDaemonLib::LoggerHelper, NewRelic::Agent::Instrumentation::ControllerInstrumentation, CommonHelpers, Singleton
Defined in:
lib/rest-ftp-daemon/job_queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from CommonHelpers

#dashboard_url, #exception_to_error, #format_bytes, #identifier, #underscore

Constructor Details

#initialize ⇒ JobQueue

Returns a new instance of JobQueue.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/rest-ftp-daemon/job_queue.rb', line 14

# Builds the job queue with empty state.
#
# Sets up the per-pool queues, the per-pool lists of waiting threads, the
# global jobs list, the ID counter, the mutex, the logger pipe and the prefix
# used when building job identifiers.
def initialize
  # Per-pool job queues, and per-pool lists of threads waiting for a job
  @queues = {}
  @waitings = {}

  # Global list of jobs, and counter used to build their identifiers
  @jobs = []
  @last_id = 0

  # NOTE: the former #taint calls were removed on purpose: tainting is a
  # deprecated no-op since Ruby 2.7, and Object#taint no longer exists in
  # Ruby 3.2+, where calling it raises NoMethodError.

  # Create mutex
  @mutex = Mutex.new

  # Logger
  log_pipe :queue

  # Identifiers generator
  @prefix = identifier(JOB_IDENT_LEN)
  log_info "initialized (prefix: #{@prefix})"
end

Instance Attribute Details

#jobs ⇒ Object (readonly)

Class options



12
13
14
# File 'lib/rest-ftp-daemon/job_queue.rb', line 12

# Read-only accessor for the global jobs list.
#
# @return [Array] the array held in @jobs (the live object, not a copy)
def jobs
  @jobs
end

Instance Method Details

#clear ⇒ Object



205
206
207
# File 'lib/rest-ftp-daemon/job_queue.rb', line 205

# Empties the queue of every pool.
#
# Only the queued entries are dropped; the global @jobs list is left
# untouched. Each pool's array is emptied in place rather than replaced,
# because #pop keeps a reference to its pool's array while it waits.
#
# @return [Hash] the per-pool queues, now all empty
def clear
  # Queues are stored per pool in @queues; no @queue variable is ever set
  @queues.each_value(&:clear)
end

#create_job(params) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/rest-ftp-daemon/job_queue.rb', line 36

# Builds a job of the class matching params[:type] and pushes it on the queue.
#
# The class name is "Job" followed by the capitalized type (a "foo" type gives
# "JobFoo"); it must resolve to a subclass of Job.
#
# @param params [Hash] job parameters; :type selects the job class
# @return [Job] the newly created job, already pushed on the queue
# @raise [QueueCantCreateJob] when the type does not match a Job subclass
def create_job(params)
  # Build class name and check if it exists
  klass_name = "Job#{params[:type].to_s.capitalize}"
  klass =
    begin
      Kernel.const_get(klass_name)
    rescue NameError
      # Unknown or malformed constant name: reported just below
      nil
    end

  # If no proper Job subclass was found, don't create a job !
  unless klass.is_a?(Class) && klass < Job
    message = "can't create [#{klass_name}] for type [#{params[:type]}]"
    log_error "create_job: #{message}"
    raise QueueCantCreateJob, message
  end

  # Generate an ID: the incremented value is captured inside the lock, so a
  # thread bumping @last_id right afterwards cannot change the ID used here
  new_id = @mutex.synchronize { @last_id += 1 }
  job_id = prefixed_id(new_id)

  # Instantiate it
  log_info "create_job: creating [#{klass.name}] with ID [#{job_id}]"
  job = klass.new(job_id, params)

  # Push it on the queue and hand it back to the caller
  push job
  job
end

#empty? ⇒ Boolean

# Lists the identifiers of the jobs held in the global jobs list.
#
# @return [Array] the id of each job in @jobs, in list order
def jobs_ids
  @jobs.map(&:id)
end

Returns:

  • (Boolean)


130
131
132
# File 'lib/rest-ftp-daemon/job_queue.rb', line 130

# Tells whether no job is queued in any pool.
#
# @return [Boolean] true when there is no pool yet, or when every pool's
#   queue is empty
def empty?
  # Queues are stored per pool in @queues; no @queue variable is ever set
  @queues.values.all?(&:empty?)
end

#expire(status, maxage, verbose = false) ⇒ Object

Jobs cleanup



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/rest-ftp-daemon/job_queue.rb', line 223

# Jobs cleanup: forgets the jobs that have been in a given status for too long.
#
# A job is removed when its status equals +status+ and its updated_at is set
# and older than +maxage+ seconds. Removed jobs are deleted from the global
# jobs list and from every pool queue that still holds them.
#
# @param status the status a job must have to be removed; nil disables the cleanup
# @param maxage [#to_i] maximum age in seconds; anything that converts to zero
#   or less (including nil) disables the cleanup
# @param verbose [Boolean] when true, logs the computed time limit
# @return [Array, nil] the jobs list after cleanup, or nil when nothing was done
def expire(status, maxage, verbose = false)
  # Convert first, so that the guard and the limit computed below agree and a
  # nil or String maxage (e.g. read from configuration) does not raise
  max_age = maxage.to_i
  return if status.nil? || max_age <= 0

  # Compute oldest limit
  time_limit = Time.now - max_age
  log_info "expire limit [#{time_limit}] status [#{status}]" if verbose

  @mutex.synchronize do
    # Delete jobs from the list when they match status and age limits
    @jobs.delete_if do |job|
      # Keep if wrong status, updated_at invalid, or updated since time_limit
      next unless job.status == status
      next if job.updated_at.nil?
      next if job.updated_at >= time_limit

      # Ok, we have to clean it up ..
      log_info "expire [#{status}]: job [#{job.id}] updated_at [#{job.updated_at}]"

      # From any queues, remove it
      @queues.each do |pool, queued|
        log_debug "#{LOG_INDENT}unqueued from [#{pool}]" if queued.delete(job)
      end

      # Truthy block value: delete_if drops the job from the list itself
      true
    end
  end
end

#find_by_id(id, prefixed = false) ⇒ Object

Queue access



139
140
141
142
143
144
145
146
# File 'lib/rest-ftp-daemon/job_queue.rb', line 139

# Queue access: looks a job up by identifier in the global jobs list.
#
# @param id the identifier to search for
# @param prefixed [Boolean] when true, +id+ is first run through prefixed_id
# @return the first job whose id matches, or nil when there is none
def find_by_id(id, prefixed = false)
  # Build a prefixed id if expected; the requested id is kept as is so the
  # log line shows both the requested and the looked-up values
  lookup_id = prefixed ? prefixed_id(id) : id
  log_info "find_by_id (#{id}, #{prefixed}) > #{lookup_id}"

  # Search in the jobs list
  @jobs.find { |item| item.id == lookup_id }
end

#generate_id ⇒ Object



65
66
67
68
69
70
# File 'lib/rest-ftp-daemon/job_queue.rb', line 65

# Reserves the next job number and returns it as a prefixed identifier.
#
# @return the value of prefixed_id for the newly reserved number
def generate_id
  # The incremented value is captured inside the lock: reading @last_id again
  # after releasing it could pick up a number reserved by another thread
  new_id = @mutex.synchronize { @last_id += 1 }
  prefixed_id new_id
end

#jobs_by_status ⇒ Object



120
121
122
123
124
# File 'lib/rest-ftp-daemon/job_queue.rb', line 120

# Counts the jobs of the global jobs list, per status.
#
# @return [Hash] status => number of jobs currently having that status
def jobs_by_status
  @jobs.each_with_object({}) do |job, counts|
    status = job.status
    counts[status] = counts.fetch(status, 0) + 1
  end
end

#jobs_count ⇒ Object

Queue infos



116
117
118
# File 'lib/rest-ftp-daemon/job_queue.rb', line 116

# Queue infos: number of jobs held in the global jobs list.
#
# @return [Integer] the size of @jobs
def jobs_count
  @jobs.size
end

#jobs_queued ⇒ Object



72
73
74
# File 'lib/rest-ftp-daemon/job_queue.rb', line 72

# Gives access to the per-pool queues.
#
# @return [Hash] the hash held in @queues (the live object, not a copy),
#   mapping each pool to its array of queued jobs
def jobs_queued
  @queues
end

#jobs_with_status(status) ⇒ Object

Jobs access and searching



210
211
212
213
214
215
216
217
218
219
220
# File 'lib/rest-ftp-daemon/job_queue.rb', line 210

# Jobs access and searching: selects jobs of the global list by status.
#
# @param status [String, Symbol, nil] the status to keep; nil or an empty
#   value means "no filter"
# @return [Array] without filter, every job except the queued ones; otherwise
#   only the jobs whose status equals status.to_s
def jobs_with_status(status)
  # No status filter: return all except queued (nil counts as "no filter"
  # instead of raising NoMethodError on nil.empty?)
  if status.nil? || status.empty?
    @jobs.reject { |job| job.status == JOB_STATUS_QUEUED }

  # Status filtering: only those jobs
  else
    wanted = status.to_s
    @jobs.select { |job| job.status == wanted }
  end
end

#pop(pool, non_block = false) ⇒ Object Also known as: shift, deq



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/rest-ftp-daemon/job_queue.rb', line 187

# Takes a job out of a pool's queue, waiting for one if needed.
#
# @param pool the pool whose queue is read (used as a key of @queues)
# @param non_block [Boolean] when true, never wait: raise instead
# @return the last element of the pool's queue (#push keeps each queue sorted
#   by ascending weight)
# @raise [ThreadError] "queue empty" when the queue is empty and non_block is set
def pop pool, non_block = false
  @mutex.synchronize do
    # Make sure this pool has a queue and a list of waiting threads
    myqueue = (@queues[pool] ||= [])
    @waitings[pool] ||= []
    loop do
      if myqueue.empty?
        raise ThreadError, "queue empty" if non_block
        # Register the current thread in the pool's waiting list (the list
        # #push takes threads from to wake them up), then sleep on the mutex;
        # the loop checks the queue again once the thread is awake
        @waitings[pool].push Thread.current
        @mutex.sleep
      else
        return myqueue.pop
      end
    end
  end
end

#push(job) ⇒ Object Also known as: <<, enq, requeue



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/rest-ftp-daemon/job_queue.rb', line 148

# Registers a job and (re)queues it in the queue of its pool.
#
# The job is added to the global jobs list and to its pool's queue unless it
# is already there, it is reset, the queue is re-sorted by ascending weight,
# and one thread waiting on that pool, if any, is woken up.
#
# @param job an object responding to priority, id, pool, reset and weight
# @return the woken-up thread, or nil when no thread was waiting
# @raise [RuntimeError] when the job lacks one of the required methods
def push(job)
  # Check that the job responds to every required method *before* touching
  # any shared state. weight is part of the list because the queue is sorted
  # with it below: without this check, a job lacking it would be stored and
  # reset first, and only then make the sort raise.
  %i[priority id pool reset weight].each do |required|
    raise "push: job should respond to: #{required}" unless job.respond_to?(required)
  end

  @mutex.synchronize do
    # Get this job's pool & prepare queue of this pool
    pool = job.pool
    myqueue = (@queues[pool] ||= [])

    # Store the job into the global jobs list, if not already inside
    @jobs.push(job) unless @jobs.include?(job)

    # Push job into the queue, if not already inside
    myqueue.push(job) unless myqueue.include?(job)

    # Inform the job that it's been queued / reset it
    job.reset

    # Refresh queue order
    myqueue.sort_by!(&:weight)

    # Try to wake a worker up. Thread#wakeup raises ThreadError on a dead
    # thread: in that case move on to the next waiter. Every retry takes one
    # more thread off the list, so this stops once the list is exhausted.
    begin
      @waitings[pool] ||= []
      waiter = @waitings[pool].shift
      waiter.wakeup if waiter
    rescue ThreadError
      retry
    end
  end
end

#queued_by_pool ⇒ Object



76
77
78
79
80
81
82
# File 'lib/rest-ftp-daemon/job_queue.rb', line 76

# Counts the queued jobs of each pool.
#
# @return [Hash] pool => number of jobs currently in that pool's queue
def queued_by_pool
  @queues.each_with_object({}) do |(pool, queued), counts|
    counts[pool] = queued.count
  end
end

#rate_by(method_name) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/rest-ftp-daemon/job_queue.rb', line 84

# Sums the current transfer bitrate of uploading jobs, grouped by the value
# each job returns for +method_name+.
#
# Every non-nil group met gets an entry; its value stays nil until an
# uploading job of that group reports a bitrate.
#
# @param method_name [Symbol, String] name of the job method giving the group
# @return [Hash] group => summed bitrate, or nil when no bitrate was reported
def rate_by(method_name)
  @jobs.each_with_object({}) do |job, rates|
    # Compute job's group, skip the job if there is none
    group = job.send(method_name)
    next if group.nil?

    # Make sure the group is listed, even without any rate
    rates[group] = nil unless rates.key?(group)

    # Only uploading jobs contribute to the rate
    next unless job.status == JOB_STATUS_UPLOADING

    # Extract current rate, skip the job if not available
    bitrate = job.get_info(INFO_TRANFER_BITRATE)
    next if bitrate.nil?

    # Add its current rate to the group's total
    rates[group] = (rates[group] || 0) + bitrate
  end
end