Class: Levdon::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/levdon.rb

Overview

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(&block) ⇒ Worker

Returns a new instance of Worker.



134
135
136
137
138
139
# File 'lib/levdon.rb', line 134

# Prepares the communication channels for a worker; nothing is forked here
# (that happens in #run).
#
# Two pipes are created: one carries data from parent to child
# (@parent_write -> @child_read), the other from child to parent
# (@child_write -> @parent_read). The parent-side ends are additionally
# wrapped in a NonBlockLineStream, which the non-blocking methods
# (#poll, #nonblock_read_from_child, #nonblock_write_to_child) use.
#
# @param block [Proc] called in the child with the arguments of each message
def initialize(&block)
  @block = block
  @child_read, @parent_write = create_pipe
  @parent_read, @child_write = create_pipe
  @io_stream = NonBlockLineStream.new(@parent_read, @parent_write)
end

Instance Attribute Details

#pid ⇒ Object (readonly)

Returns the value of attribute pid.



132
133
134
# File 'lib/levdon.rb', line 132

# Process id of the forked worker. It is assigned by #run, so it is nil
# until the worker has been started.
def pid
  @pid
end

Instance Method Details

#alive? ⇒ Boolean

Returns:

  • (Boolean)


179
180
181
182
183
184
# File 'lib/levdon.rb', line 179

# Reports whether the worker process exists.
#
# Signal 0 only performs the existence check; nothing is delivered to the
# process.
#
# @return [Boolean] false when the worker was never started (no pid yet) or
#   the pid no longer exists, true otherwise
def alive?
  # Before #run has forked, @pid is nil and Process.kill would raise
  # TypeError; a worker that was never started is simply not alive.
  return false unless @pid

  Process.kill(0, @pid)
  true
rescue Errno::ESRCH
  false
end

#async_execute(*msg) ⇒ Object



169
170
171
# File 'lib/levdon.rb', line 169

# Sends +msg+ to the worker through the non-blocking stream; no reply is
# read here.
#
# The arguments are forwarded as a single array to #nonblock_write_to_child.
# NOTE(review): the reply is presumably collected later via #poll and
# #nonblock_read_from_child — confirm against NonBlockLineStream.
def async_execute(*msg)
  nonblock_write_to_child(msg)
end

#create_pipe ⇒ Object



141
142
143
# File 'lib/levdon.rb', line 141

# Opens a pipe and switches both ends to binary (ASCII-8BIT) encoding so
# that marshaled bytes are passed through untouched.
#
# @return [Array<IO>] the [read_end, write_end] pair
def create_pipe
  reader, writer = IO.pipe
  [reader, writer].each do |io|
    io.set_encoding("ASCII-8BIT", "ASCII-8BIT")
  end
end

#execute(*msg) ⇒ Object



164
165
166
167
# File 'lib/levdon.rb', line 164

# Sends +msg+ to the worker over the parent -> child pipe and starts a
# thread that reads the reply.
#
# @param msg [Array] arguments, forwarded to the worker as one array
# @return [Thread] thread whose value is the object read from the child
def execute(*msg)
  write_to_child(msg)
  Thread.new do
    read_from_child
  end
end

#install_exit_handler ⇒ Object



243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/levdon.rb', line 243

# Registers an at_exit hook that force-kills (SIGKILL) and reaps the worker
# if it is still alive when the installing process exits.
#
# Unexpected errors are printed and re-raised from the hook; a worker that
# has already vanished or been reaped is silently ignored.
def install_exit_handler
  # at_exit hooks are copied into every process forked after this point
  # (for example the child of a Worker started later). Remember which
  # process installed the hook so those copies never kill a worker that
  # belongs to someone else.
  owner_pid = Process.pid

  at_exit do
    next unless Process.pid == owner_pid
    next unless alive?
    begin
      Process.kill("KILL", @pid)
      Process.wait(@pid)
    rescue Errno::ESRCH, Errno::ECHILD
      # noop: the worker is already gone or has already been reaped
    rescue => e
      puts "error at_exit: #{ e }"
      raise e
    end
  end
end

#install_signal_handler ⇒ Object



258
259
260
261
262
263
264
265
266
# File 'lib/levdon.rb', line 258

# Traps INT and QUIT so the signal is forwarded to the worker, the worker is
# reaped, and then whatever handling was in place before is carried out.
#
# Signal.trap returns the previous handler, which is a Proc only when one
# was installed from Ruby; otherwise it is a String such as "DEFAULT" or
# "IGNORE" (or nil). Those values cannot be called, so for them the previous
# disposition is restored and the signal is re-sent to this process.
def install_signal_handler
  # Trap handlers are inherited by processes forked later; only the process
  # that installed them owns the worker and may signal/reap it.
  owner_pid = Process.pid

  [:INT, :QUIT].each do |signal|
    old_handler = Signal.trap(signal) do
      if Process.pid == owner_pid
        begin
          Process.kill(signal, @pid)
          Process.wait(@pid)
        rescue Errno::ESRCH, Errno::ECHILD
          # worker already exited or was already reaped
        end
      end

      if old_handler.respond_to?(:call)
        old_handler.call
      else
        # Hand control back to the previous (non-Proc) disposition.
        Signal.trap(signal, old_handler || "DEFAULT")
        Process.kill(signal, Process.pid)
      end
    end
  end
end

#nonblock_read_from_child ⇒ Object



200
201
202
203
204
205
206
# File 'lib/levdon.rb', line 200

# Fetches one frame from the non-blocking stream and unmarshals it.
#
# A frame is the marshaled object with every newline replaced by the
# "@NDELIMITER@" marker, followed by a single "\n" (see
# #nonblock_write_to_child). Known limitation of this framing: a payload
# that itself contains the bytes "@NDELIMITER@" does not round-trip.
#
# @return [Object, nil] the decoded object, or nil when the stream has no
#   data to return
def nonblock_read_from_child
  data = @io_stream.read
  return nil unless data

  # Remove only the single "\n" terminator. String#chomp would also swallow
  # a "\r" that happens to be the last byte of the marshaled payload,
  # truncating it and making Marshal.load fail.
  payload = data.end_with?("\n") ? data[0...-1] : data
  Marshal.load(payload.gsub("@NDELIMITER@", "\n"))
end

#nonblock_write_to_child(obj) ⇒ Object



208
209
210
211
# File 'lib/levdon.rb', line 208

# Serializes +obj+ and hands it to the non-blocking stream as a single
# newline-terminated frame.
#
# Newlines inside the marshaled bytes are replaced with the "@NDELIMITER@"
# marker so the trailing "\n" is the only newline in the frame.
#
# @param obj [Object] any object Marshal can dump
# @return whatever @io_stream.write returns
def nonblock_write_to_child(obj)
  escaped = Marshal.dump(obj).gsub("\n", "@NDELIMITER@")
  @io_stream.write(escaped + "\n")
end

#poll ⇒ Object



186
187
188
# File 'lib/levdon.rb', line 186

# Delegates to the non-blocking stream's #poll and returns its result.
# NOTE(review): presumably this services pending non-blocking reads/writes
# on the parent-side pipe ends — confirm against NonBlockLineStream#poll.
def poll
  @io_stream.poll
end

#read_from_child ⇒ Object



213
214
215
# File 'lib/levdon.rb', line 213

# Parent side: reads and decodes the next message from the child -> parent
# pipe (@parent_read) via #read_object.
def read_from_child
  read_object(@parent_read)
end

#read_from_parent ⇒ Object



221
222
223
# File 'lib/levdon.rb', line 221

# Child side: reads and decodes the next message from the parent -> child
# pipe (@child_read) via #read_object.
def read_from_parent
  read_object(@child_read)
end

#read_object(read) ⇒ Object



195
196
197
198
# File 'lib/levdon.rb', line 195

# Reads one newline-terminated frame from +read+ and unmarshals it.
#
# This is the inverse of #write_object: the frame is the marshaled object
# with newlines replaced by the "@NDELIMITER@" marker, followed by a single
# "\n". Known limitation of this framing: a payload that itself contains the
# bytes "@NDELIMITER@" does not round-trip.
#
# @param read [IO] readable stream positioned at the start of a frame
# @return [Object] the decoded object
# @raise [EOFError] if the stream is at end-of-file (writer closed its end)
def read_object(read)
  data = read.gets
  # gets returns nil once the writer has closed its end; report that
  # explicitly instead of failing with NoMethodError on nil.
  raise EOFError, "pipe closed before a message could be read" if data.nil?

  # Remove only the single "\n" terminator. String#chomp would also swallow
  # a "\r" that happens to be the last byte of the marshaled payload,
  # truncating it and making Marshal.load fail.
  payload = data.end_with?("\n") ? data[0...-1] : data
  Marshal.load(payload.gsub("@NDELIMITER@", "\n"))
end

#run ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/levdon.rb', line 145

# Forks the worker process and starts its message loop.
#
# In the child: the parent-side pipe ends are closed, :ready is sent as a
# handshake, and then each message read from the parent is splatted into the
# block given to #initialize, with the block's result written back. The loop
# ends when the message :stop arrives, after which the child-side ends are
# closed and the forked block finishes.
#
# In the parent: hands over to #wait_after_fork and returns its result.
def run
  @pid = fork do
    # Child process: only the child-side ends are used from here on.
    [@parent_read, @parent_write].each(&:close)
    write_to_parent(:ready)

    loop do
      args = read_from_parent
      break if args == :stop

      write_to_parent(@block.call(*args))
    end

    [@child_read, @child_write].each(&:close)
  end

  wait_after_fork if @pid
end

#stop ⇒ Object



173
174
175
176
177
# File 'lib/levdon.rb', line 173

# Asks a running worker to leave its message loop and waits for it to exit.
#
# Does nothing when the worker is not alive.
#
# @return [Integer, nil] the value of Process.wait for the worker, or nil
#   when there was nothing to stop
def stop
  if alive?
    write_to_child(:stop)
    Process.wait(@pid)
  end
end

#wait_after_fork ⇒ Object



229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/levdon.rb', line 229

# Parent-side setup performed right after the fork in #run.
#
# Closes the pipe ends that belong to the child, installs the exit and
# signal hooks, and starts a thread that waits for the child's handshake.
#
# @return [Thread] thread whose value is :ready; it raises (inside the
#   thread) when the first message from the child is anything else
def wait_after_fork
  # The child-side ends are only used by the forked worker.
  [@child_read, @child_write].each(&:close)

  install_exit_handler
  install_signal_handler

  Thread.new do
    handshake = read_from_child
    raise "Failed to start worker pid #{ @pid }" unless handshake == :ready
    handshake
  end
end

#write_object(obj, write) ⇒ Object



190
191
192
193
# File 'lib/levdon.rb', line 190

# Serializes +obj+ and writes it to +write+ as a single newline-terminated
# frame.
#
# Newlines inside the marshaled bytes are replaced with the "@NDELIMITER@"
# marker so the trailing "\n" is the only newline in the frame, which lets
# the reader fetch a whole message with one line read.
#
# @param obj [Object] any object Marshal can dump
# @param write [IO] writable stream (anything responding to #write)
# @return whatever +write.write+ returns
def write_object(obj, write)
  escaped = Marshal.dump(obj).gsub("\n", "@NDELIMITER@")
  write.write(escaped + "\n")
end

#write_to_child(obj) ⇒ Object



217
218
219
# File 'lib/levdon.rb', line 217

# Parent side: serializes +obj+ onto the parent -> child pipe
# (@parent_write) via #write_object.
def write_to_child(obj)
  write_object(obj, @parent_write)
end

#write_to_parent(obj) ⇒ Object



225
226
227
# File 'lib/levdon.rb', line 225

# Child side: serializes +obj+ onto the child -> parent pipe
# (@child_write) via #write_object.
def write_to_parent(obj)
  write_object(obj, @child_write)
end