Class: Conveyor::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/conveyor/queue.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Queue

Returns a new instance of Queue.



3
4
5
6
# File 'lib/conveyor/queue.rb', line 3

# Builds an empty queue.
#
# Sets up the array that holds the jobs and the mutex that the mutating
# methods synchronize on.
def initialize
  @queue = []
  @mutex = Mutex.new
end

Instance Method Details

#count ⇒ Object



8
9
10
# File 'lib/conveyor/queue.rb', line 8

# @return [Integer] the number of jobs currently held in the queue
def count
  @queue.length
end

#find(file) ⇒ Object



53
54
55
56
# File 'lib/conveyor/queue.rb', line 53

# Looks up a job by delegating the search to #find_index.
#
# @param file [Object] forwarded unchanged to #find_index
# @return [Hash, nil] the job stored at the index #find_index returns,
#   or nil when it returns no index
def find(file)
  position = find_index(file)
  return nil if position.nil?

  @queue[position]
end

#find_index(file = nil) ⇒ Object



45
46
47
48
49
50
51
# File 'lib/conveyor/queue.rb', line 45

# Locates a job's position in the queue.
#
# @param file [Object, nil] value compared with each job's :file entry;
#   nil asks for the head of the queue instead
# @return [Integer, nil] 0 for a nil +file+ on a non-empty queue, otherwise
#   the index of the first job whose :file equals +file+; nil when there is
#   no such position
def find_index(file = nil)
  return @queue.find_index { |job| job[:file] == file } unless file.nil?
  return nil if @queue.empty?

  0
end

#pop(file = nil) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/conveyor/queue.rb', line 67

# Removes a job from the queue while holding the mutex.
#
# @param file [Object, nil] when nil, the job at the front of the queue is
#   removed; otherwise the job that #find returns for this value is removed
# @return [Hash, nil] the removed job, or nil when there was nothing to remove
def pop(file = nil)
  @mutex.synchronize do
    next @queue.shift if file.nil?

    @queue.delete(find(file))
  end
end

#push(file) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
# File 'lib/conveyor/queue.rb', line 16

# Adds +file+ to the queue, or refreshes its timestamp when a job with the
# same :file is already queued.
#
# Runs under the mutex. Afterwards the queue is re-sorted by ascending
# :updated_at.
#
# @param file [Object] value stored under the job's :file key
# @return [Array<Hash>] the queue array after sorting
def push(file)
  @mutex.synchronize do
    existing = @queue.find { |job| job[:file] == file }
    if existing
      existing[:updated_at] = Time.now
    else
      @queue << { :file => file, :updated_at => Time.now }
    end
    @queue.sort! { |a, b| a[:updated_at] <=> b[:updated_at] }
  end
end

#reserve(file = nil) ⇒ Object



28
29
30
31
32
33
34
35
# File 'lib/conveyor/queue.rb', line 28

# Marks a job as reserved and returns it, all while holding the mutex.
#
# @param file [Object, nil] when nil, the first job that is not yet reserved
#   is chosen; otherwise only a not-yet-reserved job whose :file equals this
#   value qualifies
# @return [Hash, nil] the job that was just reserved, or nil when no job
#   qualifies
def reserve(file = nil)
  @mutex.synchronize do
    # +file+ used to be accepted but ignored; it now narrows the search,
    # while a nil +file+ keeps the "first free job" behaviour.
    job = @queue.find do |candidate|
      !candidate[:reserved] && (file.nil? || candidate[:file] == file)
    end
    job[:reserved] = true if job
    job
  end
end

#touch(file) ⇒ Object



12
13
14
# File 'lib/conveyor/queue.rb', line 12

# Delegates to #push with the same argument and returns its result.
#
# @param file [Object] passed straight through to #push
def touch(file)
  push(file)
end

#unpop(job) ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/conveyor/queue.rb', line 58

# Puts a job back into the queue unless #find already reports a job for the
# same :file.
#
# Runs under the mutex. After inserting, the queue is re-sorted by ascending
# :updated_at.
#
# @param job [Hash] job hash; its :file and :updated_at entries are read here
# @return [Array<Hash>, nil] the sorted queue when the job was inserted, nil
#   when it was skipped
def unpop(job)
  @mutex.synchronize do
    next if find(job[:file])

    @queue << job
    @queue.sort! { |a, b| a[:updated_at] <=> b[:updated_at] }
  end
end

#unreserve(file) ⇒ Object



37
38
39
40
41
42
43
# File 'lib/conveyor/queue.rb', line 37

# Clears the :reserved flag on the job that #find_index locates, while
# holding the mutex.
#
# @param file [Object] forwarded unchanged to #find_index
# @return [Hash, nil] the job whose flag was set to false, or nil when
#   #find_index returns no index
def unreserve(file)
  @mutex.synchronize do
    position = find_index(file)
    next if position.nil?

    job = @queue[position]
    job[:reserved] = false
    job
  end
end