Class: QRPC::Client::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/qrpc/client/dispatcher.rb

Overview

Queue RPC client dispatcher (worker).

Since:

  • 0.3.0

Constant Summary collapse

@@clients =

Holds clients for finalizing.

Since:

  • 0.3.0

{ }

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(locator) ⇒ Dispatcher

Constructor.

Parameters:

Since:

  • 0.3.0



86
87
88
89
90
91
92
93
94
# File 'lib/qrpc/client/dispatcher.rb', line 86

# Constructor. Stores the locator, starts with no pending jobs and with
# pooling switched off, and registers the instance for finalization.
#
# @param locator  object kept in +@locator+
def initialize(locator)
    @jobs = { }
    @pooling = false
    @locator = locator

    # Destructor: the finalizer proc is built from the class-level
    # +finalize+ method, and the instance is recorded in +@@clients+
    # under its object id.
    cleanup = self.class.method(:finalize).to_proc
    ObjectSpace.define_finalizer(self, cleanup)
    @@clients[object_id] = self
end

Class Method Details

.finalize(id) ⇒ Object

Finalizer handler.

Parameters:

  • id (Integer)

    id of finalized instance

Since:

  • 0.3.0



101
102
103
104
105
# File 'lib/qrpc/client/dispatcher.rb', line 101

# Finalizer handler. Calls +finalize!+ on the client stored in
# +@@clients+ under the given id.
#
# @param id [Integer]  id of finalized instance
# @return result of the client's +finalize!+, or +nil+ when nothing is
#   stored under +id+
def self.finalize(id)
    return unless @@clients.key?(id)

    @@clients[id].finalize!
end

Instance Method Details

#create_job(name, args, priority = QRPC::DEFAULT_PRIORITY, &block) ⇒ QRPC::Client::Job

Creates a job associated with this client session.

Parameters:

  • name (Symbol)

    name of the method of the job

  • args (Array)

    arguments of the method call

  • priority (Integer) (defaults to: QRPC::DEFAULT_PRIORITY)

    job priority

  • block (Proc)

    result returning callback

Returns:

Since:

  • 0.3.0



137
138
139
# File 'lib/qrpc/client/dispatcher.rb', line 137

# Creates a job associated with this client session by passing the
# session id, the call description and the callback to +Client::Job+.
#
# @param name  name of the method of the job
# @param args  arguments of the method call
# @param priority  job priority (defaults to +QRPC::DEFAULT_PRIORITY+)
# @param block  result returning callback
# @return the newly built +Client::Job+
def create_job(name, args, priority = QRPC::DEFAULT_PRIORITY, &block)
    session_id = id
    Client::Job.new(session_id, name, args, priority, &block)
end

#finalize! ⇒ Object

Destructor.

Since:

  • 0.3.0



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/qrpc/client/dispatcher.rb', line 111

# Destructor. Shuts down whichever queue connections have been opened.
#
# For the input connection it watches "default", then ignores the tube
# named by +@input_name+, then closes, each step running inside the
# block given to the previous call. For the output connection it
# switches to "default" and closes inside that call's block.
#
# @return result of +@output_queue.use+, or +nil+ when no output
#   connection exists
def finalize!
    unless @input_queue.nil?
        @input_queue.watch("default") do
            @input_queue.ignore(@input_name.to_s) { @input_queue.close }
        end
    end

    return if @output_queue.nil?

    @output_queue.use("default") { @output_queue.close }
end

#id ⇒ Symbol

Returns the client ID (more precisely, the session ID).

Returns:

  • (Symbol)

    client (session) ID

Since:

  • 0.3.0



264
265
266
267
268
269
270
# File 'lib/qrpc/client/dispatcher.rb', line 264

# Returns the client (session) ID. On first use a compact UUID is
# generated, converted to a symbol and remembered in +@id+.
#
# @return [Symbol]  client (session) ID
def id
    @id = UUID.generate(:compact).to_sym if @id.nil?
    @id
end

#input_name ⇒ Symbol

Returns input name.

Returns:

  • (Symbol)

    input queue name

Since:

  • 0.3.0



196
197
198
199
200
201
202
# File 'lib/qrpc/client/dispatcher.rb', line 196

# Returns the input queue name, building it on first use from
# +QRPC::QUEUE_PREFIX+, the client id and +QRPC::QUEUE_POSTFIX_OUTPUT+
# separated by "-", and remembering it in +@input_name+.
#
# @return [Symbol]  input queue name
def input_name
    return @input_name unless @input_name.nil?

    segments = [QRPC::QUEUE_PREFIX, id.to_s, QRPC::QUEUE_POSTFIX_OUTPUT]
    @input_name = segments.join("-").to_sym
end

#input_queue(&block) ⇒ Object

Returns input queue.

Parameters:

  • block (Proc)

    block to which the input queue will be passed

Since:

  • 0.3.0



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/qrpc/client/dispatcher.rb', line 209

# Yields the input queue connection to the given block.
#
# When a connection is already stored in +@input_queue+ the block is
# called with it immediately. Otherwise a new +EMJack::Connection+ is
# created from the locator's host and port, the tube named by
# +input_name+ is watched, "default" is ignored, an error handler is
# installed and only then is the block called -- each step running
# inside the block given to the previous call.
#
# @param block  block to which the input queue will be passed
# @return the block's result when the connection already existed,
#   otherwise the result of the +watch+ call
def input_queue(&block)
    return block.call(@input_queue) unless @input_queue.nil?

    @input_queue = EMJack::Connection.new(:host => @locator.host, :port => @locator.port)
    tube = input_name.to_s

    @input_queue.watch(tube) do
        @input_queue.ignore("default") do

            # Results pooler error handler
            @input_queue.on_error do |error|
                raise ::Exception.new("Beanstalk error: #{error}")
            end

            # Returns
            block.call(@input_queue)

        end
    end
end

#output_name ⇒ Symbol

Returns output name.

Returns:

  • (Symbol)

    output queue name

Since:

  • 0.3.0



235
236
237
238
239
240
241
# File 'lib/qrpc/client/dispatcher.rb', line 235

# Returns the output queue name, building it on first use from
# +QRPC::QUEUE_PREFIX+, the locator's queue and
# +QRPC::QUEUE_POSTFIX_INPUT+ separated by "-", and remembering it in
# +@output_name+.
#
# @return [Symbol]  output queue name
def output_name
    return @output_name unless @output_name.nil?

    # Work on a copy so the prefix constant itself is never appended to.
    tube = QRPC::QUEUE_PREFIX.dup
    tube << "-" << @locator.queue << "-" << QRPC::QUEUE_POSTFIX_INPUT
    @output_name = tube.to_sym
end

#output_queue(&block) ⇒ Object

Returns output queue.

Parameters:

  • block (Proc)

    block to which the output queue will be passed

Since:

  • 0.3.0



248
249
250
251
252
253
254
255
256
257
# File 'lib/qrpc/client/dispatcher.rb', line 248

# Yields the output queue connection to the given block.
#
# When a connection is already stored in +@output_queue+ the block is
# called with it immediately. Otherwise a new +EMJack::Connection+ is
# created from the locator's host and port, and the block is called
# from inside the +use+ call for the tube named by +output_name+.
#
# @param block  block to which the output queue will be passed
# @return the block's result when the connection already existed,
#   otherwise the result of the +use+ call
def output_queue(&block)
    return block.call(@output_queue) unless @output_queue.nil?

    @output_queue = EMJack::Connection.new(:host => @locator.host, :port => @locator.port)
    @output_queue.use(output_name.to_s) { block.call(@output_queue) }
end

#pool! ⇒ Object

Starts input (results) pooling.

Since:

  • 0.3.0



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/qrpc/client/dispatcher.rb', line 163

# Starts input (results) pooling.
#
# Spawns an EventMachine worker which iterates over the jobs of the
# input queue. Each job body is parsed as a JSON-RPC response and the
# queue job is deleted. If a pending job is stored in +@jobs+ under the
# response id, it receives the response via +assign_result+; the entry
# is then removed from +@jobs+. A response that carries no id is dropped
# after deletion, since it cannot be matched to a pending job.
#
# @return [true]  the value assigned to +@pooling+
def pool!

    # Results processing logic
    processor = proc do |job|
        response = JsonRpcObjects::Response.parse(job.body)
        job.delete

        # Without an id there is no key to look up in @jobs, and calling
        # to_sym on nil would raise before the queue job was removed.
        response_id = response.id
        next if response_id.nil?

        id = response_id.to_sym
        @jobs[id].assign_result(response) if @jobs.include?(id)
        @jobs.delete(id)
    end

    # Runs processor for each job. The dispatcher is kept in a local so
    # the spawned block can address it explicitly.
    # NOTE(review): presumably EM.spawn evaluates its block with a
    # different self -- confirm against the EventMachine documentation.
    parent = self
    worker = EM.spawn do
        parent.input_queue { |q| q.each_job(&processor) }
    end

    worker.run
    @pooling = true

end

#put(job) ⇒ Object

Puts job to client.

Parameters:

Since:

  • 0.3.0



145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/qrpc/client/dispatcher.rb', line 145

# Puts a job to the client.
#
# Unless the job is a notification it is stored in +@jobs+ under its id.
# Its JSON form is then put to the output queue, and pooling is started
# if it is not running yet and at least one job is stored.
#
# @param job  job to send; must respond to +notification?+, +id+ and
#   +to_json+
# @return result of +pool!+ when pooling was started by this call,
#   otherwise +nil+
def put(job)
    @jobs[job.id] = job unless job.notification?

    output_queue { |queue| queue.put(job.to_json) }

    pool! unless @pooling || @jobs.empty?
end