Module: Monque

Defined in:
lib/monque.rb

Defined Under Namespace

Classes: Worker

Constant Summary collapse

RETRY_DELAY =

Number of seconds to wait before trying a job again.

1800
REPLICATION_FACTOR =
1

Class Method Summary collapse

Class Method Details

.enqueue(cls, *args) ⇒ Object

Raises:

  • (NameError)


57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/monque.rb', line 57

# Queues a job for +cls+ by saving a job document through the jobs collection.
#
# The queue name is resolved with queue_for_cls(cls). The collection object is
# obtained from jobs_collection on first use and kept in @jobs afterwards.
#
# @param cls the job class (or module); its #name is stored under 'class'
# @param args job arguments, stored under 'data' as a JSON string
# @return whatever the collection's #save call returns
# @raise [NameError] if the resolved queue name is not a String
def self.enqueue(cls, *args)
  queue = queue_for_cls(cls)
  unless queue.is_a?(String)
    raise NameError, "Invalid queue name: #{queue.inspect}"
  end

  # Resolve the collection before building the document so that a connection
  # problem surfaces ahead of any serialization work.
  collection = (@jobs ||= jobs_collection)

  job = {
    'queue'         => queue,
    'class'         => cls.name,
    'data'          => args.to_json,
    'started'       => 0,
    'created'       => Time.now.to_f,
    'proc_attempts' => []
  }

  collection.save(job, {:safe => {:w => REPLICATION_FACTOR}})
end

.jobs_collection(host = nil, port = nil) ⇒ Object



38
39
40
41
42
43
# File 'lib/monque.rb', line 38

# Creates a new Mongo::Connection and returns the result of looking up
# ['monque']['jobs'] on it (presumably the 'jobs' collection of the 'monque'
# database; confirm against the Mongo driver in use).
#
# A fresh connection is created on every call.
#
# @param host host to connect to; falls back to @mongo_host when nil or false
# @param port port to connect to; falls back to @mongo_port when nil or false
# @return the object returned by connection['monque']['jobs']
def self.jobs_collection(host=nil, port=nil)
  connection = Mongo::Connection.new(host || @mongo_host, port || @mongo_port)
  database = connection['monque']
  database['jobs']
end

.mongo_host=(h) ⇒ Object



35
# File 'lib/monque.rb', line 35

# Stores the Mongo host in @mongo_host, which jobs_collection uses when it is
# called without an explicit host.
#
# @param h the host value to remember
# @return the assigned value
def self.mongo_host=(h)
  @mongo_host = h
end

.mongo_port=(p) ⇒ Object



36
# File 'lib/monque.rb', line 36

# Stores the Mongo port in @mongo_port, which jobs_collection uses when it is
# called without an explicit port.
#
# @param p the port value to remember
# @return the assigned value
def self.mongo_port=(p)
  @mongo_port = p
end

.queue_for_cls(cls) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/monque.rb', line 45

# Resolves the queue name declared on a job class (or module).
#
# The name is read from the @queue instance variable set on +cls+ itself.
# A Symbol is converted to a String; any other truthy value is returned as is.
#
# @param cls the class or module to inspect
# @return the queue name (a String when @queue is a Symbol or a String)
# @raise [TypeError] if +cls+ is neither a Class nor a Module
# @raise [RuntimeError] if @queue is unset (nil) or false on +cls+
def self.queue_for_cls(cls)
  # Class is a subclass of Module, so one Module check covers both cases.
  unless cls.kind_of?(Module)
    raise TypeError, "The first argument to enqueue should be a class"
  end

  queue_name = cls.instance_variable_get(:@queue)
  raise "You should define the @queue name for #{cls.inspect}" unless queue_name

  return queue_name.to_s if queue_name.is_a?(Symbol)

  queue_name
end