Class: QRPC::Server::Dispatcher
- Inherits:
-
Object
- Object
- QRPC::Server::Dispatcher
- Defined in:
- lib/qrpc/server/dispatcher.rb
Overview
Queue RPC job.
Instance Method Summary collapse
-
#available?(&block) ⇒ Object
Indicates free space is available in dispatcher.
-
#initialize(max_jobs = 0) ⇒ Dispatcher
constructor
Constructor.
-
#process_next! ⇒ Object
Sets up next job for processing.
-
#put(job) ⇒ Object
Puts job to dispatcher.
Constructor Details
#initialize(max_jobs = 0) ⇒ Dispatcher
Constructor.
46 47 48 49 50 51 52 53 54 55 |
# File 'lib/qrpc/server/dispatcher.rb', line 46 def initialize(max_jobs = 0) @count = 0 @queue = Depq::new @mutex = Mutex::new @max_jobs = max_jobs if @max_jobs.nil? @max_jobs = 0 end end |
Instance Method Details
#available? ⇒ Boolean #available?(&block) ⇒ Object
Indicates free space is available in dispatcher.
If block is given, locks to time space in dispatcher is available so works as synchronization primitive by this way.
107 108 109 110 111 112 113 |
# File 'lib/qrpc/server/dispatcher.rb', line 107 def available?(&block) if block.nil? return ((@count < @max_jobs) or (@max_jobs == 0)) else @mutex.synchronize(&block) end end |
#process_next! ⇒ Object
Sets up next job for processing.
80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/qrpc/server/dispatcher.rb', line 80 def process_next! job = @queue.pop job.callback do if self.available? and not @queue.empty? self.process_next! else @count -= 1 self.regulate! end end job.process! end |
#put(job) ⇒ Object
Puts job to dispatcher.
62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/qrpc/server/dispatcher.rb', line 62 def put(job) begin @queue.put(job, job.priority) rescue ::Exception => e return end if self.available? self.process_next! @count += 1 self.regulate! end end |