Module: ConcurrentWorker::ConcurrentProcess

Defined in:
lib/concurrent_worker.rb

Defined Under Namespace

Classes: IPCDuplexChannel

Instance Method Summary collapse

Instance Method Details

#cncr_blockObject



327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# File 'lib/concurrent_worker.rb', line 327

def cncr_block
  @ipc_channel = IPCDuplexChannel.new
  @c_pid = fork do
    @ipc_channel.choose_io
    begin
      yield_base_block
    rescue
      @ipc_channel.send($!)
    ensure
      @ipc_channel.send(:worker_loop_finished)
    end
  end
  @ipc_channel.choose_io
  @rcv_thread = set_rcv_thread
end

#receive_reqObject



348
349
350
351
# File 'lib/concurrent_worker.rb', line 348

def receive_req
  #called from worker process only
  @ipc_channel.recv
end

#send_req(args) ⇒ Object



343
344
345
346
# File 'lib/concurrent_worker.rb', line 343

def send_req(args)
  #called from main process only
  @ipc_channel.send(args)
end

#send_res(args) ⇒ Object



353
354
355
356
# File 'lib/concurrent_worker.rb', line 353

def send_res(args)
  #called from worker process only
  @ipc_channel.send(args)
end

#set_rcv_threadObject



302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/concurrent_worker.rb', line 302

def set_rcv_thread
  Thread.new do
    Thread.handle_interrupt(Object => :never) do
      begin
        Thread.handle_interrupt(Object => :immediate) do
          loop do
            result = @ipc_channel.recv
            break if result == :worker_loop_finished
            raise result if result.kind_of?(Exception)

            call_result_callbacks(result)
          end
        end
      rescue
        Thread.pass
        raise $!
      ensure
        @req_counter.close
        @ipc_channel.close
        call_retired_callbacks
      end
    end
  end
end

#wait_cncr_procObject



358
359
360
361
362
363
364
# File 'lib/concurrent_worker.rb', line 358

def wait_cncr_proc
  begin
    Process.waitpid(@c_pid)
  rescue Errno::ECHILD
  end
  @rcv_thread && @rcv_thread.join
end