Class: JobQueue
- Inherits:
-
Object
- Object
- JobQueue
- Defined in:
- lib/jobqueue.rb
Overview
Direct Known Subclasses
Instance Attribute Summary collapse
-
#threads ⇒ Object
readonly
Returns the value of attribute threads.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
Class Method Summary collapse
-
.maxnumber_of_processors ⇒ Object
Get the maximum number of parallel runs.
Instance Method Summary collapse
-
#initialize(nWorkers = JobQueue.maxnumber_of_processors, debug = :off) ⇒ JobQueue
constructor
Create a new queue qith a given number of worker threads.
-
#push(*item, &block) ⇒ Object
Put jobs into the queue.
-
#run ⇒ Object
Start workers to run through the queue.
Constructor Details
#initialize(nWorkers = JobQueue.maxnumber_of_processors, debug = :off) ⇒ JobQueue
Create a new queue qith a given number of worker threads
14 15 16 17 18 |
# File 'lib/jobqueue.rb', line 14 def initialize(nWorkers=JobQueue.maxnumber_of_processors,debug=:off) @workers = nWorkers @queue = Queue.new @debug = debug end |
Instance Attribute Details
#threads ⇒ Object (readonly)
Returns the value of attribute threads.
11 12 13 |
# File 'lib/jobqueue.rb', line 11 def threads @threads end |
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
11 12 13 |
# File 'lib/jobqueue.rb', line 11 def workers @workers end |
Class Method Details
.maxnumber_of_processors ⇒ Object
Get the maximum number of parallel runs
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/jobqueue.rb', line 58 def JobQueue.maxnumber_of_processors case RUBY_ENGINE when 'jruby' require 'java' return java.lang.Runtime.getRuntime.availableProcessors when 'ironruby' return System::Environment.ProcessorCount when 'ruby','rbx' case RUBY_PLATFORM when /linux/ return `cat /proc/cpuinfo | grep processor | wc -l`.to_i when /darwin/ return `sysctl -n hw.logicalcpu`.to_i when /(win32|mingw|cygwin)/ # this works for windows 2000 or greater require 'win32ole' wmi = WIN32OLE.connect("winmgmts://") wmi.ExecQuery("select * from Win32_ComputerSystem").each do |system| begin processors = system.NumberOfLogicalProcessors rescue processors = 0 end return [system.NumberOfProcessors, processors].max end when /powerpc-aix/ return IO.popen("lsdev -Cc processor").readlines.size # alternative, but slover: IO.popen("prtconf").readlines.grep(/Number of processors/i).first.split(" ").last.to_i end end raise "Cannot determine the number of available Processors for RUBY_PLATFORM:'#{RUBY_PLATFORM}' and RUBY_ENGINE:#{RUBY_ENGINE}" end |
Instance Method Details
#push(*item, &block) ⇒ Object
Put jobs into the queue. Use
proc,args for single methods
object,:method,args for sende messages to objects
28 29 30 31 |
# File 'lib/jobqueue.rb', line 28 def push(*item,&block) @queue << item unless item.empty? @queue << [block] unless block.nil? end |
#run ⇒ Object
Start workers to run through the queue
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/jobqueue.rb', line 34 def run @threads = (1..@workers).map {|i| Thread.new(@queue) {|q| until ( q == ( task = q.deq ) ) if task.size > 1 if task[0].kind_of? Proc # Expects proc/lambda with arguments, e.g. [mysqrt,2.789] task[0].call(*task[1..-1]) else # expect an object in task[0] and one of its methods with arguments in task[1] as a symbol # e.g. [a,[:attribute=,1] task[0].send(task[1],*task[2..-1]) end else task[0].call end end } } @threads.size.times { @queue.enq @queue} @threads.each {|t| t.join} end |