Module: Monque
- Defined in:
- lib/monque.rb
Defined Under Namespace
Classes: Worker
Constant Summary collapse
- RETRY_DELAY =
Wait this number of seconds before trying a job again.
1800
- REPLICATION_FACTOR =
1
Class Method Summary collapse
- .enqueue(cls, *args) ⇒ Object
- .jobs_collection(host = nil, port = nil) ⇒ Object
- .mongo_host=(h) ⇒ Object
- .mongo_port=(p) ⇒ Object
- .queue_for_cls(cls) ⇒ Object
Class Method Details
.enqueue(cls, *args) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/monque.rb', line 57
#
# Enqueues a job for +cls+ by saving a job document to the jobs collection.
#
# The queue name is resolved with +queue_for_cls+ (any error it raises
# propagates), the arguments are serialized with +to_json+, and the
# collection returned by +jobs_collection+ is memoized in +@jobs+ on first use.
#
# @param cls [Class, Module] job class; its name is stored under 'class'
# @param args [Array] job arguments, stored as a JSON string under 'data'
# @return [Object] whatever the collection's +save+ call returns
# @raise [NameError] if the resolved queue name is not a String
# @raise [ArgumentError] if +cls+ is anonymous (+cls.name+ is nil)
def self.enqueue(cls, *args)
  queue = queue_for_cls(cls)
  raise NameError, "Invalid queue name: #{queue.inspect}" unless queue.is_a?(String)

  # An anonymous class/module has a nil name, so the job document would be
  # saved with a nil 'class' field; reject it here instead of storing it.
  class_name = cls.name
  raise ArgumentError, "Cannot enqueue anonymous class #{cls.inspect}" if class_name.nil?

  @jobs ||= jobs_collection

  job = {
    'queue'         => queue,
    'class'         => class_name,
    'data'          => args.to_json,
    'started'       => 0,
    'created'       => Time.now.to_f,
    'proc_attempts' => []
  }
  @jobs.save(job, { :safe => { :w => REPLICATION_FACTOR } })
end
.jobs_collection(host = nil, port = nil) ⇒ Object
38 39 40 41 42 43 |
# File 'lib/monque.rb', line 38
#
# Opens a new Mongo connection and returns the result of indexing it with
# 'monque' and then 'jobs' (the jobs collection of the monque database).
#
# A new +Mongo::Connection+ is created on every call.
#
# @param host [Object, nil] server host; falls back to +@mongo_host+ when nil
# @param port [Object, nil] server port; falls back to +@mongo_port+ when nil
# @return [Object] the object returned by +connection['monque']['jobs']+
def self.jobs_collection(host = nil, port = nil)
  target_host = host || @mongo_host
  target_port = port || @mongo_port

  connection = Mongo::Connection.new(target_host, target_port)
  database = connection['monque']
  database['jobs']
end
.mongo_host=(h) ⇒ Object
35 |
# File 'lib/monque.rb', line 35
#
# Sets the host that +jobs_collection+ falls back to when no host is passed.
#
# +enqueue+ memoizes its collection in +@jobs+, so the cached value is
# cleared here; otherwise a host change made after the first +enqueue+
# would be silently ignored.
#
# @param h [Object] the Mongo host to use
# @return [Object] the value that was assigned
def self.mongo_host=(h)
  @jobs = nil
  @mongo_host = h
end
.mongo_port=(p) ⇒ Object
36 |
# File 'lib/monque.rb', line 36
#
# Sets the port that +jobs_collection+ falls back to when no port is passed.
#
# +enqueue+ memoizes its collection in +@jobs+, so the cached value is
# cleared here; otherwise a port change made after the first +enqueue+
# would be silently ignored.
#
# @param p [Object] the Mongo port to use
# @return [Object] the value that was assigned
def self.mongo_port=(p)
  @jobs = nil
  @mongo_port = p
end
.queue_for_cls(cls) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/monque.rb', line 45
#
# Returns the queue name declared on +cls+ through its +@queue+ instance
# variable. A Symbol is converted to a String; any other value is returned
# unchanged.
#
# @param cls [Class, Module] the job class or module to inspect
# @return [Object] the queue name (a String when +@queue+ is a String or Symbol)
# @raise [TypeError] if +cls+ is neither a Class nor a Module
# @raise [RuntimeError] if +@queue+ is unset, nil, or false on +cls+
def self.queue_for_cls(cls)
  # Class is a subclass of Module, so this single check covers both.
  raise TypeError, "The first argument to enqueue should be a class" unless cls.is_a?(Module)

  qname = cls.instance_variable_get(:@queue)
  raise "You should define the @queue name for #{cls.inspect}" unless qname

  qname.is_a?(Symbol) ? qname.to_s : qname
end