Class: Slave
- Inherits: Object
- Defined in: lib/slave.rb, lib/slave-1.2.1.rb
Overview
the Slave class encapsulates the work of setting up a drb server in another process running on localhost via unix domain sockets. the slave process is attached to its parent via a LifeLine, which is designed such that the slave cannot outlive its parent and linger as an orphan, even if the parent dies an early death, such as by ‘kill -9’. the concept and purpose of the Slave class is to make setting up any server object in another process so easy that a multi-process, drb/ipc based design is as easy as, or easier than, a multi-threaded one. eg
class Server
def add_two n
n + 2
end
end
slave = Slave.new 'object' => Server.new
server = slave.object
p server.add_two(40) #=> 42
two other methods of providing server objects exist:
a) server = Server.new # this is called in the parent
   Slave.new(:object=>server){|s| puts "#{ s.inspect } passed to block in child process"}
b) Slave.new{ Server.new } # this is called only in the child
of the two, ‘b’ is preferred.
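to make the mechanism concrete, here is a minimal sketch, using only ruby's standard library, of what serving an object from a forked child over a unix domain socket involves. it is not the Slave implementation: the helper names `serve_in_child` and `remote_add_two` and the readiness pipe are assumptions made for illustration, and no lifeline is set up.

```ruby
require 'drb/drb'
require 'drb/unix'
require 'tmpdir'
require 'fileutils'

class Server
  def add_two(n)
    n + 2
  end
end

# hypothetical helper (not part of Slave): fork a child that serves +object+
# on a unix domain socket and return its pid once the socket is ready.
def serve_in_child(object, socket_path)
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    DRb.start_service("drbunix:#{socket_path}", object)
    writer.puts 'ready'   # tell the parent the socket now exists
    writer.close
    DRb.thread.join       # keep serving until the process is killed
  end
  writer.close
  reader.gets             # block until the child reports readiness
  reader.close
  pid
end

# hypothetical helper: start a child server, make one remote call, clean up.
def remote_add_two(n)
  base = File.directory?('/tmp') ? '/tmp' : Dir.tmpdir
  dir = Dir.mktmpdir('slave', base)
  socket_path = File.join(dir, 'drb.sock')
  pid = serve_in_child(Server.new, socket_path)
  DRbObject.new_with_uri("drbunix:#{socket_path}").add_two(n)
ensure
  if pid
    Process.kill('KILL', pid)
    Process.wait(pid)
  end
  FileUtils.remove_entry(dir) if dir
end

p remote_add_two(40) #=> 42
```

the sketch has to handle readiness signalling, socket naming and cleanup by hand, which is the bookkeeping the Slave class is meant to take over.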
Defined Under Namespace
Classes: LifeLine, ThreadSafe, ThreadSafeHash
Constant Summary
- VERSION =
'1.2.1'
- DEFAULT_SOCKET_CREATION_ATTEMPTS =
env config
Integer(ENV['SLAVE_SOCKET_CREATION_ATTEMPTS'] || 42)
- DEFAULT_DEBUG =
(ENV['SLAVE_DEBUG'] ? true : false)
- DEFAULT_THREADSAFE =
(ENV['SLAVE_THREADSAFE'] ? true : false)
Class Attribute Summary
-
.debug ⇒ Object
if this is true and you are running from a terminal, information is printed on STDERR.
-
.socket_creation_attempts ⇒ Object
defines how many attempts will be made to create a temporary unix domain socket.
-
.threadsafe ⇒ Object
if this is true all slave objects will be wrapped such that any call to the object is threadsafe.
Instance Attribute Summary
-
#at_exit ⇒ Object
readonly
Returns the value of attribute at_exit.
-
#debug ⇒ Object
readonly
Returns the value of attribute debug.
-
#dumped ⇒ Object
readonly
Returns the value of attribute dumped.
-
#obj ⇒ Object
readonly
the object given via the ‘object’/:object option, if any.
-
#object ⇒ Object
readonly
Returns the value of attribute object.
-
#pid ⇒ Object
readonly
Returns the value of attribute pid.
-
#ppid ⇒ Object
readonly
Returns the value of attribute ppid.
-
#psname ⇒ Object
readonly
Returns the value of attribute psname.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
-
#socket_creation_attempts ⇒ Object
readonly
Returns the value of attribute socket_creation_attempts.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Class Method Summary
-
.default(key) ⇒ Object
get a default value.
- .description ⇒ Object
-
.fork(&b) ⇒ Object
just fork without silly warnings.
- .getopts(opts) ⇒ Object
-
.object(opts = {}, &b) ⇒ Object
a simple convenience method which returns an object from another process.
- .version ⇒ Object
Instance Method Summary
-
#default(key) ⇒ Object
see docs for Slave.default.
-
#detach ⇒ Object
starts a thread to collect the child status and sets up at_exit handler to prevent zombies.
-
#gen_psname(obj) ⇒ Object
generate a default name to appear in ps/top.
-
#getopts(opts) ⇒ Object
see docs for Slave.getopts.
-
#initialize(opts = {}, &block) ⇒ Slave
constructor
sets up a child process serving any object as a DRb server running locally on unix domain sockets.
-
#shutdown(opts = {}) ⇒ Object
cuts the lifeline and kills the child process - give the key ‘quiet’ to ignore errors shutting down, including having already shut down.
-
#shutdown? ⇒ Boolean
true if shutdown has been called.
-
#trace ⇒ Object
debugging output - set ENV['SLAVE_DEBUG']=1 to enable.
-
#wait(opts = {}, &b) ⇒ Object
(also: #wait2)
wait for slave to finish.
Constructor Details
#initialize(opts = {}, &block) ⇒ Slave
sets up a child process serving any object as a DRb server running locally on unix domain sockets. the child process has a LifeLine established between it and the parent, making it impossible for the child to outlive the parent (and linger as an orphan). the object to serve is specified either directly using the ‘object’/:object keyword
Slave.new :object => MyServer.new
or, preferably, using the block form
Slave.new{ MyServer.new }
when the block form is used the object is constructed in the child process itself. this is quite advantageous if the child object consumes resources or opens file handles (db connections, etc). by constructing the object in the child, any resources are consumed from the child’s address space and things like open file handles will not be carried into subsequent child processes (via standard unix fork semantics). in the event that a block is specified but the object cannot be constructed and, instead, raises an Exception, that exception will be propagated to the parent process.
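the propagation of a construction failure can be sketched on its own. the constructor source serialises the exception as a marshalled class/message/backtrace triple in the child and rebuilds it in the parent; the sketch below shows only that round trip, in a single process, and the helper names `dump_failure` and `raise_failure` are hypothetical.

```ruby
# child side: capture an exception as a marshalled
# [class, message, backtrace] triple.
def dump_failure(error)
  Marshal.dump([error.class, error.message, error.backtrace])
end

# parent side: rebuild an exception of the same class from the triple
# and raise it.
def raise_failure(payload)
  klass, message, backtrace = Marshal.load(payload)
  error = klass.new(message)
  error.set_backtrace(backtrace)
  raise error
end

payload =
  begin
    raise ArgumentError, 'could not build server'
  rescue Exception => e
    dump_failure(e)
  end

begin
  raise_failure(payload)
rescue ArgumentError => e
  puts "#{e.class}: #{e.message}"
end
```

only the class, message and backtrace survive the trip, so any extra state carried by a custom exception class would be lost in this scheme.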
opts may contain the following keys, as either strings or symbols
object : specify the slave object. otherwise block value is used.
socket_creation_attempts : specify how many attempts to create a unix domain socket will be made
debug : turn on some logging to STDERR
psname : specify the name that will appear in 'top' ($0)
at_exit : specify a lambda to be called in the *parent* when the child dies
dumped : specify that the slave object should *not* be DRbUndumped (default is DRbUndumped)
threadsafe : wrap the slave object with ThreadSafe to implement gross thread safety
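accepting keys as either strings or symbols comes down to trying several spellings of the same key. a minimal sketch of that lookup, modelled on the order used in Slave.getopts (the key as given, then its string form, then its symbol form); the helper name `fetch_option` is hypothetical:

```ruby
# hypothetical helper: look up +key+ in +opts+ whether it was stored as a
# string or as a symbol, falling back to +default+ when it is absent.
def fetch_option(opts, key, default = nil)
  candidates = [key, key.to_s, key.to_s.to_sym]
  found = candidates.find { |candidate| opts.key?(candidate) }
  found.nil? ? default : opts[found]
end

p fetch_option({ 'psname' => 'worker' }, :psname) #=> "worker"
p fetch_option({ debug: true }, 'debug')          #=> true
p fetch_option({}, :threadsafe, false)            #=> false
```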
# File 'lib/slave.rb', line 321
def initialize opts = {}, &block
  getopt = getopts opts

  @obj = getopt['object']
  @socket_creation_attempts = getopt['socket_creation_attempts'] || default('socket_creation_attempts')
  @debug = getopt['debug'] || default('debug')
  @psname = getopt['psname']
  @at_exit = getopt['at_exit']
  @dumped = getopt['dumped']
  @threadsafe = getopt['threadsafe'] || default('threadsafe')

  raise ArgumentError, 'no slave object or slave object block provided!' if @obj.nil? and block.nil?

  @shutdown = false
  @waiter = @status = nil
  @lifeline = LifeLine.new

  # weird syntax because dot/rdoc chokes on this!?!?
  init_failure = lambda do |e|
    trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
    o = Object.new
    class << o
      attr_accessor '__slave_object_failure__'
    end
    o.__slave_object_failure__ = Marshal.dump [e.class, e.message, e.backtrace]
    @object = o
  end

  #
  # child
  #
  unless((@pid = Slave::fork))
    e = nil
    begin
      Kernel.at_exit{ Kernel.exit! }

      @lifeline.catch

      if @obj
        @object = @obj
      else
        begin
          @object = block.call
        rescue Exception => e
          init_failure[e]
        end
      end

      if block and @obj
        begin
          block[@obj]
        rescue Exception => e
          init_failure[e]
        end
      end

      $0 = (@psname ||= gen_psname(@object))

      unless @dumped or @object.respond_to?('__slave_object_failure__')
        @object.extend DRbUndumped
      end

      if @threadsafe
        @object = ThreadSafe.new @object
      end

      @ppid, @pid = Process.ppid, Process.pid
      @socket = nil
      @uri = nil

      tmpdir = test(?d, '/tmp') ? '/tmp' : Dir.tmpdir
      basename = File.basename(@psname)

      @socket_creation_attempts.times do |attempt|
        se = nil
        begin
          s =
            if attempt > 0
              File.join(tmpdir, "#{ basename }_#{ attempt }")
            else
              File.join(tmpdir, "#{ basename }")
            end
          raise("#{ s } is too long!") if s.size > 103
          u = "drbunix://#{ s }"
          DRb::start_service u, @object
          @socket = s
          @uri = u
          trace{ "child - socket <#{ @socket }>" }
          trace{ "child - uri <#{ @uri }>" }
          break
        rescue Errno::EADDRINUSE => se
          nil
        end
      end

      if @socket and @uri
        trap('SIGUSR2') do
          DRb::thread.kill rescue nil
          FileUtils::rm_f @socket rescue nil
          exit
        end

        @lifeline.puts(@socket)
        @lifeline.cling
      else
        @lifeline.release
        warn "slave(#{ $$ }) could not create socket!"
        exit
      end
    rescue Exception => e
      trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
    ensure
      status = e.respond_to?('status') ? e.status : 1
      exit(status)
    end
  #
  # parent
  #
  else
    detach
    @lifeline.throw

    buf = @lifeline.gets
    raise "failed to find slave socket" if buf.nil? or buf.strip.empty?
    @socket = buf.strip
    trace{ "parent - socket <#{ @socket }>" }

    if @at_exit
      @at_exit_thread = @lifeline.on_cut{
        @at_exit.respond_to?('call') ? @at_exit.call(self) : send(@at_exit.to_s, self)
      }
    end

    if @socket and File::exist? @socket
      Kernel.at_exit{ FileUtils::rm_f @socket }
      @uri = "drbunix://#{ socket }"
      trace{ "parent - uri <#{ @uri }>" }
      #
      # starting drb on localhost avoids dns lookups!
      #
      DRb::start_service('druby://0.0.0.0:0', nil) unless DRb::thread
      @object = DRbObject::new nil, @uri
      if @object.respond_to? '__slave_object_failure__'
        c, m, bt = Marshal.load @object.__slave_object_failure__
        (e = c.new(m)).set_backtrace bt
        trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
        raise e
      end
      @psname ||= gen_psname(@object)
    else
      raise "failed to find slave socket <#{ @socket }>"
    end
  end
end
Class Attribute Details
.debug ⇒ Object
if this is true and you are running from a terminal, information is printed on STDERR
# File 'lib/slave.rb', line 72
def debug
  @debug
end
.socket_creation_attempts ⇒ Object
defines how many attempts will be made to create a temporary unix domain socket
# File 'lib/slave.rb', line 67
def socket_creation_attempts
  @socket_creation_attempts
end
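the attempt count matters because a socket path may already be taken. the sketch below illustrates the retry scheme visible in the constructor source (base name first, then the base name with `_1`, `_2`, ... appended, with paths over 103 characters rejected), but binds a plain `UNIXServer` rather than starting a DRb service. the helper names `bind_with_retries` and `demo_socket_paths` are hypothetical.

```ruby
require 'socket'
require 'tmpdir'
require 'fileutils'

# hypothetical helper: bind a unix socket at <dir>/<basename>, falling back
# to <basename>_1, <basename>_2, ... when the path is already in use.
# returns nil when every attempt fails.
def bind_with_retries(dir, basename, attempts)
  attempts.times do |attempt|
    name = attempt.zero? ? basename : "#{basename}_#{attempt}"
    path = File.join(dir, name)
    raise "#{path} is too long!" if path.size > 103
    begin
      return UNIXServer.new(path)
    rescue Errno::EADDRINUSE
      # path taken: fall through and try the next suffix
    end
  end
  nil
end

# hypothetical demo: bind the same base name twice and report the file
# names that were actually used.
def demo_socket_paths
  base = File.directory?('/tmp') ? '/tmp' : Dir.tmpdir
  dir = Dir.mktmpdir('slave', base)
  first = bind_with_retries(dir, 'sketch', 3)
  second = bind_with_retries(dir, 'sketch', 3)
  [first, second].map { |server| File.basename(server.path) }
ensure
  [first, second].compact.each(&:close)
  FileUtils.remove_entry(dir) if dir
end

p demo_socket_paths #=> ["sketch", "sketch_1"]
```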
.threadsafe ⇒ Object
if this is true all slave objects will be wrapped such that any call to the object is threadsafe. if you do not use this you must ensure that your objects are threadsafe __yourself__ as this is required of any object acting as a drb server
# File 'lib/slave.rb', line 79
def threadsafe
  @threadsafe
end
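as a rough illustration of what wrapping an object for thread safety can look like, the sketch below serialises every call through a single Mutex. this is an assumption about the general technique, not the actual Slave::ThreadSafe implementation; `LockedProxy`, `Counter` and `locked_count` are hypothetical names.

```ruby
# hypothetical stand-in for a thread-safe wrapper: every call forwarded to
# the wrapped object runs while holding one Mutex.
class LockedProxy
  def initialize(target)
    @target = target
    @lock = Mutex.new
  end

  def method_missing(name, *args, &block)
    return super unless @target.respond_to?(name)
    @lock.synchronize { @target.__send__(name, *args, &block) }
  end

  def respond_to_missing?(name, include_private = false)
    @target.respond_to?(name, include_private) || super
  end
end

# a deliberately racy counter: the read and the write are separate steps.
class Counter
  attr_reader :count

  def initialize
    @count = 0
  end

  def increment
    current = @count
    Thread.pass # invite a context switch between the read and the write
    @count = current + 1
  end
end

# hypothetical demo: hammer one wrapped counter from several threads.
def locked_count(threads: 8, per_thread: 250)
  counter = LockedProxy.new(Counter.new)
  workers = Array.new(threads) do
    Thread.new { per_thread.times { counter.increment } }
  end
  workers.each(&:join)
  counter.count
end

p locked_count #=> 2000
```

a single lock around every call is coarse: it is simple to reason about, at the cost of letting only one caller into the object at a time.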
Instance Attribute Details
#at_exit ⇒ Object (readonly)
Returns the value of attribute at_exit.
# File 'lib/slave.rb', line 280
def at_exit
  @at_exit
end
#debug ⇒ Object (readonly)
Returns the value of attribute debug.
# File 'lib/slave.rb', line 278
def debug
  @debug
end
#dumped ⇒ Object (readonly)
Returns the value of attribute dumped.
# File 'lib/slave.rb', line 281
def dumped
  @dumped
end
#obj ⇒ Object (readonly)
the object given via the ‘object’/:object option, if any
# File 'lib/slave.rb', line 276
def obj
  @obj
end
#object ⇒ Object (readonly)
Returns the value of attribute object.
# File 'lib/slave.rb', line 284
def object
  @object
end
#pid ⇒ Object (readonly)
Returns the value of attribute pid.
# File 'lib/slave.rb', line 285
def pid
  @pid
end
#ppid ⇒ Object (readonly)
Returns the value of attribute ppid.
# File 'lib/slave.rb', line 286
def ppid
  @ppid
end
#psname ⇒ Object (readonly)
Returns the value of attribute psname.
# File 'lib/slave.rb', line 279
def psname
  @psname
end
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
# File 'lib/slave.rb', line 288
def socket
  @socket
end
#socket_creation_attempts ⇒ Object (readonly)
Returns the value of attribute socket_creation_attempts.
# File 'lib/slave.rb', line 277
def socket_creation_attempts
  @socket_creation_attempts
end
#status ⇒ Object (readonly)
Returns the value of attribute status.
# File 'lib/slave.rb', line 283
def status
  @status
end
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
# File 'lib/slave.rb', line 287
def uri
  @uri
end
Class Method Details
.default(key) ⇒ Object
get a default value
# File 'lib/slave.rb', line 83
def default(key)
  send key
end
.description ⇒ Object
# File 'lib/slave.rb', line 45
def Slave.description
  'easily start a drb server in another process'
end
.fork(&b) ⇒ Object
just fork without silly warnings
# File 'lib/slave.rb', line 101
def fork(&b)
  v = $VERBOSE
  begin
    $VERBOSE = nil
    Process::fork(&b)
  ensure
    $VERBOSE = v
  end
end
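the same save, silence and restore pattern around `$VERBOSE` works for any block, not only for fork. a minimal sketch, with the hypothetical helper name `without_warnings`:

```ruby
# hypothetical helper: run the block with ruby warnings switched off and
# restore the previous $VERBOSE setting afterwards, even if the block raises.
def without_warnings
  verbose = $VERBOSE
  $VERBOSE = nil
  yield
ensure
  $VERBOSE = verbose
end

# usage: fork a short-lived child with warnings silenced, then reap it.
pid = without_warnings { Process.fork { exit!(0) } }
Process.wait(pid)
```

restoring in an `ensure` clause matters here: without it, an exception inside the block would leave warnings disabled for the rest of the program.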
.getopts(opts) ⇒ Object
# File 'lib/slave.rb', line 87
def getopts(opts)
  raise ArgumentError, opts.class unless
    opts.respond_to?('has_key?') and opts.respond_to?('[]')

  lambda do |key, *defval|
    defval = defval.shift
    keys = [key, key.to_s, key.to_s.intern]
    key = keys.detect{|k| opts.has_key? k } and break opts[key]
    defval
  end
end
.object(opts = {}, &b) ⇒ Object
a simple convenience method which returns an object from another process. the object returned is the result of the supplied block. eg
object = Slave.object{ processor_intensive_object_built_in_child_process() }
the call can be made asynchronous via the ‘async’/:async keyword
thread = Slave.object(:async=>true){ long_processor_intensive_object_built_in_child_process() }
# go on about your coding business then, later
object = thread.value
# File 'lib/slave.rb', line 578
def self.object opts = {}, &b
  async = opts.delete('async') || opts.delete(:async)

  opts['object'] = opts[:object] = lambda(&b)
  opts['dumped'] = opts[:dumped] = true

  slave = Slave.new opts

  value = lambda do |slave|
    begin
      slave.object.call
    ensure
      slave.shutdown
    end
  end

  async ? Thread.new{ value[slave] } : value[slave]
end
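the idea of building a value in another process and handing it back, synchronously or through a thread, can be sketched without DRb. the version below swaps in a plain pipe plus Marshal as the transport, so it is an illustration of the sync/async switch rather than of how Slave.object moves the value; `object_from_child` is a hypothetical name.

```ruby
# hypothetical helper: run the block in a forked child and return its
# (marshallable) result, or a Thread that will hold the result when
# async is true.
def object_from_child(async: false, &work)
  fetch = lambda do
    reader, writer = IO.pipe
    reader.binmode
    writer.binmode
    pid = fork do
      reader.close
      writer.write(Marshal.dump(work.call))
      writer.close
      exit!(0) # skip at_exit handlers inherited from the parent
    end
    writer.close
    data = reader.read # returns once the child has closed its end
    reader.close
    Process.wait(pid)
    Marshal.load(data)
  end
  async ? Thread.new(&fetch) : fetch.call
end

p object_from_child { (1..10).reduce(:+) } #=> 55

thread = object_from_child(async: true) { 6 * 7 }
# go on about your coding business then, later
p thread.value #=> 42
```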
.version ⇒ Object
# File 'lib/slave.rb', line 43
def self.version() VERSION end
Instance Method Details
#default(key) ⇒ Object
see docs for Slave.default
# File 'lib/slave.rb', line 543
def default key
  self.class.default key
end
#detach ⇒ Object
starts a thread to collect the child status and sets up at_exit handler to prevent zombies. the at_exit handler is canceled if the thread is able to collect the status
# File 'lib/slave.rb', line 480
def detach
  reap = lambda do |cid|
    begin
      @status = Process::waitpid2(cid).last
    rescue Exception => e
      m, c, b = e.message, e.class, e.backtrace.join("\n")
      warn "#{ m } (#{ c })\n#{ b }" unless e.is_a? Errno::ECHILD
    end
  end

  Kernel.at_exit do
    shutdown rescue nil
    reap[@pid] rescue nil
  end

  @waiter =
    Thread.new do
      begin
        @status = Process::waitpid2(@pid).last
      ensure
        reap = lambda{|cid| 'no-op' }
      end
    end
end
#gen_psname(obj) ⇒ Object
generate a default name to appear in ps/top
# File 'lib/slave.rb', line 537
def gen_psname obj
  "slave_#{ obj.class }_#{ obj.object_id }_#{ Process.ppid }_#{ Process.pid }".downcase.gsub(%r/\s+/,'_')
end
#getopts(opts) ⇒ Object
see docs for Slave.getopts
# File 'lib/slave.rb', line 549
def getopts opts
  self.class.getopts opts
end
#shutdown(opts = {}) ⇒ Object
cuts the lifeline and kills the child process - give the key ‘quiet’ to ignore errors shutting down, including having already shut down
# File 'lib/slave.rb', line 520
def shutdown opts = {}
  quiet = getopts(opts)['quiet']
  raise "already shutdown" if @shutdown unless quiet
  begin; Process::kill 'SIGUSR2', @pid; rescue Exception => e; end
  begin; @lifeline.cut; rescue Exception; end
  raise e if e unless quiet
  @shutdown = true
end
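what "cutting the lifeline" achieves can be shown with a pipe. in the sketch below the child watches the read end and exits as soon as it sees end-of-file, which happens when the parent closes the write end or dies for any reason, including ‘kill -9’. this is an assumed, simplified mechanism for illustration; the LifeLine class itself may work differently, and `spawn_with_lifeline` is a hypothetical name.

```ruby
# hypothetical sketch of a pipe-based lifeline. the parent keeps the write
# end; the child exits when the read end reaches end-of-file.
def spawn_with_lifeline
  reader, writer = IO.pipe
  pid = fork do
    writer.close
    Thread.new do
      reader.read # returns only once every write end has been closed
      exit!(0)    # the parent is gone or cut the line: stop immediately
    end
    sleep         # stand-in for the child's real work
  end
  reader.close
  [pid, writer]
end

pid, lifeline = spawn_with_lifeline
lifeline.close # cut the lifeline
_, status = Process.wait2(pid)
p status.success? #=> true
```

because the kernel closes the parent's descriptors when the parent exits, the child needs no signal from the parent and no polling to notice its death.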
#shutdown? ⇒ Boolean
true if shutdown has been called
# File 'lib/slave.rb', line 531
def shutdown?
  @shutdown
end
#trace ⇒ Object
debugging output - set ENV['SLAVE_DEBUG']=1 to enable
# File 'lib/slave.rb', line 555
def trace
  if @debug
    STDERR.puts yield
    STDERR.flush
  end
end
#wait(opts = {}, &b) ⇒ Object Also known as: wait2
wait for slave to finish. if the keyword ‘non_block’=>true is given a thread is returned to do the waiting in an async fashion. eg
thread = slave.wait(:non_block=>true){|value| "background <#{ value }>"}
# File 'lib/slave.rb', line 510
def wait opts = {}, &b
  b ||= lambda{|exit_status|}
  non_block = getopts(opts)['non_block']
  non_block ? Thread.new{ b[ @waiter.value ] } : b[ @waiter.value ]
end
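the waiter thread set up in #detach and consumed here can be sketched in isolation: one thread collects the child's exit status, and callers read it either directly or from a second background thread. a minimal sketch using only the standard library; `start_waiter` and `wait_for` are hypothetical names.

```ruby
# hypothetical helper: a thread whose value is the child's Process::Status.
def start_waiter(pid)
  Thread.new { Process.waitpid2(pid).last }
end

# hypothetical helper mirroring the blocking/non-blocking switch of #wait:
# pass the exit status to the handler now, or from a background thread.
def wait_for(waiter, non_block: false, &handler)
  handler ||= ->(status) { status }
  if non_block
    Thread.new { handler.call(waiter.value) }
  else
    handler.call(waiter.value)
  end
end

pid = fork { exit!(3) }
waiter = start_waiter(pid)

p wait_for(waiter).exitstatus #=> 3

thread = wait_for(waiter, non_block: true) { |status| "background <#{status.exitstatus}>" }
p thread.value #=> "background <3>"
```

Thread#value can be read any number of times, which is why the blocking and non-blocking forms can share one waiter.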