Class: QRPC::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/qrpc/server.rb,
lib/qrpc/server/job.rb,
lib/qrpc/server/dispatcher.rb

Overview

Queue RPC server.

Defined Under Namespace

Classes: Dispatcher, Job

Constant Summary collapse

QRPC_PREFIX =
Deprecated.

(since 0.2.0)

Prefix for handled queues.

QRPC::QUEUE_PREFIX
QRPC_POSTFIX_INPUT =
Deprecated.

(since 0.2.0)

Input queue postfix.

QRPC::QUEUE_POSTFIX_INPUT
QRPC_POSTFIX_OUTPUT =
Deprecated.

(since 0.2.0)

Output queue postfix.

QRPC::QUEUE_POSTFIX_OUTPUT
@@servers =

Holds servers for finalizing.

{ }

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(api) ⇒ Server

Constructor.

Parameters:

  • api (Object)

    some object which will be used as RPC API



109
110
111
112
113
114
115
116
# File 'lib/qrpc/server.rb', line 109

# Constructor.
#
# @param api [Object] some object which will be used as RPC API
def initialize(api)
    @output_name_cache = {}
    @api = api

    # Destructor: hook the class-level finalizer up to this instance and
    # store the instance under its object id, which is the key the
    # finalizer uses to look it up.
    finalizer = self.class.method(:finalize).to_proc
    ObjectSpace.define_finalizer(self, finalizer)
    @@servers[object_id] = self
end

Class Method Details

.finalize(id) ⇒ Object

Finalizer handler.

Parameters:

  • id (Integer)

    id of finalized instance



123
124
125
126
127
# File 'lib/qrpc/server.rb', line 123

# Finalizer handler. Invokes the destructor of the server stored under
# the given id; does nothing when no server is stored under it.
#
# @param id [Integer] id of finalized instance
def self.finalize(id)
    return unless @@servers.key?(id)

    @@servers[id].finalize!
end

Instance Method Details

#finalize! ⇒ Object

Destructor.



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/qrpc/server.rb', line 133

# Destructor. Closes the input and output queue connections, provided
# they have been opened.
def finalize!
    unless @input_queue.nil?
        # Switch back to the "default" queue and stop watching our own
        # input queue before the connection is closed.
        @input_queue.watch("default") do
            @input_queue.ignore(@input_name.to_s) { @input_queue.close }
        end
    end

    return if @output_queue.nil?

    @output_queue.use("default") { @output_queue.close }
end

#input_name ⇒ Symbol

Returns input name.

Returns:

  • (Symbol)

    input name

Since:

  • 0.1.1



248
249
250
251
252
253
254
# File 'lib/qrpc/server.rb', line 248

# Returns input name.
#
# The name is built once from the queue prefix, the queue name of the
# locator and the input postfix, and is memoized afterwards.
#
# The queue name is converted to a string through interpolation, so a
# locator whose queue is a Symbol (or another non-String) yields its
# textual form. Appending it with String#<< would raise a TypeError for
# a Symbol and would append an Integer as a single codepoint character.
#
# @return [Symbol] input name
# @since 0.1.1
def input_name
    @input_name ||= "#{QRPC::QUEUE_PREFIX}-#{@locator.queue}-#{QRPC::QUEUE_POSTFIX_INPUT}".to_sym
end

#input_queue(&block) ⇒ Object

Returns input queue.

Parameters:

  • block (Proc)

    block to which the input queue will be given



194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/qrpc/server.rb', line 194

# Returns input queue by passing it to the given block.
#
# On the first call the connection is created from the locator, set to
# watch the input queue and to ignore the "default" one; the block is
# called from inside those callbacks. On later calls the existing
# connection is passed to the block directly.
#
# @param block [Proc] block to which the input queue will be given
def input_queue(&block)
    # Connection already exists, so hand it over right away.
    return block.call(@input_queue) if @input_queue

    @input_queue = EMJack::Connection.new(host: @locator.host, port: @locator.port)
    @input_queue.watch(input_name.to_s) do
        @input_queue.ignore("default") { block.call(@input_queue) }
    end
end

#listen!(locator, opts = { }) ⇒ Object

Listens to the queue. (Blocking call which starts eventmachine.)

Parameters:

  • locator (QRPC::Locator)

    of the input queue

  • opts (Hash) (defaults to: { })

    options for the server



158
159
160
161
162
# File 'lib/qrpc/server.rb', line 158

# Listens to the queue. Starts eventmachine and begins listening from
# inside its run block.
#
# @param locator [QRPC::Locator] of the input queue
# @param opts [Hash] options for the server
def listen!(locator, opts = {})
    EM.run { start_listening(locator, opts) }
end

#output_name(client) ⇒ Symbol

Returns output name for client name.

Parameters:

  • client (String, Symbol)

    client identifier

Returns:

  • (Symbol)

    output name



227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/qrpc/server.rb', line 227

# Returns output name for client name.
#
# Names are kept in @output_name_cache under the symbolized client
# identifier, so each name is built only on a cache miss.
#
# @param client [String, Symbol] client identifier
# @return [Symbol] output name
def output_name(client)
    cache_key = client.to_sym

    @output_name_cache.fetch(cache_key) do
        @output_name_cache[cache_key] =
            "#{QRPC::QUEUE_PREFIX}-#{client}-#{QRPC::QUEUE_POSTFIX_OUTPUT}".to_sym
    end
end

#output_queue ⇒ EMJack::Connection

Returns output queue.

Returns:

  • (EMJack::Connection)

    output queue Beanstalk connection



212
213
214
215
216
217
218
# File 'lib/qrpc/server.rb', line 212

# Returns output queue. The connection is created from the locator on
# the first call and reused afterwards.
#
# @return [EMJack::Connection] output queue Beanstalk connection
def output_queue
    return @output_queue unless @output_queue.nil?

    @output_queue = EMJack::Connection.new(host: @locator.host, port: @locator.port)
end

#start_listening(locator, opts) ⇒ Object

Starts listening to the queue. (Blocking call which expects that eventmachine is already started.)

Parameters:

  • locator (QRPC::Locator)

    of the input queue

  • opts (Hash)

    options for the server



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/qrpc/server.rb', line 172

# Starts listening to the queue. Uses eventmachine timers, so it is
# meant to be called from inside a running eventmachine loop.
#
# @param locator [QRPC::Locator] of the input queue
# @param opts [Hash] options for the server; :max_jobs is passed to
#   the dispatcher
def start_listening(locator, opts)
    @locator = locator
    @dispatcher = QRPC::Server::Dispatcher.new(opts[:max_jobs])

    # Empty the output name cache every 20 seconds.
    EM.add_periodic_timer(20) { @output_name_cache.clear }

    # Hand every job of the input queue over for processing.
    input_queue do |queue|
        queue.each_job { |job| process_job(job) }
    end
end