Module: ConcurrentWorker::ConcurrentProcess

Defined in:
lib/concurrent_worker/worker.rb

Instance Method Summary collapse

Instance Method Details

#channel_closeObject



275
276
277
# File 'lib/concurrent_worker/worker.rb', line 275

def channel_close
  @ipc_channel.close
end

#cncr_blockObject



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/concurrent_worker/worker.rb', line 238

def cncr_block
  @ipc_channel = IPCDuplexChannel.new
  @c_pid = fork do
    @ipc_channel.choose_io
    Thread.handle_interrupt(Object => :never) do
      begin
        Thread.handle_interrupt(Object => :immediate) do
          yield_base_block
        end
      rescue
        @ipc_channel.send($!)
      ensure
        @ipc_channel.send(nil)
      end
    end
  end
  @ipc_channel.choose_io
  @recv_thread = result_handle_thread do
    ipc_recv_loop
  end
end

#ipc_recv_loopObject



230
231
232
233
234
235
236
# File 'lib/concurrent_worker/worker.rb', line 230

def ipc_recv_loop
  while result = @ipc_channel.recv
    raise result if result.kind_of?(Exception)

    call_result_callbacks(result)
  end
end

#receive_reqObject



265
266
267
268
# File 'lib/concurrent_worker/worker.rb', line 265

def receive_req
  #called from worker process only
  @ipc_channel.recv
end

#send_req(args) ⇒ Object



260
261
262
263
# File 'lib/concurrent_worker/worker.rb', line 260

def send_req(args)
  #called from main process only
  @ipc_channel.send(args)
end

#send_res(args) ⇒ Object



270
271
272
273
# File 'lib/concurrent_worker/worker.rb', line 270

def send_res(args)
  #called from worker process only
  @ipc_channel.send(args)
end

#wait_cncr_procObject



279
280
281
282
283
284
285
# File 'lib/concurrent_worker/worker.rb', line 279

def wait_cncr_proc
  begin
    Process.waitpid(@c_pid)
  rescue Errno::ECHILD
  end
  @recv_thread && @recv_thread.join
end