Class: Slave

Inherits:
Object
  • Object
show all
Defined in:
lib/slave.rb,
lib/slave-1.2.1.rb

Overview

the Slave class encapsulates the work of setting up a drb server in another process running on localhost via unix domain sockets. the slave process is attached to it’s parent via a LifeLine which is designed such that the slave cannot out-live it’s parent and become a zombie, even if the parent dies and early death, such as by ‘kill -9’. the concept and purpose of the Slave class is to be able to setup any server object in another process so easily that using a multi-process, drb/ipc, based design is as easy, or easier, than a multi-threaded one. eg

class Server
  def add_two n
    n + 2
  end
end

slave = Slave.new 'object' => Server.new
server = slave.object

p server.add_two(40) #=> 42

two other methods of providing server objects exist:

a) server = Server.new “this is called the parent” }

Slave.new(:object=>server){|s| puts "#{ s.inspect } passed to block in child process"}

b) Slave.new{ Server.new “this is called only in the child” }

of the two ‘b’ is preferred.

Defined Under Namespace

Classes: LifeLine, ThreadSafe, ThreadSafeHash

Constant Summary collapse

VERSION =

–{{{

'1.2.1'
DEFAULT_SOCKET_CREATION_ATTEMPTS =

env config

Integer(ENV['SLAVE_SOCKET_CREATION_ATTEMPTS'] || 42)
DEFAULT_DEBUG =
(ENV['SLAVE_DEBUG'] ? true : false)
DEFAULT_THREADSAFE =
(ENV['SLAVE_THREADSAFE'] ? true : false)

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}, &block) ⇒ Slave

sets up a child process serving any object as a DRb server running locally on unix domain sockets. the child process has a LifeLine established between it and the parent, making it impossible for the child to outlive the parent (become a zombie). the object to serve is specfied either directly using the ‘object’/:object keyword

Slave.new :object => MyServer.new

or, preferably, using the block form

Slave.new{ MyServer.new }

when the block form is used the object is contructed in the child process itself. this is quite advantageous if the child object consumes resources or opens file handles (db connections, etc). by contructing the object in the child any resources are consumed from the child’s address space and things like open file handles will not be carried into subsequent child processes (via standard unix fork semantics). in the event that a block is specified but the object cannot be constructed and, instead, throws and Exception, that exception will be propogated to the parent process.

opts may contain the following keys, as either strings or symbols

object : specify the slave object.  otherwise block value is used.
socket_creation_attempts : specify how many attempts to create a unix domain socket will be made 
debug : turn on some logging to STDERR
psname : specify the name that will appear in 'top' ($0)
at_exit : specify a lambda to be called in the *parent* when the child dies
dumped : specify that the slave object should *not* be DRbUndumped (default is DRbUndumped) 
threadsafe : wrap the slave object with ThreadSafe to implement gross thread safety

Raises:

  • (ArgumentError)


321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
# File 'lib/slave.rb', line 321

def initialize opts = {}, &block
  getopt = getopts opts

  @obj = getopt['object']
  @socket_creation_attempts = getopt['socket_creation_attempts'] || default('socket_creation_attempts')
  @debug = getopt['debug'] || default('debug')
  @psname = getopt['psname']
  @at_exit = getopt['at_exit']
  @dumped = getopt['dumped']
  @threadsafe = getopt['threadsafe'] || default('threadsafe')

  raise ArgumentError, 'no slave object or slave object block provided!' if 
    @obj.nil? and block.nil?

  @shutdown = false
  @waiter = @status = nil
  @lifeline = LifeLine.new

  # weird syntax because dot/rdoc chokes on this!?!?
  init_failure = lambda do |e|
    trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
    o = Object.new
    class << o
      attr_accessor '__slave_object_failure__'
    end
    o.__slave_object_failure__ = Marshal.dump [e.class, e.message, e.backtrace]
    @object = o
  end

 
# child
#
  unless((@pid = Slave::fork))
    e = nil
    begin
      Kernel.at_exit{ Kernel.exit! }
      @lifeline.catch

      if @obj
        @object = @obj
      else
        begin
          @object = block.call 
        rescue Exception => e
          init_failure[e]
        end
      end

      if block and @obj
        begin
          block[@obj]
        rescue Exception => e
          init_failure[e]
        end
      end

      $0 = (@psname ||= gen_psname(@object))

      unless @dumped or @object.respond_to?('__slave_object_failure__')
        @object.extend DRbUndumped
      end

      if @threadsafe
        @object = ThreadSafe.new @object
      end

      @ppid, @pid = Process.ppid, Process.pid
      @socket = nil
      @uri = nil

      tmpdir = test(?d, '/tmp') ? '/tmp' : Dir.tmpdir
      basename = File.basename(@psname)

      @socket_creation_attempts.times do |attempt|
        se = nil
        begin
          s =
            if attempt > 0
              File.join(tmpdir, "#{ basename }_#{ attempt }")
            else
              File.join(tmpdir, "#{ basename }")
            end
          raise("#{ s } is too long!") if s.size > 103
          u = "drbunix://#{ s }"
          DRb::start_service u, @object 
          @socket = s
          @uri = u
          trace{ "child - socket <#{ @socket }>" }
          trace{ "child - uri <#{ @uri }>" }
          break
        rescue Errno::EADDRINUSE => se
          nil
        end
      end

      if @socket and @uri
        trap('SIGUSR2') do
          DBb::thread.kill rescue nil
          FileUtils::rm_f @socket rescue nil
          exit
        end

        @lifeline.puts(@socket)
        @lifeline.cling
      else
        @lifeline.release
        warn "slave(#{ $$ }) could not create socket!"
        exit
      end
    rescue Exception => e
      trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
    ensure
      status = e.respond_to?('status') ? e.status : 1
      exit(status)
    end
 
# parent 
#
  else
    detach
    @lifeline.throw

    buf = @lifeline.gets
    raise "failed to find slave socket" if buf.nil? or buf.strip.empty?
    @socket = buf.strip
    trace{ "parent - socket <#{ @socket }>" }

    if @at_exit
      @at_exit_thread = @lifeline.on_cut{ 
        @at_exit.respond_to?('call') ? @at_exit.call(self) : send(@at_exit.to_s, self)
      }
    end

    if @socket and File::exist? @socket
      Kernel.at_exit{ FileUtils::rm_f @socket }
      @uri = "drbunix://#{ socket }"
      trace{ "parent - uri <#{ @uri }>" }
     
    # starting drb on localhost avoids dns lookups!
    #
      DRb::start_service('druby://0.0.0.0:0', nil) unless DRb::thread
      @object = DRbObject::new nil, @uri
      if @object.respond_to? '__slave_object_failure__'
        c, m, bt = Marshal.load @object.__slave_object_failure__
        (e = c.new(m)).set_backtrace bt
        trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
        raise e 
      end
      @psname ||= gen_psname(@object)
    else
      raise "failed to find slave socket <#{ @socket }>"
    end
  end
end

Class Attribute Details

.debugObject

if this is true and you are running from a terminal information is printed on STDERR



72
73
74
# File 'lib/slave.rb', line 72

def debug
  @debug
end

.socket_creation_attemptsObject

defineds how many attempts will be made to create a temporary unix domain socket



67
68
69
# File 'lib/slave.rb', line 67

def socket_creation_attempts
  @socket_creation_attempts
end

.threadsafeObject

if this is true all slave objects will be wrapped such that any call to the object is threadsafe. if you do not use this you must ensure that your objects are threadsafe __yourself__ as this is required of any object acting as a drb server



79
80
81
# File 'lib/slave.rb', line 79

def threadsafe
  @threadsafe
end

Instance Attribute Details

#at_exitObject (readonly)

Returns the value of attribute at_exit.



280
281
282
# File 'lib/slave.rb', line 280

def at_exit
  @at_exit
end

#debugObject (readonly)

Returns the value of attribute debug.



278
279
280
# File 'lib/slave.rb', line 278

def debug
  @debug
end

#dumpedObject (readonly)

Returns the value of attribute dumped.



281
282
283
# File 'lib/slave.rb', line 281

def dumped
  @dumped
end

#objObject (readonly)

attrs



276
277
278
# File 'lib/slave.rb', line 276

def obj
  @obj
end

#objectObject (readonly)

Returns the value of attribute object.



284
285
286
# File 'lib/slave.rb', line 284

def object
  @object
end

#pidObject (readonly)

Returns the value of attribute pid.



285
286
287
# File 'lib/slave.rb', line 285

def pid
  @pid
end

#ppidObject (readonly)

Returns the value of attribute ppid.



286
287
288
# File 'lib/slave.rb', line 286

def ppid
  @ppid
end

#psnameObject (readonly)

Returns the value of attribute psname.



279
280
281
# File 'lib/slave.rb', line 279

def psname
  @psname
end

#socketObject (readonly)

Returns the value of attribute socket.



288
289
290
# File 'lib/slave.rb', line 288

def socket
  @socket
end

#socket_creation_attemptsObject (readonly)

Returns the value of attribute socket_creation_attempts.



277
278
279
# File 'lib/slave.rb', line 277

def socket_creation_attempts
  @socket_creation_attempts
end

#statusObject (readonly)

Returns the value of attribute status.



283
284
285
# File 'lib/slave.rb', line 283

def status
  @status
end

#uriObject (readonly)

Returns the value of attribute uri.



287
288
289
# File 'lib/slave.rb', line 287

def uri
  @uri
end

Class Method Details

.default(key) ⇒ Object

get a default value



83
84
85
# File 'lib/slave.rb', line 83

def default(key)
  send key
end

.descriptionObject



45
46
47
# File 'lib/slave.rb', line 45

def Slave.description
  'easily start a drb server in another process'
end

.fork(&b) ⇒ Object

just fork with out silly warnings



101
102
103
104
105
106
107
108
109
# File 'lib/slave.rb', line 101

def fork(&b)
  v = $VERBOSE
  begin
    $VERBOSE = nil
    Process::fork(&b)
  ensure
  $VERBOSE = v
  end
end

.getopts(opts) ⇒ Object

Raises:

  • (ArgumentError)


87
88
89
90
91
92
93
94
95
96
97
# File 'lib/slave.rb', line 87

def getopts(opts)
  raise ArgumentError, opts.class unless
    opts.respond_to?('has_key?') and opts.respond_to?('[]')

  lambda do |key, *defval|
    defval = defval.shift
    keys = [key, key.to_s, key.to_s.intern]
    key = keys.detect{|k| opts.has_key? k } and break opts[key]
    defval
  end
end

.object(opts = {}, &b) ⇒ Object

a simple convenience method which returns an object from another process. the object returned is the result of the supplied block. eg

object = Slave.object{ processor_intensive_object_built_in_child_process() }

eg.

the call can be made asynchronous via the ‘async’/:async keyword

thread = Slave.object(:async=>true){ long_processor_intensive_object_built_in_child_process() }

# go on about your coding business then, later

object = thread.value


578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
# File 'lib/slave.rb', line 578

def self.object opts = {}, &b
  async = opts.delete('async') || opts.delete(:async) 

  opts['object'] = opts[:object] = lambda(&b)
  opts['dumped'] = opts[:dumped] = true 

  slave = Slave.new opts

  value = lambda do |slave|
    begin
      slave.object.call
    ensure
      slave.shutdown
    end
  end

  async ? Thread.new{ value[slave] } : value[slave] 
end

.versionObject



43
# File 'lib/slave.rb', line 43

def self.version() VERSION end

Instance Method Details

#default(key) ⇒ Object

see docs for Slave.default



543
544
545
# File 'lib/slave.rb', line 543

def default key
  self.class.default key
end

#detachObject

starts a thread to collect the child status and sets up at_exit handler to prevent zombies. the at_exit handler is canceled if the thread is able to collect the status



480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
# File 'lib/slave.rb', line 480

def detach
  reap = lambda do |cid|
    begin
      @status = Process::waitpid2(cid).last
    rescue Exception => e 
      m, c, b = e.message, e.class, e.backtrace.join("\n")
      warn "#{ m } (#{ c })\n#{ b }"  unless e.is_a? Errno::ECHILD
    end
  end

  Kernel.at_exit do
    shutdown rescue nil
    reap[@pid] rescue nil
  end

  @waiter = 
    Thread.new do
      begin
        @status = Process::waitpid2(@pid).last
      ensure
        reap = lambda{|cid| 'no-op' }
      end
    end
end

#gen_psname(obj) ⇒ Object

generate a default name to appear in ps/top



537
538
539
# File 'lib/slave.rb', line 537

def gen_psname obj
  "slave_#{ obj.class }_#{ obj.object_id }_#{ Process.ppid }_#{ Process.pid }".downcase.gsub(%r/\s+/,'_')
end

#getopts(opts) ⇒ Object

see docs for Slave.getopts



549
550
551
# File 'lib/slave.rb', line 549

def getopts opts 
  self.class.getopts opts 
end

#shutdown(opts = {}) ⇒ Object

cuts the lifeline and kills the child process - give the key ‘quiet’ to ignore errors shutting down, including having already shutdown



520
521
522
523
524
525
526
527
# File 'lib/slave.rb', line 520

def shutdown opts = {}
  quiet = getopts(opts)['quiet']
  raise "already shutdown" if @shutdown unless quiet
  begin; Process::kill 'SIGUSR2', @pid; rescue Exception => e; end
  begin; @lifeline.cut; rescue Exception; end
  raise e if e unless quiet
  @shutdown = true
end

#shutdown?Boolean

true

Returns:

  • (Boolean)


531
532
533
# File 'lib/slave.rb', line 531

def shutdown?
  @shutdown
end

#traceObject

debugging output - ENV=1 to enable



555
556
557
558
559
560
# File 'lib/slave.rb', line 555

def trace
  if @debug 
    STDERR.puts yield
    STDERR.flush
  end
end

#wait(opts = {}, &b) ⇒ Object Also known as: wait2

wait for slave to finish. if the keyword ‘non_block’=>true is given a thread is returned to do the waiting in an async fashion. eg

thread = slave.wait(:non_block=>true){|value| "background <#{ value }>"}


510
511
512
513
514
# File 'lib/slave.rb', line 510

def wait opts = {}, &b
  b ||= lambda{|exit_status|}
  non_block = getopts(opts)['non_block']
  non_block ? Thread.new{ b[ @waiter.value ] } : b[ @waiter.value ]
end