Class: Spawner

Inherits:
Object
  • Object
show all
Defined in:
lib/webby/stelan/spawner.rb

Overview

Synopsis

A class for spawning child processes and ensuring those children continue running.

Details

When a spawner is created it is given the command to run in a child process. This child process has stdin, stdout, and stderr redirected to /dev/null (this works even on Windows). When the child dies for any reason, the spawner will restart a new child process in the exact same manner as the original.

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Spawner

call-seq:

Spawner.new( command, *args, opts = {} )

Creates a new spawner that will execute the given external command in a sub-process. The calling semantics of Kernel::exec are used to execute the command. Any number of optional args can be passed to the command.

Available options:

:spawn   => the number of child processes to spawn
:pause   => wait time (in seconds) before respawning after termination
:ruby    => the Ruby interpreter to use when spawning children
:env     => a hash for the child process environment
:stdin   => stdin child processes will read from
:stdout  => stdout child processes will write to
:stderr  => stderr child processes will write to

The :env option is used to add environemnt variables to child processes when they are spawned.

Note: all spawned child processes will use the same stdin, stdout, and stderr if they are given in the options. Otherwise they all default to /dev/null on *NIX and NUL: on Windows.

Raises:

  • (ArgumentError)


69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/webby/stelan/spawner.rb', line 69

def initialize( *args )
  config = {
    :ruby => self.class.ruby,
    :spawn => 1,
    :pause => 0,
    :stdin => self.class.dev_null,
    :stdout => self.class.dev_null,
    :stderr => self.class.dev_null
  }
  config.merge! args.pop if Hash === args.last
  config[:argv] = args

  raise ArgumentError, 'wrong number of arguments' if args.empty?

  @stop = true
  @cids = []
  @group = ThreadGroup.new

  @spawn = config.delete(:spawn)
  @pause = config.delete(:pause)
  @ruby = config.delete(:ruby)

  @tmp = child_program(config)

  class << @cids
    # call-seq:
    #    sync {block}
    #
    # Executes the given block in a synchronized fashion -- i.e. only a
    # single thread can execute at a time. Uses Mutex under the hood.
    #
    def sync(&b) 
      @mutex ||= Mutex.new
      @mutex.synchronize(&b)
    end

    # call-seq:
    #    kill( signal, num )     => number killed
    #    kill( signal, :all )    => number killed
    #
    # Send the _signal_ to a given _num_ of child processes or all child
    # processes if <code>:all</code> is given instead of a number. Returns
    # the number of child processes killed.
    #
    def kill( signal, arg )
      return if empty?

      ary = sync do
              case arg
              when :all; self.dup
              when Integer; self.slice(0,arg)
              else raise ArgumentError end
            end

      ary.each do |cid|
        begin
          Process.kill(signal, cid)
        rescue SystemCallError
          sync {delete cid}
        end
      end
      ary.length
    end  # def kill
  end  # class << @cids

end

Class Attribute Details

.dev_nullObject (readonly)

Returns the value of attribute dev_null.



33
34
35
# File 'lib/webby/stelan/spawner.rb', line 33

def dev_null
  @dev_null
end

.rubyObject (readonly)

Returns the value of attribute ruby.



32
33
34
# File 'lib/webby/stelan/spawner.rb', line 32

def ruby
  @ruby
end

Instance Attribute Details

#pauseObject

Returns the value of attribute pause.



137
138
139
# File 'lib/webby/stelan/spawner.rb', line 137

def pause
  @pause
end

#spawnObject

def initialize



136
137
138
# File 'lib/webby/stelan/spawner.rb', line 136

def spawn
  @spawn
end

Class Method Details

.finalizer(cids) ⇒ Object



35
36
37
38
39
40
41
# File 'lib/webby/stelan/spawner.rb', line 35

def finalizer( cids )
  pid = $$
  lambda do
    break unless pid == $$
    cids.kill 'TERM', :all
  end  # lambda
end

Instance Method Details

#join(limit = nil) ⇒ Object

call-seq:

join( timeout = nil )    => spawner or nil

The calling thread will suspend execution until all child processes have been stopped. Does not return until all spawner threads have exited (the child processes have been stopped) or until _timeout seconds have passed. If the timeout expires nil will be returned; otherwise the spawner is returned.



232
233
234
235
236
237
238
239
# File 'lib/webby/stelan/spawner.rb', line 232

def join( limit = nil )
  loop do
    t = @group.list.first
    break if t.nil?
    return nil unless t.join(limit)
  end
  self
end

#restart(timeout = 5) ⇒ Object

call-seq:

restart( timeout = 5 )


208
209
210
211
# File 'lib/webby/stelan/spawner.rb', line 208

def restart( timeout = 5 )
  stop( timeout )
  start
end

#running?Boolean

call-seq:

running?

Returns true if the spawner is currently running; returns false otherwise.

Returns:

  • (Boolean)


219
220
221
# File 'lib/webby/stelan/spawner.rb', line 219

def running?
  !@stop
end

#startObject

call-seq:

start    => self

Spawn the sub-processes.



162
163
164
165
166
167
168
169
170
171
# File 'lib/webby/stelan/spawner.rb', line 162

def start
  return self if running?
  @stop = false

  @cleanup = Spawner.finalizer(@cids)
  ObjectSpace.define_finalizer(self, @cleanup)

  @spawn.times {_spawn}
  self
end

#stop(timeout = 5) ⇒ Object

call-seq:

stop( timeout = 5 )    => self

Stop any spawned sub-processes.



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/webby/stelan/spawner.rb', line 178

def stop( timeout = 5 )
  return self unless running?
  @stop = true

  @cleanup.call
  ObjectSpace.undefine_finalizer(self)

  # the cleanup call sends SIGTERM to all the child processes
  # however, some might still be hanging around, so we are going to wait
  # for a timeout interval and then send a SIGKILL to any remaining child
  # processes
  nap_time = 0.05 * timeout   # sleep for 5% of the timeout interval
  timeout = Time.now + timeout

  until @cids.empty?
    sleep nap_time
    unless Time.now < timeout
      @cids.kill 'KILL', :all
      @cids.clear
      @group.list.each {|t| t.kill}
      break
    end
  end

  self
end