Class: QRPC::Server::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/qrpc/server/dispatcher.rb

Overview

Queue RPC job.

Instance Method Summary collapse

Constructor Details

#initialize(max_jobs = 0) ⇒ Dispatcher

Constructor.



46
47
48
49
50
51
52
53
54
55
# File 'lib/qrpc/server/dispatcher.rb', line 46

def initialize(max_jobs = 0)
    @count = 0
    @queue = Depq::new
    @mutex = Mutex::new
    @max_jobs = max_jobs
    
    if @max_jobs.nil?
        @max_jobs = 0
    end
end

Instance Method Details

#available?Boolean #available?(&block) ⇒ Object

Indicates free space is available in dispatcher.

If block is given, locks to time space in dispatcher is available so works as synchronization primitive by this way.

Overloads:

  • #available?Boolean

    Returns true if it is, false in otherwise.

    Returns:

    • (Boolean)

      true if it is, false in otherwise

  • #available?(&block) ⇒ Object

    Parameters:

    • block (Proc)

      synchronized block



107
108
109
110
111
112
113
# File 'lib/qrpc/server/dispatcher.rb', line 107

def available?(&block)
    if block.nil?
        return ((@count < @max_jobs) or (@max_jobs == 0))
    else
        @mutex.synchronize(&block)
    end
end

#process_next!Object

Sets up next job for processing.



80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/qrpc/server/dispatcher.rb', line 80

def process_next!
    job = @queue.pop
    job.callback do
        if self.available? and not @queue.empty?
            self.process_next!
        else
            @count -= 1
            self.regulate!
        end
    end

    job.process!
end

#put(job) ⇒ Object

Puts job to dispatcher.

Parameters:



62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/qrpc/server/dispatcher.rb', line 62

def put(job)
    begin
        @queue.put(job, job.priority)
    rescue ::Exception => e
        return
    end
    
    if self.available?
        self.process_next!
        @count += 1
        self.regulate!
    end
end