Class: Riser::SocketProcessDispatcher
- Inherits:
-
Object
- Object
- Riser::SocketProcessDispatcher
- Includes:
- ServerSignal
- Defined in:
- lib/riser/server.rb
Constant Summary collapse
- NO_CALL =
:nodoc:
proc{}
- SEND_CMD =
:nodoc:
"SEND\n".freeze
- SEND_LEN =
:nodoc:
SEND_CMD.bytesize
- RADY_CMD =
:nodoc:
"RADY\n".freeze
- RADY_LEN =
:nodoc:
RADY_CMD.bytesize
Constants included from ServerSignal
Riser::ServerSignal::SIGNAL_RESTART_FORCED, Riser::ServerSignal::SIGNAL_RESTART_GRACEFUL, Riser::ServerSignal::SIGNAL_STAT_GET_AND_RESET, Riser::ServerSignal::SIGNAL_STAT_GET_NO_RESET, Riser::ServerSignal::SIGNAL_STAT_STOP, Riser::ServerSignal::SIGNAL_STOP_FORCED, Riser::ServerSignal::SIGNAL_STOP_GRACEFUL
Instance Attribute Summary collapse
-
#accept_polling_timeout_seconds ⇒ Object
Returns the value of attribute accept_polling_timeout_seconds.
-
#process_num ⇒ Object
Returns the value of attribute process_num.
-
#process_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute process_queue_polling_timeout_seconds.
-
#process_queue_size ⇒ Object
Returns the value of attribute process_queue_size.
-
#process_send_io_polling_timeout_seconds ⇒ Object
Returns the value of attribute process_send_io_polling_timeout_seconds.
-
#thread_num ⇒ Object
Returns the value of attribute thread_num.
-
#thread_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute thread_queue_polling_timeout_seconds.
-
#thread_queue_size ⇒ Object
Returns the value of attribute thread_queue_size.
Instance Method Summary collapse
-
#at_fork(&block) ⇒ Object
:yields:.
-
#at_stat(&block) ⇒ Object
:yields: stat_info.
-
#at_stop(&block) ⇒ Object
:yields: stop_state.
-
#dispatch(&block) ⇒ Object
:yields: accept_object.
-
#initialize(process_queue_name, thread_queue_name) ⇒ SocketProcessDispatcher
constructor
A new instance of SocketProcessDispatcher.
-
#postprocess(&block) ⇒ Object
:yields:.
-
#preprocess(&block) ⇒ Object
:yields:.
-
#setup ⇒ Object
After this method call completes, the object is ready to accept the ‘signal_…’ method calls.
-
#signal_stat_get(reset: true) ⇒ Object
should be called from signal(2) handler.
-
#signal_stat_stop ⇒ Object
should be called from signal(2) handler.
-
#signal_stop_forced ⇒ Object
should be called from signal(2) handler.
-
#signal_stop_graceful ⇒ Object
should be called from signal(2) handler.
- #start(server_socket) ⇒ Object
Constructor Details
#initialize(process_queue_name, thread_queue_name) ⇒ SocketProcessDispatcher
Returns a new instance of SocketProcessDispatcher.
440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 |
# File 'lib/riser/server.rb', line 440
# Creates an unconfigured dispatcher. Only the two queue names are taken
# from the arguments; every tuning attribute, every callback slot and the
# process dispatcher itself start out as nil.
#
# process_queue_name:: stored in @process_queue_name
# thread_queue_name::  stored in @thread_queue_name
def initialize(process_queue_name, thread_queue_name)
  # accept / process-level settings
  @accept_polling_timeout_seconds = nil
  @process_num = nil
  @process_queue_name = process_queue_name
  @process_queue_size = nil
  @process_queue_polling_timeout_seconds = nil
  @process_send_io_polling_timeout_seconds = nil

  # thread-level settings
  @thread_num = nil
  @thread_queue_name = thread_queue_name
  @thread_queue_size = nil
  @thread_queue_polling_timeout_seconds = nil

  # callback slots, filled in by the block-taking registration methods
  @at_fork = nil
  @at_stop = nil
  @at_stat = nil
  @preprocess = nil
  @postprocess = nil
  @dispatch = nil

  # created later, by #setup
  @process_dispatcher = nil
end
Instance Attribute Details
#accept_polling_timeout_seconds ⇒ Object
Returns the value of attribute accept_polling_timeout_seconds.
460 461 462 |
# File 'lib/riser/server.rb', line 460
# Returns the current value of @accept_polling_timeout_seconds
# (nil until it has been assigned).
def accept_polling_timeout_seconds
  @accept_polling_timeout_seconds
end
#process_num ⇒ Object
Returns the value of attribute process_num.
461 462 463 |
# File 'lib/riser/server.rb', line 461
# Returns the current value of @process_num (nil until it has been assigned).
def process_num
  @process_num
end
#process_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute process_queue_polling_timeout_seconds.
463 464 465 |
# File 'lib/riser/server.rb', line 463
# Returns the current value of @process_queue_polling_timeout_seconds
# (nil until it has been assigned).
def process_queue_polling_timeout_seconds
  @process_queue_polling_timeout_seconds
end
#process_queue_size ⇒ Object
Returns the value of attribute process_queue_size.
462 463 464 |
# File 'lib/riser/server.rb', line 462
# Returns the current value of @process_queue_size
# (nil until it has been assigned).
def process_queue_size
  @process_queue_size
end
#process_send_io_polling_timeout_seconds ⇒ Object
Returns the value of attribute process_send_io_polling_timeout_seconds.
464 465 466 |
# File 'lib/riser/server.rb', line 464
# Returns the current value of @process_send_io_polling_timeout_seconds
# (nil until it has been assigned).
def process_send_io_polling_timeout_seconds
  @process_send_io_polling_timeout_seconds
end
#thread_num ⇒ Object
Returns the value of attribute thread_num.
465 466 467 |
# File 'lib/riser/server.rb', line 465
# Returns the current value of @thread_num (nil until it has been assigned).
def thread_num
  @thread_num
end
#thread_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute thread_queue_polling_timeout_seconds.
467 468 469 |
# File 'lib/riser/server.rb', line 467
# Returns the current value of @thread_queue_polling_timeout_seconds
# (nil until it has been assigned).
def thread_queue_polling_timeout_seconds
  @thread_queue_polling_timeout_seconds
end
#thread_queue_size ⇒ Object
Returns the value of attribute thread_queue_size.
466 467 468 |
# File 'lib/riser/server.rb', line 466
# Returns the current value of @thread_queue_size
# (nil until it has been assigned).
def thread_queue_size
  @thread_queue_size
end
Instance Method Details
#at_fork(&block) ⇒ Object
:yields:
469 470 471 472 |
# File 'lib/riser/server.rb', line 469
# Stores the given block in @at_fork, replacing any previous one (calling
# without a block stores nil). #start calls it with no arguments inside
# each forked child process. Always returns nil.
def at_fork(&block) # :yields:
  @at_fork = block

  nil
end
#at_stat(&block) ⇒ Object
:yields: stat_info
479 480 481 482 |
# File 'lib/riser/server.rb', line 479
# Stores the given block in @at_stat, replacing any previous one (calling
# without a block stores nil). #start forwards it to the underlying
# dispatchers via their own at_stat. Always returns nil.
def at_stat(&block) # :yields: stat_info
  @at_stat = block

  nil
end
#at_stop(&block) ⇒ Object
:yields: stop_state
474 475 476 477 |
# File 'lib/riser/server.rb', line 474
# Stores the given block in @at_stop, replacing any previous one (calling
# without a block stores nil). #start forwards it to each child's thread
# dispatcher via at_stop. Always returns nil.
def at_stop(&block) # :yields: stop_state
  @at_stop = block

  nil
end
#dispatch(&block) ⇒ Object
:yields: accept_object
494 495 496 497 |
# File 'lib/riser/server.rb', line 494
# Stores the given block in @dispatch, replacing any previous one (calling
# without a block stores nil). #start forwards it to each child's thread
# dispatcher via dispatch. Always returns nil.
def dispatch(&block) # :yields: accept_object
  @dispatch = block

  nil
end
#postprocess(&block) ⇒ Object
:yields:
489 490 491 492 |
# File 'lib/riser/server.rb', line 489
# Stores the given block in @postprocess, replacing any previous one
# (calling without a block stores nil). #start forwards it to each child's
# thread dispatcher via postprocess. Always returns nil.
def postprocess(&block) # :yields:
  @postprocess = block

  nil
end
#preprocess(&block) ⇒ Object
:yields:
484 485 486 487 |
# File 'lib/riser/server.rb', line 484
# Stores the given block in @preprocess, replacing any previous one
# (calling without a block stores nil). #start forwards it to each child's
# thread dispatcher via preprocess. Always returns nil.
def preprocess(&block) # :yields:
  @preprocess = block

  nil
end
#setup ⇒ Object
After this method call completes, the object is ready to accept the ‘signal_…’ method calls.
525 526 527 528 |
# File 'lib/riser/server.rb', line 525
# Creates the parent-side dispatcher (a SocketThreadDispatcher named after
# @process_queue_name) and stores it in @process_dispatcher. The signal_*
# methods only forward once @process_dispatcher is set. Always returns nil.
def setup
  queue_name = @process_queue_name
  @process_dispatcher = SocketThreadDispatcher.new(queue_name)

  nil
end
#signal_stat_get(reset: true) ⇒ Object
should be called from signal(2) handler
512 513 514 515 |
# File 'lib/riser/server.rb', line 512
# Forwards a "get statistics" request to the process dispatcher, passing
# +reset+ through unchanged. Does nothing when @process_dispatcher has not
# been created yet. Should be called from a signal(2) handler.
# Always returns nil.
def signal_stat_get(reset: true)
  dispatcher = @process_dispatcher
  dispatcher.signal_stat_get(reset: reset) if dispatcher

  nil
end
#signal_stat_stop ⇒ Object
should be called from signal(2) handler
518 519 520 521 |
# File 'lib/riser/server.rb', line 518
# Forwards a "stop statistics" request to the process dispatcher. Does
# nothing when @process_dispatcher has not been created yet. Should be
# called from a signal(2) handler. Always returns nil.
def signal_stat_stop
  dispatcher = @process_dispatcher
  dispatcher.signal_stat_stop if dispatcher

  nil
end
#signal_stop_forced ⇒ Object
should be called from signal(2) handler
506 507 508 509 |
# File 'lib/riser/server.rb', line 506
# Forwards a forced-stop request to the process dispatcher. Does nothing
# when @process_dispatcher has not been created yet. Should be called from
# a signal(2) handler. Always returns nil.
def signal_stop_forced
  dispatcher = @process_dispatcher
  dispatcher.signal_stop_forced if dispatcher

  nil
end
#signal_stop_graceful ⇒ Object
should be called from signal(2) handler
500 501 502 503 |
# File 'lib/riser/server.rb', line 500
# Forwards a graceful-stop request to the process dispatcher. Does nothing
# when @process_dispatcher has not been created yet. Should be called from
# a signal(2) handler. Always returns nil.
def signal_stop_graceful
  dispatcher = @process_dispatcher
  dispatcher.signal_stop_graceful if dispatcher

  nil
end
#start(server_socket) ⇒ Object
535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 |
# File 'lib/riser/server.rb', line 535
# Forks @process_num child processes and then runs the parent-side
# dispatcher. The parent accepts connections on +server_socket+ and hands
# each accepted socket to a child over a UNIX socket pair; each child runs
# its own SocketThreadDispatcher that receives the socket and invokes the
# registered callbacks.
#
# Hand-off protocol, per socket: the parent writes SEND_CMD and then sends
# the descriptor; the child answers RADY_CMD from its accept_return hook.
#
# server_socket:: listening socket; it must respond to +accept_timeout+
#                 and +close+.
#
# Raises RuntimeError ("internal error: ...") when the latch file cannot
# be locked or when either side reads an unexpected protocol message.
# Returns nil once the parent-side dispatcher's +start+ has returned.
def start(server_socket)
  # Class handed to recv_io in the children to wrap the received descriptor.
  socket_class =
    case server_socket
    when TCPServer, UNIXServer
      server_socket.class.superclass
    else
      IO
    end

  # Start-up latch: the lock is taken through a second handle on the same
  # temp file. Every forked child inherits that handle and closes it once
  # its signal traps are installed; the parent's blocking flock below
  # therefore waits for the children.
  parent_latch_file = Tempfile.open('riser_latch_')
  child_latch_file = File.open(parent_latch_file.path, File::RDWR)
  child_latch_file.flock(File::LOCK_EX | File::LOCK_NB) or raise "internal error: failed to lock latch file: #{parent_latch_file.path}"
  parent_latch_file.unlink

  process_list = []
  @process_num.times do |pos|
    child_io, parent_io = UNIXSocket.socketpair
    pid = Process.fork do
      # Drop the handles that belong to the parent side.
      server_socket.close
      parent_latch_file.close
      parent_io.close
      # At fork time process_list holds exactly the +pos+ earlier children;
      # their parent-side sockets were inherited and are not needed here.
      process_list.each { |sibling| sibling.io.close }

      thread_dispatcher = SocketThreadDispatcher.new("#{@thread_queue_name}-#{pos}")
      thread_dispatcher.thread_num = @thread_num
      thread_dispatcher.thread_queue_size = @thread_queue_size
      thread_dispatcher.thread_queue_polling_timeout_seconds = @thread_queue_polling_timeout_seconds

      thread_dispatcher.at_stop(&@at_stop)
      thread_dispatcher.at_stat(&@at_stat)
      thread_dispatcher.at_stat_get(&NO_CALL)
      thread_dispatcher.at_stat_stop(&NO_CALL)
      thread_dispatcher.preprocess(&@preprocess)
      thread_dispatcher.postprocess(&@postprocess)

      # "Accept" in a child means: wait for the parent to announce a socket,
      # then receive its descriptor. Yields nil when the wait times out.
      thread_dispatcher.accept do
        unless child_io.wait_readable(@process_send_io_polling_timeout_seconds).nil?
          command = child_io.read(SEND_LEN)
          command == SEND_CMD or raise "internal error: unknown command <#{command.inspect}>"
          child_io.recv_io(socket_class)
        end
      end
      # Acknowledge the hand-off; the parent blocks reading this reply.
      thread_dispatcher.accept_return { child_io.write(RADY_CMD) }
      thread_dispatcher.dispatch(&@dispatch)
      thread_dispatcher.dispose(&NO_CALL)

      Signal.trap(SIGNAL_STOP_GRACEFUL) { thread_dispatcher.signal_stop_graceful }
      Signal.trap(SIGNAL_STOP_FORCED) { thread_dispatcher.signal_stop_forced }
      Signal.trap(SIGNAL_STAT_GET_AND_RESET) { thread_dispatcher.signal_stat_get(reset: true) }
      Signal.trap(SIGNAL_STAT_GET_NO_RESET) { thread_dispatcher.signal_stat_get(reset: false) }
      Signal.trap(SIGNAL_STAT_STOP) { thread_dispatcher.signal_stat_stop }

      # release flock(2)
      child_latch_file.close

      begin
        # #at_fork is optional: initialize leaves @at_fork nil, so calling it
        # unconditionally would raise NoMethodError in every child.
        @at_fork.call if @at_fork
        thread_dispatcher.start
      ensure
        child_io.close
      end
    end
    child_io.close

    process_list << SocketProcess.new(pid, parent_io)
  end

  child_latch_file.close
  parent_latch_file.flock(File::LOCK_EX) # wait to release flock(2) at child processes
  parent_latch_file.close

  setup unless @process_dispatcher
  @process_dispatcher.thread_num = @process_num
  @process_dispatcher.thread_queue_size = @process_queue_size
  @process_dispatcher.thread_queue_polling_timeout_seconds = @process_queue_polling_timeout_seconds

  # Stop and stat requests are relayed to every child as signals.
  @process_dispatcher.at_stop do |state|
    case state
    when :graceful
      process_list.each { |process| Process.kill(SIGNAL_STOP_GRACEFUL, process.pid) }
    when :forced
      process_list.each { |process| Process.kill(SIGNAL_STOP_FORCED, process.pid) }
    end
  end
  @process_dispatcher.at_stat(&@at_stat)
  @process_dispatcher.at_stat_get do |reset|
    stat_signal = reset ? SIGNAL_STAT_GET_AND_RESET : SIGNAL_STAT_GET_NO_RESET
    process_list.each { |process| Process.kill(stat_signal, process.pid) }
  end
  @process_dispatcher.at_stat_stop do
    process_list.each { |process| Process.kill(SIGNAL_STAT_STOP, process.pid) }
  end
  @process_dispatcher.preprocess(&NO_CALL)
  @process_dispatcher.postprocess(&NO_CALL)

  @process_dispatcher.accept { server_socket.accept_timeout(@accept_polling_timeout_seconds) }
  @process_dispatcher.accept_return(&NO_CALL)
  # Thread.current[:number] selects the child served by the current
  # dispatcher thread -- presumably assigned by SocketThreadDispatcher;
  # NOTE(review): confirm against its implementation.
  @process_dispatcher.dispatch do |socket|
    process = process_list[Thread.current[:number]]
    process.io.write(SEND_CMD)
    process.io.send_io(socket)
    response = process.io.read(RADY_LEN)
    response == RADY_CMD or raise "internal error: unknown response <#{response.inspect}>"
  end
  # On disposal, reap the matching child and close its socket.
  @process_dispatcher.dispose do
    process = process_list[Thread.current[:number]]
    Process.wait(process.pid)
    process.io.close
  end
  @process_dispatcher.start

  nil
end