Class: Riser::SocketProcessDispatcher

Inherits:
Object
  • Object
show all
Includes:
ServerSignal
Defined in:
lib/riser/server.rb

Constant Summary collapse

NO_CALL =

:nodoc:

proc{}

Constants included from ServerSignal

Riser::ServerSignal::SIGNAL_RESTART_FORCED, Riser::ServerSignal::SIGNAL_RESTART_GRACEFUL, Riser::ServerSignal::SIGNAL_STAT_GET_AND_RESET, Riser::ServerSignal::SIGNAL_STAT_GET_NO_RESET, Riser::ServerSignal::SIGNAL_STAT_STOP, Riser::ServerSignal::SIGNAL_STOP_FORCED, Riser::ServerSignal::SIGNAL_STOP_GRACEFUL

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(process_queue_name, thread_queue_name) ⇒ SocketProcessDispatcher

Returns a new instance of SocketProcessDispatcher.



388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
# File 'lib/riser/server.rb', line 388

# Stores the two queue names and leaves every other setting unset (nil).
# No dispatcher is created and no process is forked here.
#
# process_queue_name:: passed to SocketThreadDispatcher.new by #setup
# thread_queue_name::  prefix of the per-child name
#                      "#{thread_queue_name}-#{pos}" that #start builds
def initialize(process_queue_name, thread_queue_name)
  # tuning values read by #start; all start out as nil
  @accept_polling_timeout_seconds = nil
  @process_num = nil
  @process_queue_name = process_queue_name
  @process_queue_size = nil
  @process_queue_polling_timeout_seconds = nil
  @process_send_io_polling_timeout_seconds = nil
  @thread_num = nil
  @thread_queue_name = thread_queue_name
  @thread_queue_size = nil
  @thread_queue_polling_timeout_seconds = nil
  # hook blocks, registered through the methods of the same name
  @at_fork= nil
  @at_stop = nil
  @at_stat = nil
  @preprocess = nil
  @postprocess = nil
  @dispatch = nil
  # assigned by #setup
  @process_dispatcher = nil
end

Instance Attribute Details

#accept_polling_timeout_seconds ⇒ Object

Returns the value of attribute accept_polling_timeout_seconds.



408
409
410
# File 'lib/riser/server.rb', line 408

# Timeout that #start passes to server_socket.wait_readable while it
# polls for a new connection. nil after #initialize.
def accept_polling_timeout_seconds
  @accept_polling_timeout_seconds
end

#process_num ⇒ Object

Returns the value of attribute process_num.



409
410
411
# File 'lib/riser/server.rb', line 409

# Number of child processes that #start forks. #start also assigns it as
# thread_num of the parent-side dispatcher. nil after #initialize.
def process_num
  @process_num
end

#process_queue_polling_timeout_seconds ⇒ Object

Returns the value of attribute process_queue_polling_timeout_seconds.



411
412
413
# File 'lib/riser/server.rb', line 411

# Value that #start assigns to thread_queue_polling_timeout_seconds of the
# parent-side dispatcher. nil after #initialize.
def process_queue_polling_timeout_seconds
  @process_queue_polling_timeout_seconds
end

#process_queue_size ⇒ Object

Returns the value of attribute process_queue_size.



410
411
412
# File 'lib/riser/server.rb', line 410

# Value that #start assigns to thread_queue_size of the parent-side
# dispatcher. nil after #initialize.
def process_queue_size
  @process_queue_size
end

#process_send_io_polling_timeout_seconds ⇒ Object

Returns the value of attribute process_send_io_polling_timeout_seconds.



412
413
414
# File 'lib/riser/server.rb', line 412

# Timeout that each child process passes to child_io.wait_readable in
# #start while it waits for the parent to send a socket. nil after
# #initialize.
def process_send_io_polling_timeout_seconds
  @process_send_io_polling_timeout_seconds
end

#thread_num ⇒ Object

Returns the value of attribute thread_num.



413
414
415
# File 'lib/riser/server.rb', line 413

# Value that #start assigns to thread_num of the dispatcher created inside
# each child process. nil after #initialize.
def thread_num
  @thread_num
end

#thread_queue_polling_timeout_seconds ⇒ Object

Returns the value of attribute thread_queue_polling_timeout_seconds.



415
416
417
# File 'lib/riser/server.rb', line 415

# Value that #start assigns to thread_queue_polling_timeout_seconds of the
# dispatcher created inside each child process. nil after #initialize.
def thread_queue_polling_timeout_seconds
  @thread_queue_polling_timeout_seconds
end

#thread_queue_size ⇒ Object

Returns the value of attribute thread_queue_size.



414
415
416
# File 'lib/riser/server.rb', line 414

# Value that #start assigns to thread_queue_size of the dispatcher created
# inside each child process. nil after #initialize.
def thread_queue_size
  @thread_queue_size
end

Instance Method Details

#at_fork(&block) ⇒ Object

:yields:



417
418
419
420
# File 'lib/riser/server.rb', line 417

# Registers the fork hook. #start calls the stored block without arguments
# inside every forked child, after the child has written its ready message
# and before the child's dispatcher is started. Always returns nil.
def at_fork(&block)         # :yields:
  @at_fork = block
  nil
end

#at_stat(&block) ⇒ Object

:yields: stat_info



427
428
429
430
# File 'lib/riser/server.rb', line 427

# Registers the statistics hook. #start hands the stored block to at_stat
# of both the per-child dispatcher and the parent-side dispatcher.
# Always returns nil.
def at_stat(&block)         # :yields: stat_info
  @at_stat = block
  nil
end

#at_stop(&block) ⇒ Object

:yields: stop_state



422
423
424
425
# File 'lib/riser/server.rb', line 422

# Registers the stop hook. #start hands the stored block to at_stop of the
# dispatcher created inside each child process; the parent-side dispatcher
# gets its own at_stop block that signals the children instead.
# Always returns nil.
def at_stop(&block)         # :yields: stop_state
  @at_stop = block
  nil
end

#dispatch(&block) ⇒ Object

:yields: accept_object



442
443
444
445
# File 'lib/riser/server.rb', line 442

# Registers the connection handler. #start hands the stored block to
# dispatch of the dispatcher created inside each child process.
# Always returns nil.
def dispatch(&block)        # :yields: accept_object
  @dispatch = block
  nil
end

#postprocess(&block) ⇒ Object

:yields:



437
438
439
440
# File 'lib/riser/server.rb', line 437

# Registers the postprocess hook. #start hands the stored block to
# postprocess of the dispatcher created inside each child process; the
# parent-side dispatcher is given NO_CALL instead. Always returns nil.
def postprocess(&block)     # :yields:
  @postprocess = block
  nil
end

#preprocess(&block) ⇒ Object

:yields:



432
433
434
435
# File 'lib/riser/server.rb', line 432

# Registers the preprocess hook. #start hands the stored block to
# preprocess of the dispatcher created inside each child process; the
# parent-side dispatcher is given NO_CALL instead. Always returns nil.
def preprocess(&block)      # :yields:
  @preprocess = block
  nil
end

#setup ⇒ Object

After this method call is completed, the object will be ready to accept ‘signal_…’ methods.



473
474
475
476
# File 'lib/riser/server.rb', line 473

# Creates the parent-side dispatcher, a SocketThreadDispatcher named
# @process_queue_name, and stores it in @process_dispatcher. The signal_*
# methods do nothing while @process_dispatcher is nil, so they only take
# effect after this call. #start invokes this itself when it has not been
# called yet. Always returns nil.
def setup
  @process_dispatcher = SocketThreadDispatcher.new(@process_queue_name)
  nil
end

#signal_stat_get(reset: true) ⇒ Object

Should be called from a signal(2) handler.



460
461
462
463
# File 'lib/riser/server.rb', line 460

# Relays a statistics request to the parent-side dispatcher, passing the
# +reset+ flag through unchanged. Meant to be invoked from a signal(2)
# handler. Does nothing while @process_dispatcher is unset. Always
# returns nil.
def signal_stat_get(reset: true)
  return unless @process_dispatcher

  @process_dispatcher.signal_stat_get(reset: reset)
  nil
end

#signal_stat_stop ⇒ Object

Should be called from a signal(2) handler.



466
467
468
469
# File 'lib/riser/server.rb', line 466

# Relays a stop-statistics request to the parent-side dispatcher. Meant to
# be invoked from a signal(2) handler. Does nothing while
# @process_dispatcher is unset. Always returns nil.
def signal_stat_stop
  return unless @process_dispatcher

  @process_dispatcher.signal_stat_stop
  nil
end

#signal_stop_forced ⇒ Object

Should be called from a signal(2) handler.



454
455
456
457
# File 'lib/riser/server.rb', line 454

# Relays a forced-stop request to the parent-side dispatcher. Meant to be
# invoked from a signal(2) handler. Does nothing while @process_dispatcher
# is unset. Always returns nil.
def signal_stop_forced
  return unless @process_dispatcher

  @process_dispatcher.signal_stop_forced
  nil
end

#signal_stop_graceful ⇒ Object

Should be called from a signal(2) handler.



448
449
450
451
# File 'lib/riser/server.rb', line 448

# Relays a graceful-stop request to the parent-side dispatcher. Meant to
# be invoked from a signal(2) handler. Does nothing while
# @process_dispatcher is unset. Always returns nil.
def signal_stop_graceful
  return unless @process_dispatcher

  @process_dispatcher.signal_stop_graceful
  nil
end

#start(server_socket) ⇒ Object



478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
# File 'lib/riser/server.rb', line 478

# Forks @process_num worker processes, then accepts connections on
# +server_socket+ in the calling process and passes each accepted socket to
# a worker over a UNIX socket pair.
#
# Per worker (inside Process.fork):
# * a SocketThreadDispatcher named "#{@thread_queue_name}-#{pos}" is
#   configured with the thread settings and the user hooks;
# * its accept step waits for the 5-byte command "SEND\n" on the pair and
#   then receives the socket with recv_io;
# * it writes "RADY\n" to the parent once at startup and again after every
#   accept (accept_return).
#
# In the parent:
# * every worker's initial "RADY\n" is awaited before accepting starts;
# * @process_dispatcher (created through #setup if necessary) accepts from
#   +server_socket+ and its stop/stat hooks signal every worker with
#   Process.kill;
# * after @process_dispatcher.start returns, every worker is reaped with
#   Process.wait and its pair end is closed.
#
# server_socket:: a TCPServer or UNIXServer (accepted sockets are then
#                 received as its superclass); anything else is received
#                 as a plain IO.
#
# Raises RuntimeError ("internal error: ...") when a peer sends anything
# other than the expected 5-byte message. Returns nil.
#
# NOTE: "RADY\n" is the literal token both ends compare against; its
# spelling is part of the protocol and must stay identical on both sides.
def start(server_socket)
  case (server_socket)
  when TCPServer, UNIXServer
    socket_class = server_socket.class.superclass
  else
    socket_class = IO
  end

  process_list = []
  @process_num.times do |pos|
    child_io, parent_io = UNIXSocket.socketpair
    pid = Process.fork{
      parent_io.close
      # drop the parent-side pair ends inherited from earlier siblings
      pos.times do |i|
        process_list[i].io.close
      end

      thread_dispatcher = SocketThreadDispatcher.new("#{@thread_queue_name}-#{pos}")
      thread_dispatcher.thread_num = @thread_num
      thread_dispatcher.thread_queue_size = @thread_queue_size
      thread_dispatcher.thread_queue_polling_timeout_seconds = @thread_queue_polling_timeout_seconds

      thread_dispatcher.at_stop(&@at_stop)
      thread_dispatcher.at_stat(&@at_stat)
      thread_dispatcher.at_stat_get(&NO_CALL)
      thread_dispatcher.at_stat_stop(&NO_CALL)
      thread_dispatcher.preprocess(&@preprocess)
      thread_dispatcher.postprocess(&@postprocess)

      thread_dispatcher.accept{
        if (child_io.wait_readable(@process_send_io_polling_timeout_seconds) != nil) then
          command = child_io.read(5)
          command == "SEND\n" or raise "internal error: unknown command <#{command.inspect}>"
          child_io.recv_io(socket_class)
        end
      }
      thread_dispatcher.accept_return{ child_io.write("RADY\n") }
      thread_dispatcher.dispatch(&@dispatch)

      Signal.trap(SIGNAL_STOP_GRACEFUL) { thread_dispatcher.signal_stop_graceful }
      Signal.trap(SIGNAL_STOP_FORCED) { thread_dispatcher.signal_stop_forced }
      Signal.trap(SIGNAL_STAT_GET_AND_RESET) { thread_dispatcher.signal_stat_get(reset: true) }
      Signal.trap(SIGNAL_STAT_GET_NO_RESET) { thread_dispatcher.signal_stat_get(reset: false) }
      Signal.trap(SIGNAL_STAT_STOP) { thread_dispatcher.signal_stat_stop }

      begin
        child_io.write("RADY\n")
        # the fork hook is optional: #initialize leaves it nil
        @at_fork.call if @at_fork
        thread_dispatcher.start
      ensure
        child_io.close
      end
    }
    child_io.close

    process_list << SocketProcess.new(pid, parent_io)
  end

  # Every loop below uses `each` with a block parameter rather than `for`.
  # `for` would create method-level locals, and the blocks handed to
  # @process_dispatcher would then all assign to that one shared variable
  # instead of having a local of their own.
  process_list.each do |child|
    response = child.io.read(5)
    response == "RADY\n" or raise "internal error: unknown response <#{response.inspect}>"
  end

  setup unless @process_dispatcher
  @process_dispatcher.thread_num = @process_num
  @process_dispatcher.thread_queue_size = @process_queue_size
  @process_dispatcher.thread_queue_polling_timeout_seconds = @process_queue_polling_timeout_seconds

  @process_dispatcher.at_stop{|state|
    case (state)
    when :graceful
      process_list.each do |child|
        Process.kill(SIGNAL_STOP_GRACEFUL, child.pid)
      end
    when :forced
      process_list.each do |child|
        Process.kill(SIGNAL_STOP_FORCED, child.pid)
      end
    end
  }
  @process_dispatcher.at_stat(&@at_stat)
  @process_dispatcher.at_stat_get{|reset|
    if (reset) then
      process_list.each do |child|
        Process.kill(SIGNAL_STAT_GET_AND_RESET, child.pid)
      end
    else
      process_list.each do |child|
        Process.kill(SIGNAL_STAT_GET_NO_RESET, child.pid)
      end
    end
  }
  @process_dispatcher.at_stat_stop{
    process_list.each do |child|
      Process.kill(SIGNAL_STAT_STOP, child.pid)
    end
  }
  @process_dispatcher.preprocess(&NO_CALL)
  @process_dispatcher.postprocess(&NO_CALL)

  @process_dispatcher.accept{
    if (server_socket.wait_readable(@accept_polling_timeout_seconds) != nil) then
      begin
        server_socket.accept_nonblock
      rescue IO::WaitReadable
        nil                 # to avoid conflicting accept(2) at server restart overlap
      end
    end
  }
  @process_dispatcher.accept_return(&NO_CALL)
  @process_dispatcher.dispatch{|socket|
    # `worker` and `reply` are block-local, so each invocation keeps its
    # own pair end for the whole SEND / send_io / RADY exchange.
    # NOTE(review): presumably this block runs on @process_num dispatcher
    # threads at once and Thread.current[:number] is the thread's index,
    # set by SocketThreadDispatcher -- confirm against that class.
    worker = process_list[Thread.current[:number]]
    worker.io.write("SEND\n")
    worker.io.send_io(socket)
    reply = worker.io.read(5)
    reply == "RADY\n" or raise "internal error: unknown response <#{reply.inspect}>"
  }
  @process_dispatcher.start

  process_list.each do |child|
    Process.wait(child.pid)
    child.io.close
  end

  nil
end