Class: QRPC::Server
- Inherits:
-
Object
- Object
- QRPC::Server
- Defined in:
- lib/qrpc/server.rb,
lib/qrpc/server/job.rb,
lib/qrpc/server/dispatcher.rb
Overview
Queue RPC server.
Defined Under Namespace
Classes: Dispatcher, Job
Constant Summary collapse
- QRPC_PREFIX =
Deprecated.
(since 0.2.0)
Prefix for handled queues.
QRPC::QUEUE_PREFIX
- QRPC_POSTFIX_INPUT =
Deprecated.
(since 0.2.0)
Input queue postfix.
QRPC::QUEUE_POSTFIX_INPUT
- QRPC_POSTFIX_OUTPUT =
Deprecated.
(since 0.2.0)
Output queue postfix.
QRPC::QUEUE_POSTFIX_OUTPUT
- @@servers =
Holds servers for finalizing.
{ }
Class Method Summary collapse
-
.finalize(id) ⇒ Object
Finalizer handler.
Instance Method Summary collapse
-
#finalize! ⇒ Object
Destructor.
-
#initialize(api) ⇒ Server
constructor
Constructor.
-
#input_name ⇒ Symbol
Returns input name.
-
#input_queue(&block) ⇒ Object
Returns input queue.
-
#listen!(locator, opts = { }) ⇒ Object
Listens to the queue.
-
#output_name(client) ⇒ Symbol
Returns output name for client name.
-
#output_queue ⇒ EMJack::Connection
Returns output queue.
-
#start_listening(locator, opts) ⇒ Object
Starts listening to the queue.
Constructor Details
#initialize(api) ⇒ Server
Constructor.
109 110 111 112 113 114 115 116 |
# File 'lib/qrpc/server.rb', line 109 def initialize(api) @api = api @output_name_cache = { } # Destructor ObjectSpace.define_finalizer(self, self.class.method(:finalize).to_proc) @@servers[self.object_id] = self end |
Class Method Details
.finalize(id) ⇒ Object
Finalizer handler.
123 124 125 126 127 |
# File 'lib/qrpc/server.rb', line 123 def self.finalize(id) if @@servers.has_key? id @@servers[id].finalize! end end |
Instance Method Details
#finalize! ⇒ Object
Destructor.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/qrpc/server.rb', line 133 def finalize! if not @input_queue.nil? @input_queue.watch("default") do @input_queue.ignore(@input_name.to_s) do @input_queue.close end end end if not @output_queue.nil? @output_queue.use("default") do @output_queue.close end end end |
#input_name ⇒ Symbol
Returns input name.
248 249 250 251 252 253 254 |
# File 'lib/qrpc/server.rb', line 248 def input_name if @input_name.nil? @input_name = (QRPC::QUEUE_PREFIX.dup << "-" << @locator.queue << "-" << QRPC::QUEUE_POSTFIX_INPUT).to_sym end return @input_name end |
#input_queue(&block) ⇒ Object
Returns input queue.
194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/qrpc/server.rb', line 194 def input_queue(&block) if not @input_queue @input_queue = EMJack::Connection::new(:host => @locator.host, :port => @locator.port) @input_queue.watch(self.input_name.to_s) do @input_queue.ignore("default") do block.call(@input_queue) end end else block.call(@input_queue) end end |
#listen!(locator, opts = { }) ⇒ Object
Listens to the queue. (Blocking call which starts eventmachine.)
158 159 160 161 162 |
# File 'lib/qrpc/server.rb', line 158 def listen!(locator, opts = { }) EM.run do self.start_listening(locator, opts) end end |
#output_name(client) ⇒ Symbol
Returns output name for client name.
227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/qrpc/server.rb', line 227 def output_name(client) client_index = client.to_sym if not @output_name_cache.include? client_index output_name = QRPC::QUEUE_PREFIX.dup << "-" << client.to_s << "-" << QRPC::QUEUE_POSTFIX_OUTPUT output_name = output_name.to_sym @output_name_cache[client_index] = output_name else output_name = @output_name_cache[client_index] end return output_name end |
#output_queue ⇒ EMJack::Connection
Returns output queue.
212 213 214 215 216 217 218 |
# File 'lib/qrpc/server.rb', line 212 def output_queue if @output_queue.nil? @output_queue = EMJack::Connection::new(:host => @locator.host, :port => @locator.port) end return @output_queue end |
#start_listening(locator, opts) ⇒ Object
Starts listening to the queue. (Blocking queue which expect, eventmachine is started.)
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/qrpc/server.rb', line 172 def start_listening(locator, opts) @locator = locator @dispatcher = QRPC::Server::Dispatcher::new(opts[:max_jobs]) # Cache cleaning dispatcher EM.add_periodic_timer(20) do @output_name_cache.clear end # Process input queue self.input_queue do |queue| queue.each_job do |job| self.process_job(job) end end end |