Class: Disbatch::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/disbatch/queue.rb

Overview

Represents a Disbatch queue

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(plugin, id) ⇒ Queue

Create a queue object

Parameters:

  • plugin (String)
  • id (String)


14
15
16
17
# File 'lib/disbatch/queue.rb', line 14

# Build a queue object from a plugin name and a queue id.
#
# Only stores the two values on the instance; nothing is looked up or
# validated here.
#
# @param plugin [String] plugin name, kept in @plugin
# @param id [String] queue identifier, kept in @id
def initialize(plugin, id)
  @id, @plugin = id, plugin
end

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



6
7
8
# File 'lib/disbatch/queue.rb', line 6

# Reader for the queue identifier.
#
# @return [Object] the current value of @id (the constructor documents it as a String)
def id
  @id
end

#plugin ⇒ Object (readonly)

Returns the value of attribute plugin.



6
7
8
# File 'lib/disbatch/queue.rb', line 6

# Reader for the plugin name associated with this queue.
#
# @return [Object] the current value of @plugin (the constructor documents it as a String)
def plugin
  @plugin
end

Class Method Details

.create(plugin, opts = {}) ⇒ Object

Create a new queue

Parameters:

  • plugin (String)
  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • id (String)
  • maxthreads (Integer)
  • description (String)
  • nodes_pin (Array)
  • nodes_ignore (Array)

Raises:

  • (Disbatch::NoPluginError) — when Disbatch::Plugin[plugin] is nil or false



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/disbatch/queue.rb', line 48

# Create a new queue document and return a queue object for it.
#
# @param plugin [String] plugin name; must be known to Disbatch::Plugin
# @param opts [Hash] optional settings, read with Symbol keys
# @option opts [String]  :id           queue id (defaults to a fresh BSON::ObjectId string)
# @option opts [Integer] :maxthreads   defaults to 10
# @option opts [String]  :description  defaults to ''
# @option opts [Array]   :nodes_pin    defaults to []
# @option opts [Array]   :nodes_ignore defaults to []
# @return [Queue, nil] the new queue, or nil when the Mongo.try block produced nil
# @raise [Disbatch::NoPluginError] when Disbatch::Plugin[plugin] is nil or false
def self.create(plugin, opts={})
  raise Disbatch::NoPluginError unless Disbatch::Plugin[plugin]

  queue_id = opts[:id] || BSON::ObjectId.new.to_s

  # Caller-supplied values win; nil/false entries fall back to the defaults.
  settings = {
    :maxthreads   => opts[:maxthreads]   || 10,
    :description  => opts[:description]  || '',
    :nodes_pin    => opts[:nodes_pin]    || [],
    :nodes_ignore => opts[:nodes_ignore] || []
  }

  inserted = Mongo.try do
    # The document (including its creation time) is assembled inside the
    # block so it is rebuilt each time the block runs.
    record = { :_id => queue_id, :class => plugin }
    record = record.merge(settings).merge(:ctime => Time.now)
    Disbatch.db[:queues].insert(record)
  end
  return if inserted.nil?

  new(plugin, queue_id)
end

.get(id) ⇒ Object

Get an existing queue

Parameters:

  • id (String)

Raises:

  • (Disbatch::NoQueueError) — when no queue document with the given id is found



22
23
24
25
26
27
28
29
30
# File 'lib/disbatch/queue.rb', line 22

# Look up an existing queue by id.
#
# @param id [String] value matched against the document's _id
# @return [Queue] built from the document's 'class' and '_id' fields
# @raise [Disbatch::NoQueueError] when the lookup produces nil
def self.get(id)
  record = Mongo.try { Disbatch.db[:queues].find_one({ :_id => id }) }
  raise Disbatch::NoQueueError if record.nil?

  plugin_name = record['class']
  queue_id    = record['_id']
  new(plugin_name, queue_id)
end

.get_all ⇒ Object

Get all existing queues



33
34
35
36
37
# File 'lib/disbatch/queue.rb', line 33

# Fetch every queue document and wrap each one in a queue object.
#
# @return [Object] whatever Mongo.try returns for the block; the block itself
#   maps each document to new(doc['class'], doc['_id'])
def self.get_all
  Mongo.try do
    cursor = Disbatch.db[:queues].find
    cursor.map { |record| new(record['class'], record['_id']) }
  end
end

Instance Method Details

#==(queue) ⇒ Object

Check equality with another queue object

Parameters:

  • queue — another queue to compare against



105
106
107
# File 'lib/disbatch/queue.rb', line 105

# Check equality with another queue object.
#
# Two queues are equal when their ids match. An argument that has no +id+
# reader (nil, a String, ...) is reported as unequal instead of raising
# NoMethodError, so queues can be compared safely against arbitrary values.
#
# @param queue [Object] another queue (or any object) to compare against
# @return [Boolean] true when +queue+ responds to +id+ and that id equals @id
def ==(queue)
  queue.respond_to?(:id) && @id == queue.id
end

#length ⇒ Object (also known as: size)

Number of pending tasks



75
76
77
# File 'lib/disbatch/queue.rb', line 75

# Number of pending tasks.
#
# Counts the documents in the tasks collection whose :queue is this queue's
# id and whose :status is -2 (the status value this count treats as pending).
#
# @return [Object] the result of calling count on the find result
def length
  pending_selector = { :queue => @id, :status => -2 }
  Disbatch.db[:tasks].find(pending_selector).count
end

#nodes_ignore ⇒ Object



96
97
98
99
100
# File 'lib/disbatch/queue.rb', line 96

# Fetch this queue's +nodes_ignore+ list from the queues collection.
#
# Only the +nodes_ignore+ field is requested from the database.
#
# @return [Array] the stored value, or an empty array when the field is nil or absent
# @raise [Disbatch::NoQueueError] when no document is found for this queue's id
def nodes_ignore
  doc = Disbatch.db[:queues].find_one({ :_id => id }, { :fields => [:nodes_ignore] })

  # A nil document means the queue is not in the collection; report it with
  # the same error Queue.get uses rather than crashing with NoMethodError.
  raise Disbatch::NoQueueError if doc.nil?

  doc['nodes_ignore'] || []
end

#nodes_pin ⇒ Object



90
91
92
93
94
# File 'lib/disbatch/queue.rb', line 90

# Fetch this queue's +nodes_pin+ list from the queues collection.
#
# Only the +nodes_pin+ field is requested from the database.
#
# @return [Array] the stored value, or an empty array when the field is nil or absent
# @raise [Disbatch::NoQueueError] when no document is found for this queue's id
def nodes_pin
  doc = Disbatch.db[:queues].find_one({ :_id => id }, { :fields => [:nodes_pin] })

  # A nil document means the queue is not in the collection; report it with
  # the same error Queue.get uses rather than crashing with NoMethodError.
  raise Disbatch::NoQueueError if doc.nil?

  doc['nodes_pin'] || []
end

#pop ⇒ Object

Pop a task off the queue



86
87
88
# File 'lib/disbatch/queue.rb', line 86

# Pop a task off the queue.
#
# Delegates to Disbatch::Task.take, passing this queue object.
#
# @return [Object] whatever Disbatch::Task.take returns (its contract is not
#   visible here — presumably a task or nil; confirm against Disbatch::Task)
def pop
  Disbatch::Task.take(self)
end

#push(parameters) ⇒ Object

Push a new task onto the queue



80
81
82
83
# File 'lib/disbatch/queue.rb', line 80

# Push a new task onto the queue.
#
# Hands this queue and the given parameters to Disbatch::Task.create and
# discards its result, so calls can be chained on the queue.
#
# @param parameters [Object] passed through to Disbatch::Task.create unchanged
# @return [Queue] this queue object
def push(parameters)
  tap { |queue| Disbatch::Task.create(queue, parameters) }
end