Module: ConcurrentWorker::ConcurrentProcess
- Defined in:
- lib/concurrent_worker.rb
Defined Under Namespace
Classes: IPCDuplexChannel
Instance Method Summary
collapse
Instance Method Details
#cncr_block ⇒ Object
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
|
# File 'lib/concurrent_worker.rb', line 327
# Forks a child process that runs the base block, then starts the
# parent-side receiver thread.
#
# The IPCDuplexChannel is created before the fork, so parent and child each
# hold it and each calls +choose_io+ on its own side (presumably to pick the
# pipe ends that side uses — confirm in IPCDuplexChannel).
#
# In the child, a StandardError raised by +yield_base_block+ is sent over
# the channel, and +:worker_loop_finished+ is always sent last; the receiver
# loop in +set_rcv_thread+ stops when it reads that symbol.
#
# Sets @ipc_channel, @c_pid and @rcv_thread.
#
# @return [Object] the value returned by +set_rcv_thread+
def cncr_block
  @ipc_channel = IPCDuplexChannel.new

  @c_pid = fork do
    @ipc_channel.choose_io
    begin
      yield_base_block
    rescue => error
      # Forward the failure to the parent rather than losing it with the child.
      @ipc_channel.send(error)
    ensure
      # Always announce completion, whether the block succeeded or failed.
      @ipc_channel.send(:worker_loop_finished)
    end
  end

  @ipc_channel.choose_io
  @rcv_thread = set_rcv_thread
end
|
#receive_req ⇒ Object
348
349
350
351
|
# File 'lib/concurrent_worker.rb', line 348
# Reads the next message from the IPC channel.
#
# Thin wrapper over +@ipc_channel.recv+; blocking behaviour and the message
# format are whatever the channel implements.
#
# @return [Object] the value returned by the channel's +recv+
def receive_req
  @ipc_channel.recv
end
|
#send_req(args) ⇒ Object
343
344
345
346
|
# File 'lib/concurrent_worker.rb', line 343
# Writes +args+ to the IPC channel.
#
# NOTE(review): by its name this is the request direction (towards the
# worker process) — confirm against callers.
#
# @param args [Object] message handed unchanged to the channel's +send+
# @return [Object] the value returned by the channel's +send+
def send_req(args)
  @ipc_channel.send(args)
end
|
#send_res(args) ⇒ Object
353
354
355
356
|
# File 'lib/concurrent_worker.rb', line 353
# Writes +args+ to the IPC channel.
#
# NOTE(review): by its name this is the response direction (back from the
# worker process) — confirm against callers.
#
# @param args [Object] message handed unchanged to the channel's +send+
# @return [Object] the value returned by the channel's +send+
def send_res(args)
  @ipc_channel.send(args)
end
|
#set_rcv_thread ⇒ Object
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
|
# File 'lib/concurrent_worker.rb', line 302
# Starts the thread that drains messages from the IPC channel.
#
# For each message read from +@ipc_channel+:
# * +:worker_loop_finished+ ends the loop;
# * an Exception instance is raised inside this thread;
# * anything else is passed to +call_result_callbacks+.
#
# Asynchronous interrupts are deferred (+:never+) around the whole body and
# re-enabled (+:immediate+) only around the receive loop, so the
# rescue/ensure section is not cut short by one. Whatever the outcome,
# +@req_counter+ and +@ipc_channel+ are closed and +call_retired_callbacks+
# runs, in that order.
#
# @return [Thread] the receiver thread; joining it re-raises any exception
#   that ended the loop
def set_rcv_thread
  Thread.new do
    Thread.handle_interrupt(Object => :never) do
      begin
        Thread.handle_interrupt(Object => :immediate) do
          loop do
            message = @ipc_channel.recv
            break if message == :worker_loop_finished
            raise message if message.is_a?(Exception)

            call_result_callbacks(message)
          end
        end
      rescue => error
        # NOTE(review): the reason for yielding to other threads before
        # re-raising is not evident from this method — confirm intent.
        Thread.pass
        raise error
      ensure
        @req_counter.close
        @ipc_channel.close
        call_retired_callbacks
      end
    end
  end
end
|
#wait_cncr_proc ⇒ Object
358
359
360
361
362
363
364
|
# File 'lib/concurrent_worker.rb', line 358
# Waits for the forked child process, then for the receiver thread.
#
# +Errno::ECHILD+ from +Process.waitpid+ is ignored. The receiver thread is
# joined only if +@rcv_thread+ is set.
#
# @return [Thread, nil] the joined receiver thread, or +@rcv_thread+ itself
#   when it is nil
def wait_cncr_proc
  begin
    Process.waitpid(@c_pid)
  rescue Errno::ECHILD
    # No such child to wait for (presumably already reaped) — carry on.
  end

  receiver = @rcv_thread
  receiver && receiver.join
end
|