Class: StubbornQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/stubborn_queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ StubbornQueue

Returns a new instance of StubbornQueue.



7
8
9
10
11
12
# File 'lib/stubborn_queue.rb', line 7

def initialize(options = {})
  @name = options.fetch :name, 'default'
  @timeout = options.fetch :timeout, 60
  path = options.fetch :file, ".#{@name}.db"
  @db = Moneta.new :Daybreak, expires: true, file: path
end

Instance Attribute Details

#dbObject (readonly)

Returns the value of attribute db.



6
7
8
# File 'lib/stubborn_queue.rb', line 6

def db
  @db
end

Instance Method Details

#claimsObject



42
43
44
# File 'lib/stubborn_queue.rb', line 42

def claims
  @db.fetch key_for(:claimed_list), []
end

#create_idObject



34
35
36
# File 'lib/stubborn_queue.rb', line 34

def create_id
  @db.increment key_for(:id_count)
end

#dequeueObject



66
67
68
69
70
71
# File 'lib/stubborn_queue.rb', line 66

def dequeue
  process_expired_claims
  id = @db.rpoplpush key_for(:pending_list), key_for(:claimed_list)
  @db.store key_for(:claimed_flag, with_id: id), true, expires: @timeout
  id
end

#enqueue(item) ⇒ Object



59
60
61
62
63
64
# File 'lib/stubborn_queue.rb', line 59

def enqueue(item)
  process_expired_claims
  id = create_id
  @db.store key_for(:item_store, with_id: id), item
  @db.lpush key_for(:pending_list), id
end

#finish(id) ⇒ Object



77
78
79
# File 'lib/stubborn_queue.rb', line 77

def finish(id)
  @db.store key_for(:finished_flag, with_id: id), true
end

#finished?(id) ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/stubborn_queue.rb', line 50

def finished?(id)
  @db.fetch key_for(:finished_flag, with_id: id), false
end

#key_for(type, with_id: 0) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/stubborn_queue.rb', line 14

def key_for(type, with_id: 0)
  key = case type
  when :pending_list
    "#{@name}:pending"
  when :claimed_list
    "#{@name}:claimed"
  when :claimed_flag
    "#{@name}:task:#{with_id}:claimed"
  when :finished_flag
    "#{@name}:task:#{with_id}:finished"
  when :item_store
    "#{@name}:@{id}"
  when :id_count
    "#{@name}:id_count"
  else
    raise "Key for '#{type}' is unrecognized."
  end
  "#{@name}:#{key}"
end

#lookup(id) ⇒ Object



38
39
40
# File 'lib/stubborn_queue.rb', line 38

def lookup(id)
  @db.fetch key_for(:item_store, with_id: id), nil
end

#process_expired_claimsObject



81
82
83
84
85
86
87
# File 'lib/stubborn_queue.rb', line 81

def process_expired_claims
  claims.each do |id|
    next if recent_claim? id
    remove_claim_on id
    requeue id unless finished? id
  end
end

#recent_claim?(id) ⇒ Boolean

Returns:

  • (Boolean)


46
47
48
# File 'lib/stubborn_queue.rb', line 46

def recent_claim?(id)
  @db.fetch key_for(:claimed_flag, with_id: id), false
end

#remove_claim_on(id) ⇒ Object



54
55
56
57
# File 'lib/stubborn_queue.rb', line 54

def remove_claim_on(id)
  @db.lrem key_for(:claimed_list), 0, id
  @db.delete key_for(:claimed_flag, with_id: id)
end

#requeue(id) ⇒ Object



73
74
75
# File 'lib/stubborn_queue.rb', line 73

def requeue(id)
  @db.lpush key_for(:pending_list), id
end