Class: Qu::Backend::Mongo
- Inherits:
-
Base
- Object
- Base
- Qu::Backend::Mongo
- Defined in:
- lib/qu/backend/mongo.rb
Instance Attribute Summary collapse
-
#max_retries ⇒ Object
Number of times to retry connection on connection failure (default: 5).
-
#retry_frequency ⇒ Object
Seconds to wait before try to reconnect after connection failure (default: 1).
Instance Method Summary collapse
- #clear(queue = nil) ⇒ Object
- #clear_workers ⇒ Object
- #completed(payload) ⇒ Object
- #connection ⇒ Object (also: #database)
- #enqueue(payload) ⇒ Object
- #failed(payload, error) ⇒ Object
-
#initialize ⇒ Mongo
constructor
A new instance of Mongo.
- #length(queue = 'default') ⇒ Object
- #queues ⇒ Object
- #register_worker(worker) ⇒ Object
- #release(payload) ⇒ Object
- #requeue(id) ⇒ Object
- #reserve(worker, options = {:block => true}) ⇒ Object
- #unregister_worker(worker) ⇒ Object
- #workers ⇒ Object
Constructor Details
#initialize ⇒ Mongo
Returns a new instance of Mongo.
13 14 15 16 |
# File 'lib/qu/backend/mongo.rb', line 13 def initialize self.max_retries = 5 self.retry_frequency = 1 end |
Instance Attribute Details
#max_retries ⇒ Object
Number of times to retry connection on connection failure (default: 5)
8 9 10 |
# File 'lib/qu/backend/mongo.rb', line 8 def max_retries @max_retries end |
#retry_frequency ⇒ Object
Seconds to wait before try to reconnect after connection failure (default: 1)
11 12 13 |
# File 'lib/qu/backend/mongo.rb', line 11 def retry_frequency @retry_frequency end |
Instance Method Details
#clear(queue = nil) ⇒ Object
35 36 37 38 39 40 41 42 43 |
# File 'lib/qu/backend/mongo.rb', line 35 def clear(queue = nil) queue ||= queues + ['failed'] logger.info { "Clearing queues: #{queue.inspect}" } Array(queue).each do |q| logger.debug "Clearing queue #{q}" jobs(q).drop self[:queues].remove({:name => q}) end end |
#clear_workers ⇒ Object
121 122 123 124 |
# File 'lib/qu/backend/mongo.rb', line 121 def clear_workers logger.info "Clearing workers" self[:workers].drop end |
#completed(payload) ⇒ Object
92 93 |
# File 'lib/qu/backend/mongo.rb', line 92 def completed(payload) end |
#connection ⇒ Object Also known as: database
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/qu/backend/mongo.rb', line 18 def connection @connection ||= begin uri = URI.parse(ENV['MONGOHQ_URL'].to_s) database = uri.path.empty? ? 'qu' : uri.path[1..-1] = {} if uri.password [:auths] = [{ 'db_name' => database, 'username' => uri.user, 'password' => uri.password }] end ::Mongo::Connection.new(uri.host, uri.port, ).db(database) end end |
#enqueue(payload) ⇒ Object
53 54 55 56 57 58 59 |
# File 'lib/qu/backend/mongo.rb', line 53 def enqueue(payload) payload.id = BSON::ObjectId.new jobs(payload.queue).insert({:_id => payload.id, :klass => payload.klass.to_s, :args => payload.args}) self[:queues].update({:name => payload.queue}, {:name => payload.queue}, :upsert => true) logger.debug { "Enqueued job #{payload}" } payload end |
#failed(payload, error) ⇒ Object
88 89 90 |
# File 'lib/qu/backend/mongo.rb', line 88 def failed(payload, error) jobs('failed').insert(:_id => payload.id, :klass => payload.klass.to_s, :args => payload.args, :queue => payload.queue) end |
#length(queue = 'default') ⇒ Object
49 50 51 |
# File 'lib/qu/backend/mongo.rb', line 49 def length(queue = 'default') jobs(queue).count end |
#queues ⇒ Object
45 46 47 |
# File 'lib/qu/backend/mongo.rb', line 45 def queues self[:queues].find.map {|doc| doc['name'] } end |
#register_worker(worker) ⇒ Object
105 106 107 108 |
# File 'lib/qu/backend/mongo.rb', line 105 def register_worker(worker) logger.debug "Registering worker #{worker.id}" self[:workers].insert(worker.attributes.merge(:id => worker.id)) end |
#release(payload) ⇒ Object
84 85 86 |
# File 'lib/qu/backend/mongo.rb', line 84 def release(payload) jobs(payload.queue).insert({:_id => payload.id, :klass => payload.klass.to_s, :args => payload.args}) end |
#requeue(id) ⇒ Object
95 96 97 98 99 100 101 102 103 |
# File 'lib/qu/backend/mongo.rb', line 95 def requeue(id) logger.debug "Requeuing job #{id}" doc = jobs('failed').find_and_modify(:query => {:_id => id}, :remove => true) || raise(::Mongo::OperationFailure) jobs(doc.delete('queue')).insert(doc) doc['id'] = doc.delete('_id') Payload.new(doc) rescue ::Mongo::OperationFailure false end |
#reserve(worker, options = {:block => true}) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/qu/backend/mongo.rb', line 61 def reserve(worker, = {:block => true}) loop do worker.queues.each do |queue| logger.debug { "Reserving job in queue #{queue}" } begin if doc = jobs(queue).find_and_modify(:remove => true) doc['id'] = doc.delete('_id') return Payload.new(doc) end rescue ::Mongo::OperationFailure # No jobs in the queue (MongoDB <2) end end if [:block] sleep 5 else break end end end |
#unregister_worker(worker) ⇒ Object
110 111 112 113 |
# File 'lib/qu/backend/mongo.rb', line 110 def unregister_worker(worker) logger.debug "Unregistering worker #{worker.id}" self[:workers].remove(:id => worker.id) end |
#workers ⇒ Object
115 116 117 118 119 |
# File 'lib/qu/backend/mongo.rb', line 115 def workers self[:workers].find.map do |doc| Qu::Worker.new(doc) end end |