Class: Rplex::Overseer

Inherits:
Object
  • Object
show all
Defined in:
lib/rplex/jobs.rb

Overview

Simple queue management for Rplex job data

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Overseer

Returns a new instance of Overseer.



10
11
12
# File 'lib/rplex/jobs.rb', line 10

# Sets up the overseer with an empty registry of worker queues.
def initialize
  @queues = {} # maps a worker identifier to that worker's job queue
end

Instance Method Details

#<<(job_data) ⇒ Object

Add a job for all workers currently active



14
15
16
# File 'lib/rplex/jobs.rb', line 14

# Shovel-style shortcut that hands the job to #add_job without a worker list.
#
# @param job_data [Object] job payload, passed to #add_job unchanged
# @return [Object] whatever #add_job returns
def <<(job_data)
  add_job(job_data)
end

#[](worker) ⇒ Object

Get the next job for the worker

If there is no Queue for the worker, create an empty one



43
44
45
46
# File 'lib/rplex/jobs.rb', line 43

# Fetches the next job for a worker without blocking.
#
# A queue is created for the worker on first access, so an unknown worker
# simply gets nil back and is registered from then on.
#
# @param worker [Object] worker identifier
# @return [Object, nil] the next queued job, or nil when the queue is empty
def [](worker)
  queue = (@queues[worker] ||= Queue.new)
  begin
    queue.pop(true)
  rescue ThreadError
    # Non-blocking pop signals an empty queue with ThreadError; anything
    # else is a real failure and is allowed to propagate.
    nil
  end
end

#add_job(job_data, workers = []) ⇒ Object

Add a job.

You can limit the workers it is distributed to by providing an Array with the worker identifiers



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/rplex/jobs.rb', line 21

# Adds a job to the queues of the selected workers.
#
# Only workers that already have a queue receive the job. When a bounded
# queue is already full, one entry is popped first to make room, so the
# push does not block.
#
# @param job_data [Object] job payload; must pass #valid?
# @param workers [Array, Object, nil] worker identifiers to restrict the job
#   to; a single identifier is accepted too, and nil or an empty list means
#   every known queue
# @return [Integer] number of queues the job was pushed to
# @raise [InvalidData] when job_data does not pass #valid?
def add_job(job_data, workers = [])
  raise InvalidData unless valid?(job_data)

  # Array() turns nil into [] and wraps a lone identifier, so a String
  # identifier is compared whole instead of substring-matched.
  targets = Array(workers)
  targets = @queues.keys if targets.empty?

  queued_in = 0
  @queues.each do |worker, queue|
    next unless targets.include?(worker)

    # Only bounded queues respond to #max; free a slot when one is full.
    queue.pop if queue.respond_to?(:max) && queue.size == queue.max
    queue.push(job_data)
    queued_in += 1
  end
  queued_in
end

#backlog ⇒ Object

Get an array of [name,queue size]



48
49
50
# File 'lib/rplex/jobs.rb', line 48

# Reports how many jobs are waiting per worker.
#
# @return [Array<Array>] one [worker identifier, queue size] pair per queue
def backlog
  @queues.map { |name, queue| [name, queue.size] }
end

#configuration(worker) ⇒ Object

Returns the worker’s configuration



82
83
84
85
86
87
88
89
# File 'lib/rplex/jobs.rb', line 82

# Describes how a worker's queue is configured.
#
# @param worker [Object] worker identifier
# @return [Hash] 'worker' => the identifier, 'maximum_size' => the queue's
#   max, or 0 when the queue does not respond to #max
# @raise [InvalidData] when no queue exists for the worker
def configuration(worker)
  queue = @queues[worker]
  raise InvalidData, "non existent queue" unless queue

  limit = queue.respond_to?(:max) ? queue.max : 0
  { 'worker' => worker, 'maximum_size' => limit }
end

#configure(worker, worker_config) ⇒ Object

Configures the named worker

worker_config is a Hash with the possible key “maximum_size”; when it is 0, the queue is unlimited

Will create a queue for the worker if it doesn’t exist

Configuring a worker will reset its queue



73
74
75
76
77
78
79
80
# File 'lib/rplex/jobs.rb', line 73

# Configures the named worker by giving it a fresh queue.
#
# Any existing queue for the worker is replaced, so jobs queued so far are
# discarded. A queue is created when the worker had none.
#
# @param worker [Object] worker identifier
# @param worker_config [Hash] settings; "maximum_size" greater than 0 makes
#   the queue bounded, 0 or a missing key makes it unlimited
# @return [Object] the result of #configuration for the worker
def configure(worker, worker_config)
  # A missing "maximum_size" key counts as 0 (unlimited) instead of
  # failing on a nil comparison.
  maximum_size = worker_config["maximum_size"] || 0
  @queues[worker] = maximum_size > 0 ? SizedQueue.new(maximum_size) : Queue.new
  configuration(worker)
end

#remove(worker) ⇒ Object

Removes a queue



91
92
93
# File 'lib/rplex/jobs.rb', line 91

# Drops the worker's queue from the overseer.
#
# @param worker [Object] worker identifier
# @return [Object, nil] the removed queue, or nil when the worker had none
def remove(worker)
  @queues.delete(worker)
end

#reset(workers) ⇒ Object

Empties the worker queues



62
63
64
# File 'lib/rplex/jobs.rb', line 62

# Empties the queues of the given workers.
#
# Workers without a queue are skipped; no queue is created for them.
#
# @param workers [#each] worker identifiers whose queues should be cleared
# @return [Object] the workers collection that was passed in
def reset(workers)
  workers.each do |name|
    queue = @queues[name]
    queue.clear if queue
  end
end

#valid?(job_data) ⇒ Boolean

Returns true if the job data is valid

Returns:

  • (Boolean)


52
53
54
55
56
# File 'lib/rplex/jobs.rb', line 52

# Checks whether job data carries both required entries.
#
# @param job_data [Object] candidate job payload, indexed with
#   "identifier" and "data"
# @return [Boolean] true when both entries are truthy; false otherwise,
#   including when job_data cannot be indexed that way
def valid?(job_data)
  # Collapse to a strict Boolean so the predicate never leaks the raw
  # "data" value or nil to callers.
  job_data["identifier"] && job_data["data"] ? true : false
rescue StandardError
  # Payloads that cannot be indexed with String keys (nil, Array, ...) are
  # invalid rather than an error.
  false
end

#workers ⇒ Object

All worker queue names



58
59
60
# File 'lib/rplex/jobs.rb', line 58

# Lists the identifiers of all workers that currently have a queue.
#
# @return [Array] worker identifiers, in the order their queues were added
def workers
  @queues.each_key.to_a
end