Class: Polipus::QueueOverflow::MongoQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/polipus/queue_overflow/mongo_queue.rb

Direct Known Subclasses

MongoQueueCapped

Instance Method Summary collapse

Constructor Details

#initialize(mongo_db, queue_name, options = {}) ⇒ MongoQueue

Returns a new instance of MongoQueue.



7
8
9
10
11
12
13
14
# File 'lib/polipus/queue_overflow/mongo_queue.rb', line 7

# Sets up an overflow queue stored in the collection
# "polipus_q_overflow_<queue_name>" of the given database handle.
#
# @param mongo_db [Object] database handle; the other methods index it with
#   the collection name (presumably a Mongo client/database — confirm with caller)
# @param queue_name [String] suffix used to build the collection name
# @param options [Hash] settings; :ensure_uniq defaults to false and, when
#   truthy, triggers ensure_index
def initialize(mongo_db, queue_name, options = {})
  @mongo_db = mongo_db
  @collection_name = "polipus_q_overflow_#{queue_name}"
  @semaphore = Mutex.new
  # Keep a private copy: the default below must not leak into the caller's
  # hash, and a frozen options hash must be accepted.
  @options = options.dup
  @options[:ensure_uniq] ||= false
  @options[:ensure_uniq] && ensure_index
end

Instance Method Details

#clear ⇒ Object



24
25
26
27
# File 'lib/polipus/queue_overflow/mongo_queue.rb', line 24

# Drops the backing collection, then re-runs ensure_index when the
# :ensure_uniq option is set (the drop is expected to discard the index
# along with the data — confirm against the driver).
def clear
  collection = @mongo_db[@collection_name]
  collection.drop
  @options[:ensure_uniq] && ensure_index
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


20
21
22
# File 'lib/polipus/queue_overflow/mongo_queue.rb', line 20

# @return [Boolean] true when length reports no queued documents
def empty?
  length <= 0
end

#length ⇒ Object Also known as: size



16
17
18
# File 'lib/polipus/queue_overflow/mongo_queue.rb', line 16

# Counts the documents currently stored in the queue's collection.
#
# @return [Object] whatever the collection view's count reports
#   (presumably an Integer — confirm against the driver)
def length
  collection = @mongo_db[@collection_name]
  collection.find.count
end

#pop(_ = false) ⇒ Object Also known as: dec, shift



38
39
40
41
42
43
44
45
# File 'lib/polipus/queue_overflow/mongo_queue.rb', line 38

# Removes the document with the smallest _id and returns its payload.
#
# The lookup and the removal go through a single find_one_and_delete call on
# the sorted view, so another process sharing the collection cannot be handed
# the same document between a separate find and delete. The mutex is kept so
# calls on this instance stay serialised as before.
#
# @param _ [Object] ignored; presumably kept for Queue#pop compatibility
# @return [Object, nil] the stored payload, or nil when the queue is empty
#   or the document carries no truthy payload
def pop(_ = false)
  @semaphore.synchronize do
    oldest_first = @mongo_db[@collection_name].find.sort(_id: 1)
    doc = oldest_first.find_one_and_delete
    return nil if doc.nil?
    doc['payload'] || nil
  end
end

#push(data) ⇒ Object Also known as: enc, <<



29
30
31
32
33
34
35
36
# File 'lib/polipus/queue_overflow/mongo_queue.rb', line 29

# Stores data as the payload of a document in the queue's collection.
#
# Without :ensure_uniq the payload is simply inserted. With it, the document
# matching the same payload is replaced via an upsert instead.
#
# @param data [Object] payload to enqueue
# @return [true] always
def push(data)
  collection = @mongo_db[@collection_name]

  unless @options[:ensure_uniq]
    collection.insert_one(payload: data)
    return true
  end

  collection.find(payload: data).replace_one({ payload: data }, upsert: true)
  true
end