Class: Capricorn::JobQueue

Inherits:
Object
  • Object
show all
Includes:
DRbUndumped
Defined in:
lib/capricorn/job_queue.rb

Defined Under Namespace

Classes: Job

Instance Method Summary collapse

Constructor Details

#initializeJobQueue

create a new job queue



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/capricorn/job_queue.rb', line 8

def initialize
  @immediated_jobs = Array.new
  @canceled_jobs = Array.new
  @job_queue = Array.new
  @jobs      = Hash.new
  @mutex     = Mutex.new
  @next_id   = 1
  
  @worker    = Thread.new(self) do |job_queue|
    while job_queue.running? or job_queue.peek
      
      if job = job_queue.peek
        job.run(job_queue)
        job_queue.delete(job.id)
      else
        sleep(1)
      end
      
    end
  end
end

Instance Method Details

#cancel(id) ⇒ Object

cancel the job associated with the given id.



75
76
77
78
79
80
81
82
83
# File 'lib/capricorn/job_queue.rb', line 75

def cancel(id)
  @mutex.synchronize do
    id = @job_queue.delete(id)
    if id
      @jobs.delete(id)
      @canceled_jobs.push(id)
    end
  end
end

#canceled?(id) ⇒ Boolean

is the job associated with the given id canceled.

Returns:

  • (Boolean)


86
87
88
89
90
# File 'lib/capricorn/job_queue.rb', line 86

def canceled?(id)
  @mutex.synchronize do
    return !@canceled_jobs.delete(id).nil?
  end
end

#delete(id) ⇒ Object

delete the job associated with the given id.



50
51
52
53
54
55
# File 'lib/capricorn/job_queue.rb', line 50

def delete(id)
  @mutex.synchronize do
    id = @job_queue.delete(id)
    return @jobs.delete(id) if id
  end
end

#dequeueObject

dequeue the next job of the queue.



42
43
44
45
46
47
# File 'lib/capricorn/job_queue.rb', line 42

def dequeue
  @mutex.synchronize do
    id = @job_queue.shift
    return @jobs.delete(id) if id
  end
end

#eachObject

iterate through all the jobs on the queue



132
133
134
135
136
137
138
139
140
141
142
# File 'lib/capricorn/job_queue.rb', line 132

def each
  @mutex.synchronize do
    @job_queue.each do |id|
      job        = @jobs[id]
      canceled   = @canceled_jobs.include?(id)
      immediated = @immediated_jobs.include?(id)
      
      yield(job, canceled, immediated)
    end
  end
end

#enqueue(name, options = {}, &proc) ⇒ Object

enqueue a new job with the given name, options and proc



31
32
33
34
35
36
37
38
39
# File 'lib/capricorn/job_queue.rb', line 31

def enqueue(name, options={}, &proc)
  @mutex.synchronize do
    job = Job.new(@next_id, name, options, &proc)
    @next_id += 1
    @jobs[job.id] = job
    @job_queue.push job.id
    return job.id
  end
end

#immediate(id) ⇒ Object

run the job associated with the given id immediately.



93
94
95
96
97
# File 'lib/capricorn/job_queue.rb', line 93

def immediate(id)
  @mutex.synchronize do
    @immediated_jobs.push(id) if @jobs[id]
  end
end

#immediated?(id) ⇒ Boolean

should the job associated with the given id be run immediately.

Returns:

  • (Boolean)


100
101
102
103
104
# File 'lib/capricorn/job_queue.rb', line 100

def immediated?(id)
  @mutex.synchronize do
    return !@immediated_jobs.delete(id).nil?
  end
end

#join!Object

join the worker thread



107
108
109
# File 'lib/capricorn/job_queue.rb', line 107

def join!
  @worker.join
end

#peekObject

peek at the next job in the queue



58
59
60
61
62
63
64
65
# File 'lib/capricorn/job_queue.rb', line 58

def peek
  job = nil
  @mutex.synchronize do
    id = @job_queue.first
    job = @jobs[id] if id
  end
  job
end

#running?Boolean

is the queue running

Returns:

  • (Boolean)


127
128
129
# File 'lib/capricorn/job_queue.rb', line 127

def running?
  !stopped?
end

#sizeObject

get the size of the job queue



68
69
70
71
72
# File 'lib/capricorn/job_queue.rb', line 68

def size
  @mutex.synchronize do
    @job_queue.size
  end
end

#stop!Object

wait until the queue is empty then stop the worker



112
113
114
115
116
117
# File 'lib/capricorn/job_queue.rb', line 112

def stop!
  @mutex.synchronize do
    @stopped = true
  end
  join!
end

#stopped?Boolean

is the queue stopping or stopped?

Returns:

  • (Boolean)


120
121
122
123
124
# File 'lib/capricorn/job_queue.rb', line 120

def stopped?
  @mutex.synchronize do
    return !!@stopped
  end
end