Class: Qu::Backend::Mongo

Inherits:
Base
  • Object
show all
Defined in:
lib/qu/backend/mongo.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeMongo

Returns a new instance of Mongo.



16
17
18
19
20
# File 'lib/qu/backend/mongo.rb', line 16

def initialize
  self.max_retries     = 5
  self.retry_frequency = 1
  self.poll_frequency  = 5
end

Instance Attribute Details

#max_retriesObject

Number of times to retry connection on connection failure (default: 5)



8
9
10
# File 'lib/qu/backend/mongo.rb', line 8

def max_retries
  @max_retries
end

#poll_frequencyObject

Seconds to wait before looking for more jobs when the queue is empty (default: 5)



14
15
16
# File 'lib/qu/backend/mongo.rb', line 14

def poll_frequency
  @poll_frequency
end

#retry_frequencyObject

Seconds to wait before try to reconnect after connection failure (default: 1)



11
12
13
# File 'lib/qu/backend/mongo.rb', line 11

def retry_frequency
  @retry_frequency
end

Instance Method Details

#clear(queue = nil) ⇒ Object



39
40
41
42
43
44
45
46
47
# File 'lib/qu/backend/mongo.rb', line 39

def clear(queue = nil)
  queue ||= queues + ['failed']
  logger.info { "Clearing queues: #{queue.inspect}" }
  Array(queue).each do |q|
    logger.debug "Clearing queue #{q}"
    jobs(q).drop
    self[:queues].remove({:name => q})
  end
end

#clear_workersObject



125
126
127
128
# File 'lib/qu/backend/mongo.rb', line 125

def clear_workers
  logger.info "Clearing workers"
  self[:workers].drop
end

#completed(payload) ⇒ Object



96
97
# File 'lib/qu/backend/mongo.rb', line 96

def completed(payload)
end

#connectionObject Also known as: database



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/qu/backend/mongo.rb', line 22

def connection
  @connection ||= begin
    uri = URI.parse(ENV['MONGOHQ_URL'].to_s)
    database = uri.path.empty? ? 'qu' : uri.path[1..-1]
    options = {}
    if uri.password
      options[:auths] = [{
        'db_name'  => database,
        'username' => uri.user,
        'password' => uri.password
      }]
    end
    ::Mongo::Connection.new(uri.host, uri.port, options).db(database)
  end
end

#enqueue(payload) ⇒ Object



57
58
59
60
61
62
63
# File 'lib/qu/backend/mongo.rb', line 57

def enqueue(payload)
  payload.id = BSON::ObjectId.new
  jobs(payload.queue).insert({:_id => payload.id, :klass => payload.klass.to_s, :args => payload.args})
  self[:queues].update({:name => payload.queue}, {:name => payload.queue}, :upsert => true)
  logger.debug { "Enqueued job #{payload}" }
  payload
end

#failed(payload, error) ⇒ Object



92
93
94
# File 'lib/qu/backend/mongo.rb', line 92

def failed(payload, error)
  jobs('failed').insert(:_id => payload.id, :klass => payload.klass.to_s, :args => payload.args, :queue => payload.queue)
end

#length(queue = 'default') ⇒ Object



53
54
55
# File 'lib/qu/backend/mongo.rb', line 53

def length(queue = 'default')
  jobs(queue).count
end

#queuesObject



49
50
51
# File 'lib/qu/backend/mongo.rb', line 49

def queues
  self[:queues].find.map {|doc| doc['name'] }
end

#register_worker(worker) ⇒ Object



109
110
111
112
# File 'lib/qu/backend/mongo.rb', line 109

def register_worker(worker)
  logger.debug "Registering worker #{worker.id}"
  self[:workers].insert(worker.attributes.merge(:id => worker.id))
end

#release(payload) ⇒ Object



88
89
90
# File 'lib/qu/backend/mongo.rb', line 88

def release(payload)
  jobs(payload.queue).insert({:_id => payload.id, :klass => payload.klass.to_s, :args => payload.args})
end

#requeue(id) ⇒ Object



99
100
101
102
103
104
105
106
107
# File 'lib/qu/backend/mongo.rb', line 99

def requeue(id)
  logger.debug "Requeuing job #{id}"
  doc = jobs('failed').find_and_modify(:query => {:_id => id}, :remove => true) || raise(::Mongo::OperationFailure)
  jobs(doc.delete('queue')).insert(doc)
  doc['id'] = doc.delete('_id')
  Payload.new(doc)
rescue ::Mongo::OperationFailure
  false
end

#reserve(worker, options = {:block => true}) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/qu/backend/mongo.rb', line 65

def reserve(worker, options = {:block => true})
  loop do
    worker.queues.each do |queue|
      logger.debug { "Reserving job in queue #{queue}" }

      begin
        if doc = jobs(queue).find_and_modify(:remove => true)
          doc['id'] = doc.delete('_id')
          return Payload.new(doc)
        end
      rescue ::Mongo::OperationFailure
        # No jobs in the queue (MongoDB <2)
      end
    end

    if options[:block]
      sleep poll_frequency
    else
      break
    end
  end
end

#unregister_worker(worker) ⇒ Object



114
115
116
117
# File 'lib/qu/backend/mongo.rb', line 114

def unregister_worker(worker)
  logger.debug "Unregistering worker #{worker.id}"
  self[:workers].remove(:id => worker.id)
end

#workersObject



119
120
121
122
123
# File 'lib/qu/backend/mongo.rb', line 119

def workers
  self[:workers].find.map do |doc|
    Qu::Worker.new(doc)
  end
end