Class: QRPC::Client
- Inherits:
-
Object
- Object
- QRPC::Client
- Defined in:
- lib/qrpc/client.rb,
lib/qrpc/client/job.rb,
lib/qrpc/client/exception.rb
Overview
Queue RPC client.
Defined Under Namespace
Constant Summary collapse
- @@clients =
Holds clients for finalizing.
{ }
Class Method Summary collapse
-
.finalize(id) ⇒ Object
Finalizer handler.
Instance Method Summary collapse
-
#create_job(name, args, priority = QRPC::DEFAULT_PRIORITY, &block) ⇒ QRPC::Client::Job
Creates a job associated with this client session.
-
#finalize! ⇒ Object
Destructor.
-
#id ⇒ Symbol
Returns client (or maybe session is better) ID.
-
#initialize(locator) ⇒ Client
constructor
Constructor.
-
#input_name ⇒ Symbol
Returns input name.
-
#input_queue(&block) ⇒ Object
Returns input queue.
-
#method_missing(name, *args, &block) ⇒ Object
Handles call to RPC.
-
#output_name ⇒ Symbol
Returns output name.
-
#output_queue(&block) ⇒ Object
Returns output queue.
-
#pool! ⇒ Object
Starts input (results) polling.
-
#put(job) ⇒ Object
Puts job to client.
Constructor Details
#initialize(locator) ⇒ Client
Constructor.
75 76 77 78 79 80 81 82 83 |
# File 'lib/qrpc/client.rb', line 75
#
# Constructor. Remembers the locator, starts with an empty table of pending
# jobs and with results fetching switched off, and registers the instance
# so it can be finalized through the class-level handler.
#
# @param locator  object describing where to connect; other methods of this
#   class read its +host+, +port+ and +queue+
def initialize(locator)
  @locator = locator
  @jobs = {}
  @pooling = false

  # Destructor: the finalizer proc only receives an object id, so the
  # instance is also stored in the class registry under that id.
  @@clients[object_id] = self
  ObjectSpace.define_finalizer(self, self.class.method(:finalize).to_proc)
end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(name, *args, &block) ⇒ Object
Handles call to RPC. (*********)
Be warned: arguments will be serialized to JSON, so they should either be natively serializable or implement the #to_json method.
127 128 129 |
# File 'lib/qrpc/client.rb', line 127
#
# Handles a call to RPC: any method not defined on the client is wrapped
# into a job named after the method and handed to #put.
#
# @param name   name of the called method
# @param args   call arguments, passed to the job as a single array
# @param block  optional block, forwarded to #create_job
# @return whatever #put returns
def method_missing(name, *args, &block)
  job = create_job(name, args, &block)
  put(job)
end
Class Method Details
.finalize(id) ⇒ Object
Finalizer handler.
90 91 92 93 94 |
# File 'lib/qrpc/client.rb', line 90
#
# Finalizer handler. Looks the client up in the class registry by object id
# and runs its destructor.
#
# The entry is removed from the registry while it is looked up, so a client
# is finalized at most once (a repeated call with the same id is a no-op
# instead of closing already closed connections again) and the registry
# does not keep the finalized client around.
#
# @param id  object id under which the client was registered
# @return result of the client's #finalize!, or nil when no client is
#   registered under +id+
def self.finalize(id)
  client = @@clients.delete(id)
  client.finalize! unless client.nil?
end
Instance Method Details
#create_job(name, args, priority = QRPC::DEFAULT_PRIORITY, &block) ⇒ QRPC::Client::Job
Creates a job associated with this client session.
141 142 143 |
# File 'lib/qrpc/client.rb', line 141
#
# Builds a job bound to this client session; the job is only created here,
# not sent.
#
# @param name      name of the remote method
# @param args      arguments for the call
# @param priority  job priority, QRPC::DEFAULT_PRIORITY unless given
# @param block     optional block passed on to the job constructor
# @return [QRPC::Client::Job] the new job
def create_job(name, args, priority = QRPC::DEFAULT_PRIORITY, &block)
  session_id = id
  Client::Job.new(session_id, name, args, priority, &block)
end
#finalize! ⇒ Object
Destructor.
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/qrpc/client.rb', line 100
#
# Destructor. Closes whichever queue connections have been opened so far;
# connections that were never created are skipped.
def finalize!
  unless @input_queue.nil?
    # Switch back to the "default" tube and drop the client's own one
    # before closing the connection.
    @input_queue.watch("default") do
      @input_queue.ignore(@input_name.to_s) { @input_queue.close }
    end
  end

  return if @output_queue.nil?

  @output_queue.use("default") { @output_queue.close }
end
#id ⇒ Symbol
Returns client (or maybe session is better) ID.
268 269 270 271 272 273 274 |
# File 'lib/qrpc/client.rb', line 268
#
# Returns the client (session) ID. It is generated from a UUID on first use
# and cached, so every later call returns the same value.
#
# @return [Symbol] the ID
def id
  @id ||= UUID.generate.to_sym
end
#input_name ⇒ Symbol
Returns input name.
200 201 202 203 204 205 206 |
# File 'lib/qrpc/client.rb', line 200
#
# Returns the name of the client's input queue, built on first use as
# "<prefix>-<client id>-<output postfix>" and cached afterwards.
#
# NOTE(review): the *output* postfix is used for the client's *input* name;
# presumably the client reads what the server side writes -- confirm.
#
# @return [Symbol] the input queue name
def input_name
  @input_name ||= [
    QRPC::QUEUE_PREFIX,
    id.to_s,
    QRPC::QUEUE_POSTFIX_OUTPUT
  ].join("-").to_sym
end
#input_queue(&block) ⇒ Object
Returns input queue.
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/qrpc/client.rb', line 213
#
# Yields the input queue connection to the given block.
#
# On first use the connection is created from the locator's host and port,
# set to watch the client's own input name instead of "default", and given
# an error handler; the block is called once that setup has gone through.
# On later calls the block is called right away with the stored connection.
#
# @param block  receives the input queue connection
def input_queue(&block)
  return block.call(@input_queue) unless @input_queue.nil?

  @input_queue = EMJack::Connection.new(:host => @locator.host, :port => @locator.port)
  @input_queue.watch(input_name.to_s) do
    @input_queue.ignore("default") do
      # Results pooler error handler
      @input_queue.on_error do |error|
        message = "Beanstalk error: " << error.to_s
        raise Exception.new(message)
      end

      # Hands the prepared connection over
      block.call(@input_queue)
    end
  end
end
#output_name ⇒ Symbol
Returns output name.
239 240 241 242 243 244 245 |
# File 'lib/qrpc/client.rb', line 239
#
# Returns the name of the queue jobs are sent to, built on first use as
# "<prefix>-<locator queue>-<input postfix>" and cached afterwards.
#
# NOTE(review): the *input* postfix is used for the client's *output* name;
# presumably this is the input of the server side -- confirm.
#
# @return [Symbol] the output queue name
def output_name
  @output_name ||= [
    QRPC::QUEUE_PREFIX,
    @locator.queue,
    QRPC::QUEUE_POSTFIX_INPUT
  ].join("-").to_sym
end
#output_queue(&block) ⇒ Object
Returns output queue.
252 253 254 255 256 257 258 259 260 261 |
# File 'lib/qrpc/client.rb', line 252
#
# Yields the output queue connection to the given block.
#
# On first use the connection is created from the locator's host and port
# and switched to the output queue name; the block is called from within
# that switch. On later calls the block is called right away with the
# stored connection.
#
# @param block  receives the output queue connection
def output_queue(&block)
  return block.call(@output_queue) unless @output_queue.nil?

  @output_queue = EMJack::Connection.new(:host => @locator.host, :port => @locator.port)
  @output_queue.use(output_name.to_s) { block.call(@output_queue) }
end
#pool! ⇒ Object
Starts input (results) pooling.
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/qrpc/client.rb', line 167
#
# Starts fetching results from the input queue and marks the client as
# doing so.
#
# Each incoming queue job is parsed as a JSON-RPC response and removed from
# the queue. When a pending job is registered under the response ID, it is
# given the response; the ID is dropped from the pending table either way.
def pool!
  # Results processing logic
  handle_result = Proc.new do |job|
    response = JsonRpcObjects::Response.parse(job.body)
    job_id = response.id.to_sym
    job.delete

    @jobs[job_id].assign_result(response) if @jobs.include?(job_id)
    @jobs.delete(job_id)
  end

  # Runs the handler for each job. The client is kept in a local so the
  # spawned block can reach it -- presumably the block does not run with
  # the client as self; confirm against EventMachine.
  client = self
  spawned = EM.spawn do
    client.input_queue { |queue| queue.each_job(&handle_result) }
  end

  spawned.run
  @pooling = true
end
#put(job) ⇒ Object
Puts job to client.
149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/qrpc/client.rb', line 149
#
# Puts a job to the client: sends it, serialized as JSON, through the
# output queue.
#
# Jobs that are not notifications are first remembered under their ID so
# the result can be matched later. Results fetching is started once there
# is at least one remembered job and it is not running yet.
#
# @param job  job to send; must respond to +notification?+, +id+ and
#   +to_json+
def put(job)
  @jobs[job.id] = job unless job.notification?

  output_queue { |queue| queue.put(job.to_json) }

  pool! if !@pooling && !@jobs.empty?
end