Module: ConcurrentWorker::ConcurrentProcess

Defined in:
lib/concurrent_worker/worker.rb

Instance Method Summary collapse

Instance Method Details

#channel_closeObject



283
284
285
# File 'lib/concurrent_worker/worker.rb', line 283

def channel_close
  @ipc_channel.close
end

#cncr_blockObject



246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/concurrent_worker/worker.rb', line 246

def cncr_block
  @ipc_channel = IPCDuplexChannel.new
  @c_pid = fork do
    @ipc_channel.choose_io
    Thread.handle_interrupt(Object => :never) do
      begin
        Thread.handle_interrupt(Object => :immediate) do
          yield_base_block
        end
      rescue
        @ipc_channel.send($!)
      ensure
        @ipc_channel.send(nil)
      end
    end
  end
  @ipc_channel.choose_io
  @recv_thread = result_handle_thread do
    ipc_recv_loop
  end
end

#ipc_recv_loopObject



238
239
240
241
242
243
244
# File 'lib/concurrent_worker/worker.rb', line 238

def ipc_recv_loop
  while result = @ipc_channel.recv
    raise result if result.kind_of?(Exception)

    call_result_callbacks(result)
  end
end

#receive_reqObject



273
274
275
276
# File 'lib/concurrent_worker/worker.rb', line 273

def receive_req
  #called from worker process only
  @ipc_channel.recv
end

#send_req(args) ⇒ Object



268
269
270
271
# File 'lib/concurrent_worker/worker.rb', line 268

def send_req(args)
  #called from main process only
  @ipc_channel.send(args)
end

#send_res(args) ⇒ Object



278
279
280
281
# File 'lib/concurrent_worker/worker.rb', line 278

def send_res(args)
  #called from worker process only
  @ipc_channel.send(args)
end

#wait_cncr_procObject



287
288
289
290
291
292
293
# File 'lib/concurrent_worker/worker.rb', line 287

def wait_cncr_proc
  begin
    Process.waitpid(@c_pid)
  rescue Errno::ECHILD
  end
  @recv_thread && @recv_thread.join
end