Class: QRPC::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/qrpc/client.rb,
lib/qrpc/client/job.rb,
lib/qrpc/client/exception.rb

Overview

Queue RPC client.

Defined Under Namespace

Classes: Exception, Job

Constant Summary collapse

@@clients =

Holds clients for finalizing.

Since:

  • 0.2.0

{ }

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(locator) ⇒ Client

Constructor.

Parameters:

Since:

  • 0.2.0



75
76
77
78
79
80
81
82
83
# File 'lib/qrpc/client.rb', line 75

##
# Constructor. Stores the locator, resets the session state and
# registers the instance for finalization.
#
# @param [#host, #port, #queue] locator  locator of the target queue;
#   other methods of this class read +host+, +port+ and +queue+ from it
#
def initialize(locator)
    # Session state: no tracked jobs, results pooling not started yet
    @jobs = { }
    @pooling = false
    @locator = locator

    # Destructor: the class-level handler finds the instance in
    # @@clients by its object ID, so the instance is registered there
    # and the handler is attached as the finalizer.
    @@clients[self.object_id] = self
    destructor = self.class.method(:finalize).to_proc
    ObjectSpace.define_finalizer(self, destructor)
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(name, *args, &block) ⇒ Object

Handles a call to an RPC method.

Be warned: arguments will be serialized to JSON, so they should be natively serializable or implement the #to_json method.

Parameters:

  • name (Symbol)

    name of the called method

  • args (Array)

    arguments of the called method

  • block (Proc)

    callback for returning result

Since:

  • 0.2.0



127
128
129
# File 'lib/qrpc/client.rb', line 127

##
# Handles a call to RPC: every otherwise undefined method call is
# wrapped into a job by #create_job and handed over to #put.
#
# @param [Symbol] name  name of the called method
# @param [Array] args  arguments of the called method
# @param [Proc] block  callback for returning the result
# @return whatever #put returns
#
def method_missing(name, *args, &block)
    job = self.create_job(name, args, &block)
    self.put(job)
end

Class Method Details

.finalize(id) ⇒ Object

Finalizer handler.

Parameters:

  • id (Integer)

    id of finalized instance

Since:

  • 0.2.0



90
91
92
93
94
# File 'lib/qrpc/client.rb', line 90

##
# Finalizer handler. Runs the destructor of the client registered in
# @@clients under the given ID; does nothing for an unknown ID.
#
# @param [Integer] id  id of the finalized instance
#
def self.finalize(id)
    @@clients[id].finalize! if @@clients.key?(id)
end

Instance Method Details

#create_job(name, args, priority = QRPC::DEFAULT_PRIORITY, &block) ⇒ QRPC::Client::Job

Creates a job associated with this client session.

Parameters:

  • name (Symbol)

    name of the method of the job

  • args (Array)

    arguments of the method call

  • priority (Integer) (defaults to: QRPC::DEFAULT_PRIORITY)

    job priority

  • block (Proc)

    result returning callback

Returns:

Since:

  • 0.2.0



141
142
143
# File 'lib/qrpc/client.rb', line 141

##
# Creates a job bound to this client session (the session ID is taken
# from #id and passed as the first constructor argument).
#
# @param [Symbol] name  name of the method of the job
# @param [Array] args  arguments of the method call
# @param [Integer] priority  job priority
# @param [Proc] block  result returning callback
# @return [QRPC::Client::Job] the new job
#
def create_job(name, args, priority = QRPC::DEFAULT_PRIORITY, &block)
    session_id = self.id
    Client::Job.new(session_id, name, args, priority, &block)
end

#finalize! ⇒ Object

Destructor.

Since:

  • 0.2.0



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/qrpc/client.rb', line 100

##
# Destructor. Closes whichever queue connections have been opened.
#
# The input connection is switched to watch "default" and to ignore
# this client's own input tube before it is closed; the output
# connection is switched to use "default" before it is closed.
#
def finalize!
    unless @input_queue.nil?
        @input_queue.watch("default") do
            @input_queue.ignore(@input_name.to_s) { @input_queue.close }
        end
    end

    unless @output_queue.nil?
        @output_queue.use("default") { @output_queue.close }
    end
end

#id ⇒ Symbol

Returns client (or maybe session is better) ID.

Returns:

  • (Symbol)

    client (session) ID

Since:

  • 0.2.0



268
269
270
271
272
273
274
# File 'lib/qrpc/client.rb', line 268

##
# Returns the client (session) ID. The ID is generated through
# UUID.generate on first use and cached afterwards.
#
# @return [Symbol] client (session) ID
#
def id
    return @id unless @id.nil?

    @id = UUID.generate.to_sym
end

#input_name ⇒ Symbol

Returns input name.

Returns:

  • (Symbol)

    input queue name

Since:

  • 0.2.0



200
201
202
203
204
205
206
# File 'lib/qrpc/client.rb', line 200

##
# Returns the input queue name, built once from the queue prefix, this
# client's ID and the *output* postfix constant, joined by dashes.
#
# @return [Symbol] input queue name
#
def input_name
    return @input_name unless @input_name.nil?

    parts = [QRPC::QUEUE_PREFIX, self.id.to_s, QRPC::QUEUE_POSTFIX_OUTPUT]
    @input_name = parts.join("-").to_sym
end

#input_queue(&block) ⇒ Object

Returns input queue.

Parameters:

  • block (Proc)

    block to which the input queue will be given

Since:

  • 0.2.0



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/qrpc/client.rb', line 213

##
# Gives the input queue connection to the block.
#
# On first use the connection is opened to the locator's host and
# port, set to watch this client's input tube and to ignore "default",
# and gets an error handler installed; only then is the block called.
# Later calls hand the existing connection to the block directly.
#
# @param [Proc] block  block to which the input queue will be given
#
def input_queue(&block)
    # Connection already exists
    return block.call(@input_queue) unless @input_queue.nil?

    connection = EMJack::Connection.new(:host => @locator.host, :port => @locator.port)
    @input_queue = connection

    connection.watch(self.input_name.to_s) do
        connection.ignore("default") do

            # Results pooler error handler
            connection.on_error do |error|
                raise Exception.new("Beanstalk error: " + error.to_s)
            end

            # Returns
            block.call(connection)

        end
    end
end

#output_name ⇒ Symbol

Returns output name.

Returns:

  • (Symbol)

    output queue name

Since:

  • 0.2.0



239
240
241
242
243
244
245
# File 'lib/qrpc/client.rb', line 239

##
# Returns the output queue name, built once from the queue prefix, the
# locator's queue identifier and the *input* postfix constant, joined
# by dashes.
#
# The queue identifier is converted with +to_s+ (via interpolation),
# the same way #input_name converts the client ID. Appending it with
# String#<< would raise TypeError for a Symbol and would append an
# Integer as a single codepoint.
#
# @return [Symbol] output queue name
#
def output_name
    return @output_name unless @output_name.nil?

    @output_name = "#{QRPC::QUEUE_PREFIX}-#{@locator.queue}-#{QRPC::QUEUE_POSTFIX_INPUT}".to_sym
end

#output_queue(&block) ⇒ Object

Returns output queue.

Parameters:

  • block (Proc)

    block to which the output queue will be given

Since:

  • 0.2.0



252
253
254
255
256
257
258
259
260
261
# File 'lib/qrpc/client.rb', line 252

##
# Gives the output queue connection to the block.
#
# On first use the connection is opened to the locator's host and port
# and switched to use the output tube before the block is called.
# Later calls hand the existing connection to the block directly.
#
# @param [Proc] block  block to which the output queue will be given
#
def output_queue(&block)
    # Connection already exists
    return block.call(@output_queue) unless @output_queue.nil?

    connection = EMJack::Connection.new(:host => @locator.host, :port => @locator.port)
    @output_queue = connection
    connection.use(self.output_name.to_s) { block.call(connection) }
end

#pool! ⇒ Object

Starts polling the input queue for results.

Since:

  • 0.2.0



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/qrpc/client.rb', line 167

##
# Starts input (results) pooling, i.e. processing of the response jobs
# that arrive on the input queue.
#
# Each incoming job body is parsed as a JSON-RPC response and removed
# from the queue. If a job with the response's ID is registered in
# @jobs, it receives the response and is then unregistered.
#
# The method is idempotent: once pooling has been started, further
# calls do nothing. Without this guard a second call would spawn a
# second worker and register the processor on the same connection
# again.
#
# @return [true, nil] +true+ when pooling has just been started,
#   +nil+ when it was already running
#
def pool!
    # Already running, nothing to start
    return if @pooling

    # Results processing logic
    processor = Proc.new do |job|
        response = JsonRpcObjects::Response.parse(job.body)
        id = response.id.to_sym
        job.delete

        # Hands the response over to the waiting job, if there is one
        @jobs[id].assign_result(response) if @jobs.include?(id)

        @jobs.delete(id)
    end

    # Runs processor for each job. The client is captured in a local,
    # presumably because the spawned block may run with another self.
    parent = self
    worker = EM.spawn do
        parent.input_queue { |queue| queue.each_job(&processor) }
    end

    worker.run
    @pooling = true
end

#put(job) ⇒ Object

Puts job to client.

Parameters:

Since:

  • 0.2.0



149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/qrpc/client.rb', line 149

##
# Puts a job to the output queue.
#
# Jobs that are not notifications are registered in @jobs under their
# ID (presumably because no response is expected for a notification).
# The job is sent as JSON, and results pooling is started if it is not
# running yet and at least one job is registered.
#
# @param [#id, #notification?, #to_json] job  job to send
#
def put(job)
    @jobs[job.id] = job unless job.notification?

    self.output_queue { |queue| queue.put(job.to_json) }

    self.pool! if !@pooling && !@jobs.empty?
end