Class: RestFtpDaemon::JobQueue

Inherits:
Object
  • Object
show all
Includes:
NewRelic::Agent::Instrumentation::ControllerInstrumentation, LoggerHelper
Defined in:
lib/rest-ftp-daemon/job_queue.rb

Overview

Queue that stores all the Jobs waiting to be processed or fully processed

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeJobQueue

Returns a new instance of JobQueue.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/rest-ftp-daemon/job_queue.rb', line 15

def initialize
  # Instance variables
  @queue = []
  @jobs = []
  @waiting = []
  @queue.taint          # enable tainted communication
  @waiting.taint
  taint
  @mutex = Mutex.new

  # Logger
  @logger = RestFtpDaemon::LoggerPool.instance.get :queue

  # Identifiers generator
  @last_id = 0
  @prefix = Helpers.identifier JOB_IDENT_LEN
  log_info "JobQueue initialized (prefix: #{@prefix})"

  # Mutex for counters
  @counters = {}
  @mutex_counters = Mutex.new
end

Instance Attribute Details

#jobsObject (readonly)

Returns the value of attribute jobs.



9
10
11
# File 'lib/rest-ftp-daemon/job_queue.rb', line 9

def jobs
  @jobs
end

#loggerObject (readonly)

Returns the value of attribute logger.



6
7
8
# File 'lib/rest-ftp-daemon/job_queue.rb', line 6

def logger
  @logger
end

#queueObject (readonly)

Returns the value of attribute queue.



8
9
10
# File 'lib/rest-ftp-daemon/job_queue.rb', line 8

def queue
  @queue
end

Instance Method Details

#clearObject



161
162
163
# File 'lib/rest-ftp-daemon/job_queue.rb', line 161

def clear
  @queue.clear
end

#counter_add(name, value) ⇒ Object



45
46
47
48
49
50
# File 'lib/rest-ftp-daemon/job_queue.rb', line 45

def counter_add name, value
  @mutex_counters.synchronize do
    @counters[name] ||= 0
    @counters[name] += value
  end
end

#counter_get(name) ⇒ Object



56
57
58
59
60
# File 'lib/rest-ftp-daemon/job_queue.rb', line 56

def counter_get name
  @mutex_counters.synchronize do
    @counters[name]
  end
end

#counter_inc(name) ⇒ Object



52
53
54
# File 'lib/rest-ftp-daemon/job_queue.rb', line 52

def counter_inc name
  counter_add name, 1
end

#countersObject



62
63
64
65
66
# File 'lib/rest-ftp-daemon/job_queue.rb', line 62

def counters
  @mutex_counters.synchronize do
    @counters
  end
end

#counts_by_statusObject



80
81
82
83
84
# File 'lib/rest-ftp-daemon/job_queue.rb', line 80

def counts_by_status
  statuses = {}
  @jobs.group_by { |job| job.status }.map { |status, jobs| statuses[status] = jobs.size }
  statuses
end

#empty?Boolean

Returns:

  • (Boolean)


157
158
159
# File 'lib/rest-ftp-daemon/job_queue.rb', line 157

def empty?
  @queue.empty?
end

#expire(status, maxage, verbose = false) ⇒ Object



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/rest-ftp-daemon/job_queue.rb', line 169

def expire status, maxage, verbose = false
# FIXME: clean both @jobs and @queue
  # Init
  return if status.nil? || maxage <= 0

  # Compute oldest possible birthday
  before = Time.now - maxage.to_i

  # Verbose output ?
  log_info "JobQueue.expire \t[#{status}] \tbefore \t[#{before}]" if verbose

  @mutex.synchronize do
    # Delete jobs from the queue when they match status and age limits
    @jobs.delete_if do |job|

      # Skip if wrong status, updated_at invalid, or too young
      next unless job.status == status.to_sym
      next if job.updated_at.nil?
      next if job.updated_at > before

      # Ok, we have to clean it up ..
      log_info "expire [#{status}] [#{maxage}] > [#{job.id}] [#{job.updated_at}]"
      log_info "#{LOG_INDENT}unqueued" if @queue.delete(job)

      true
    end
  end

end

#filter_jobs(status) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
# File 'lib/rest-ftp-daemon/job_queue.rb', line 68

def filter_jobs status
  # No status filter: return all execept queued
  if status.empty?
    @jobs.reject { |job| job.status == JOB_STATUS_QUEUED }

  # Status filtering: only those jobs
  else
    @jobs.select { |job| job.status == status.to_sym }

  end
end

#find_by_id(id, prefixed = false) ⇒ Object



98
99
100
101
102
103
104
105
# File 'lib/rest-ftp-daemon/job_queue.rb', line 98

def find_by_id id, prefixed = false
  # Build a prefixed id if expected
  id = prefixed_id(id) if prefixed
  log_info "find_by_id (#{id}, #{prefixed}) > #{id}"

  # Search in jobs queues
  @jobs.reverse.find { |item| item.id == id }
end

#generate_idObject



38
39
40
41
42
43
# File 'lib/rest-ftp-daemon/job_queue.rb', line 38

def generate_id
  @mutex.synchronize do
    @last_id += 1
  end
  prefixed_id @last_id
end

#jobs_countObject



86
87
88
# File 'lib/rest-ftp-daemon/job_queue.rb', line 86

def jobs_count
  @jobs.length
end

#jobs_idsObject



94
95
96
# File 'lib/rest-ftp-daemon/job_queue.rb', line 94

def jobs_ids
  @jobs.collect(&:id)
end

#num_waitingObject



165
166
167
# File 'lib/rest-ftp-daemon/job_queue.rb', line 165

def num_waiting
  @waiting.size
end

#pop(non_block = false) ⇒ Object Also known as: shift, deq



140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/rest-ftp-daemon/job_queue.rb', line 140

def pop non_block = false
  @mutex.synchronize do
    loop do
      if @queue.empty?
        # info "JobQueue.pop: empty"
        raise ThreadError, "queue empty" if non_block
        @waiting.push Thread.current
        @mutex.sleep
      else
        return @queue.pop
      end
    end
  end
end

#push(job) ⇒ Object Also known as: <<, enq, requeue



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/rest-ftp-daemon/job_queue.rb', line 108

def push job
  # Check that item responds to "priorty" method
  raise "JobQueue.push: job should respond to priority method" unless job.respond_to? :priority
  raise "JobQueue.push: job should respond to id method" unless job.respond_to? :id
  raise "JobQueue.push: job should respond to reset" unless job.respond_to? :reset

  @mutex.synchronize do
    # Store the job into the global jobs list, if not already inside
    @jobs.push(job) unless @jobs.include?(job)

    # Push job into the queue, if not already inside
    @queue.push(job) unless @queue.include?(job)

    # Inform the job that it's been queued / reset it
    job.reset

    # Refresh queue order
    sort_queue!

    # Try to wake a worker up
    begin
      t = @waiting.shift
      t.wakeup if t
    rescue ThreadError
      retry
    end
  end
end

#queued_idsObject



90
91
92
# File 'lib/rest-ftp-daemon/job_queue.rb', line 90

def queued_ids
  @queue.collect(&:id)
end