Class: Polipus::QueueOverflow::MongoQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/polipus/queue_overflow/mongo_queue.rb

Direct Known Subclasses

MongoQueueCapped

Instance Method Summary collapse

Constructor Details

#initialize(mongo_db, queue_name, options = {}) ⇒ MongoQueue

Returns a new instance of MongoQueue.



5
6
7
8
9
10
11
12
13
14
# File 'lib/polipus/queue_overflow/mongo_queue.rb', line 5

def initialize(mongo_db, queue_name, options = {})
  @mongo_db = mongo_db
  @collection_name = "polipus_q_overflow_#{queue_name}"
  @semaphore = Mutex.new
  @options = options
  @options[:ensure_uniq] ||= false
  if @options[:ensure_uniq]
    ensure_index
  end
end

Instance Method Details

#clearObject



24
25
26
27
28
29
# File 'lib/polipus/queue_overflow/mongo_queue.rb', line 24

def clear
  @mongo_db[@collection_name].drop
  if @options[:ensure_uniq]
    ensure_index
  end
end

#empty?Boolean

Returns:

  • (Boolean)


20
21
22
# File 'lib/polipus/queue_overflow/mongo_queue.rb', line 20

def empty?
  !(length > 0)
end

#lengthObject Also known as: size



16
17
18
# File 'lib/polipus/queue_overflow/mongo_queue.rb', line 16

def length
  @mongo_db[@collection_name].count
end

#pop(_ = false) ⇒ Object Also known as: dec, shift



40
41
42
43
44
45
46
47
# File 'lib/polipus/queue_overflow/mongo_queue.rb', line 40

def pop(_ = false)
  @semaphore.synchronize {
    doc = @mongo_db[@collection_name].find({},:sort => {:_id => 1}).limit(1).first
    return nil if doc.nil?
    @mongo_db[@collection_name].remove(:_id => doc['_id'])
    doc && doc['payload'] ? doc['payload'] : nil
  }
end

#push(data) ⇒ Object Also known as: enc, <<



31
32
33
34
35
36
37
38
# File 'lib/polipus/queue_overflow/mongo_queue.rb', line 31

def push data
  unless @options[:ensure_uniq]
    @mongo_db[@collection_name].insert({:payload => data})  
  else
    @mongo_db[@collection_name].update({:payload => data}, {:payload => data}, {:upsert => 1, :w => 1})
  end
  true        
end