Class: Capricorn::JobQueue
- Inherits:
-
Object
- Object
- Capricorn::JobQueue
- Includes:
- DRbUndumped
- Defined in:
- lib/capricorn/job_queue.rb
Defined Under Namespace
Classes: Job
Instance Method Summary collapse
-
#cancel(id) ⇒ Object
cancel the job associated with the given id.
-
#canceled?(id) ⇒ Boolean
is the job associated with the given id canceled?
-
#delete(id) ⇒ Object
delete the job associated with the given id.
-
#dequeue ⇒ Object
dequeue the next job of the queue.
-
#each ⇒ Object
iterate through all the jobs on the queue.
-
#enqueue(name, options = {}, &proc) ⇒ Object
enqueue a new job with the given name, options and proc.
-
#immediate(id) ⇒ Object
run the job associated with the given id immediately.
-
#immediated?(id) ⇒ Boolean
should the job associated with the given id be run immediately?
-
#initialize ⇒ JobQueue
constructor
create a new job queue.
-
#join! ⇒ Object
join the worker thread.
-
#peek ⇒ Object
peek at the next job in the queue.
-
#running? ⇒ Boolean
is the queue running.
-
#size ⇒ Object
get the size of the job queue.
-
#stop! ⇒ Object
wait until the queue is empty then stop the worker.
-
#stopped? ⇒ Boolean
is the queue stopping or stopped?.
Constructor Details
#initialize ⇒ JobQueue
create a new job queue
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/capricorn/job_queue.rb', line 8
# Create a new job queue and start its worker thread.
#
# The worker takes the job at the head of the queue (via #peek), runs it
# with this queue as its argument and then removes it (via #delete).
# While the queue is empty and still running it polls once per second; it
# exits once the queue has been stopped and no job is left.
def initialize
  @immediated_jobs = []
  @canceled_jobs   = []
  @job_queue       = []    # ids of the queued jobs; the worker reads the first one
  @jobs            = {}    # job id => job
  @mutex           = Mutex.new
  @next_id         = 1
  # Set before the worker starts so #stopped? never reads an unset ivar.
  @stopped         = false

  @worker = Thread.new(self) do |job_queue|
    loop do
      # Peek once per iteration and decide from that single snapshot.
      job = job_queue.peek
      if job
        job.run(job_queue)
        job_queue.delete(job.id)
      elsif job_queue.running?
        sleep(1)
      else
        # Stopped and nothing left to run: let the thread finish.
        break
      end
    end
  end
end
Instance Method Details
#cancel(id) ⇒ Object
cancel the job associated with the given id.
75 76 77 78 79 80 81 82 83 |
# File 'lib/capricorn/job_queue.rb', line 75
# Cancel the job associated with the given id.
#
# If the id is still queued it is removed from the queue and from the job
# table and recorded in the canceled list. An id that is not queued is
# ignored.
#
# @param id [Object] id of the job to cancel
# @return [Array, nil] the canceled-id list when the job was queued, nil otherwise
def cancel(id)
  @mutex.synchronize do
    removed = @job_queue.delete(id)
    next unless removed

    @jobs.delete(removed)
    @canceled_jobs << removed
  end
end
#canceled?(id) ⇒ Boolean
is the job associated with the given id canceled?
86 87 88 89 90 |
# File 'lib/capricorn/job_queue.rb', line 86
# Is the job associated with the given id canceled?
#
# The check consumes the mark: the id is removed from the canceled list, so
# asking again for the same id returns false until it is canceled again.
#
# @param id [Object] id of the job to check
# @return [Boolean] true when the id was in the canceled list
def canceled?(id)
  @mutex.synchronize do
    taken = @canceled_jobs.delete(id)
    !taken.nil?
  end
end
#delete(id) ⇒ Object
delete the job associated with the given id.
50 51 52 53 54 55 |
# File 'lib/capricorn/job_queue.rb', line 50
# Delete the job associated with the given id.
#
# @param id [Object] id of the job to remove from the queue
# @return [Object, nil] the job removed from the job table, or nil when the
#   id was not queued
def delete(id)
  @mutex.synchronize do
    queued = @job_queue.delete(id)
    @jobs.delete(queued) if queued
  end
end
#dequeue ⇒ Object
dequeue the next job of the queue.
42 43 44 45 46 47 |
# File 'lib/capricorn/job_queue.rb', line 42
# Dequeue the next job of the queue.
#
# Removes the first id from the queue and its job from the job table.
#
# @return [Object, nil] the dequeued job, or nil when the queue is empty
def dequeue
  @mutex.synchronize do
    head = @job_queue.shift
    @jobs.delete(head) if head
  end
end
#each ⇒ Object
iterate through all the jobs on the queue
132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/capricorn/job_queue.rb', line 132
# Iterate through all the jobs on the queue, in queue order.
#
# The mutex is held for the whole iteration, including while the block runs.
#
# @yieldparam job [Object] the job stored for the queued id
# @yieldparam canceled [Boolean] whether the id is in the canceled list
# @yieldparam immediated [Boolean] whether the id is in the immediated list
# @return [Array] the internal array of queued ids
def each
  @mutex.synchronize do
    @job_queue.each do |job_id|
      yield(
        @jobs[job_id],
        @canceled_jobs.include?(job_id),
        @immediated_jobs.include?(job_id)
      )
    end
  end
end
#enqueue(name, options = {}, &proc) ⇒ Object
enqueue a new job with the given name, options and proc.
31 32 33 34 35 36 37 38 39 |
# File 'lib/capricorn/job_queue.rb', line 31
# Enqueue a new job with the given name, options and proc.
#
# The job is built with the next free id, stored in the job table and its
# id appended to the end of the queue.
#
# @param name [Object] passed to Job.new as the job's name
# @param options [Hash] passed through to Job.new unchanged
# @param proc [Proc] block forwarded to Job.new
# @return [Object] the new job's id, as reported by the job itself
def enqueue(name, options = {}, &proc)
  @mutex.synchronize do
    job = Job.new(@next_id, name, options, &proc)
    @next_id += 1
    @jobs[job.id] = job
    @job_queue.push(job.id)
    job.id
  end
end
#immediate(id) ⇒ Object
run the job associated with the given id immediately.
93 94 95 96 97 |
# File 'lib/capricorn/job_queue.rb', line 93
# Mark the job associated with the given id to be run immediately.
#
# The id is only recorded when the job table holds a job for it.
#
# @param id [Object] id of the job to mark
# @return [Array, nil] the immediated-id list when the job exists, nil otherwise
def immediate(id)
  @mutex.synchronize do
    next unless @jobs[id]

    @immediated_jobs << id
  end
end
#immediated?(id) ⇒ Boolean
should the job associated with the given id be run immediately?
100 101 102 103 104 |
# File 'lib/capricorn/job_queue.rb', line 100
# Should the job associated with the given id be run immediately?
#
# The check consumes the mark: the id is removed from the immediated list,
# so asking again for the same id returns false until it is marked again.
#
# @param id [Object] id of the job to check
# @return [Boolean] true when the id was in the immediated list
def immediated?(id)
  @mutex.synchronize do
    taken = @immediated_jobs.delete(id)
    !taken.nil?
  end
end
#join! ⇒ Object
join the worker thread
107 108 109 |
# File 'lib/capricorn/job_queue.rb', line 107
# Join the worker thread, blocking the caller until it has finished.
#
# @return [Thread] the worker thread
def join!
  @worker.join
end
#peek ⇒ Object
peek at the next job in the queue
58 59 60 61 62 63 64 65 |
# File 'lib/capricorn/job_queue.rb', line 58
# Peek at the next job in the queue without removing it.
#
# @return [Object, nil] the job for the first queued id, or nil when the
#   queue is empty
def peek
  @mutex.synchronize do
    head = @job_queue.first
    head ? @jobs[head] : nil
  end
end
#running? ⇒ Boolean
is the queue running
127 128 129 |
# File 'lib/capricorn/job_queue.rb', line 127
# Is the queue running?
#
# @return [Boolean] the negation of #stopped?
def running?
  !stopped?
end
#size ⇒ Object
get the size of the job queue
68 69 70 71 72 |
# File 'lib/capricorn/job_queue.rb', line 68
# Get the size of the job queue.
#
# @return [Integer] number of ids currently queued
def size
  @mutex.synchronize { @job_queue.length }
end
#stop! ⇒ Object
wait until the queue is empty then stop the worker
112 113 114 115 116 117 |
# File 'lib/capricorn/job_queue.rb', line 112
# Flag the queue as stopped, then wait for the worker via #join!.
#
# @return [Object] whatever #join! returns
def stop!
  # Only the flag update needs the lock; the join happens after it is released.
  @mutex.synchronize { @stopped = true }
  join!
end
#stopped? ⇒ Boolean
is the queue stopping or stopped?
120 121 122 123 124 |
# File 'lib/capricorn/job_queue.rb', line 120
# Is the queue stopping or stopped?
#
# @return [Boolean] true once the stopped flag has been set to a truthy value
def stopped?
  @mutex.synchronize { @stopped ? true : false }
end