Class: Riser::SocketProcessDispatcher
- Inherits:
-
Object
- Object
- Riser::SocketProcessDispatcher
- Includes:
- ServerSignal
- Defined in:
- lib/riser/server.rb
Constant Summary collapse
- NO_CALL =
:nodoc:
proc{}
- SEND_CMD =
:nodoc:
"SEND\n".freeze
- SEND_LEN =
:nodoc:
SEND_CMD.length
- RADY_CMD =
:nodoc:
"RADY\n".freeze
- RADY_LEN =
:nodoc:
RADY_CMD.length
Constants included from ServerSignal
Riser::ServerSignal::SIGNAL_RESTART_FORCED, Riser::ServerSignal::SIGNAL_RESTART_GRACEFUL, Riser::ServerSignal::SIGNAL_STAT_GET_AND_RESET, Riser::ServerSignal::SIGNAL_STAT_GET_NO_RESET, Riser::ServerSignal::SIGNAL_STAT_STOP, Riser::ServerSignal::SIGNAL_STOP_FORCED, Riser::ServerSignal::SIGNAL_STOP_GRACEFUL
Instance Attribute Summary collapse
-
#accept_polling_timeout_seconds ⇒ Object
Returns the value of attribute accept_polling_timeout_seconds.
-
#process_num ⇒ Object
Returns the value of attribute process_num.
-
#process_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute process_queue_polling_timeout_seconds.
-
#process_queue_size ⇒ Object
Returns the value of attribute process_queue_size.
-
#process_send_io_polling_timeout_seconds ⇒ Object
Returns the value of attribute process_send_io_polling_timeout_seconds.
-
#thread_num ⇒ Object
Returns the value of attribute thread_num.
-
#thread_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute thread_queue_polling_timeout_seconds.
-
#thread_queue_size ⇒ Object
Returns the value of attribute thread_queue_size.
Instance Method Summary collapse
-
#at_fork(&block) ⇒ Object
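Registers the block that each forked child process calls before starting its thread dispatcher (:yields: nothing).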
-
#at_stat(&block) ⇒ Object
Registers the statistics block handed to the dispatchers' at_stat hook (:yields: stat_info).
-
#at_stop(&block) ⇒ Object
Registers the stop block handed to each child's thread dispatcher (:yields: stop_state).
-
#dispatch(&block) ⇒ Object
Registers the dispatch block handed to each child's thread dispatcher (:yields: accept_object).
-
#initialize(process_queue_name, thread_queue_name) ⇒ SocketProcessDispatcher
constructor
A new instance of SocketProcessDispatcher.
-
#postprocess(&block) ⇒ Object
Registers the postprocess block handed to each child's thread dispatcher (:yields: nothing).
-
#preprocess(&block) ⇒ Object
Registers the preprocess block handed to each child's thread dispatcher (:yields: nothing).
-
#setup ⇒ Object
after this method call is completed, the object will be ready to accept ‘signal_…’ methods.
-
#signal_stat_get(reset: true) ⇒ Object
should be called from signal(2) handler.
-
#signal_stat_stop ⇒ Object
should be called from signal(2) handler.
-
#signal_stop_forced ⇒ Object
should be called from signal(2) handler.
-
#signal_stop_graceful ⇒ Object
should be called from signal(2) handler.
- #start(server_socket) ⇒ Object
Constructor Details
#initialize(process_queue_name, thread_queue_name) ⇒ SocketProcessDispatcher
Returns a new instance of SocketProcessDispatcher.
416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 |
# File 'lib/riser/server.rb', line 416

# Builds a dispatcher whose tunables and callbacks all start out as nil;
# only the two queue names are taken from the arguments.
#
# @param process_queue_name [Object] kept in @process_queue_name
# @param thread_queue_name [Object] kept in @thread_queue_name
def initialize(process_queue_name, thread_queue_name)
  # acceptor / process-level settings
  @accept_polling_timeout_seconds = nil
  @process_num = nil
  @process_queue_name = process_queue_name
  @process_queue_size = nil
  @process_queue_polling_timeout_seconds = nil
  @process_send_io_polling_timeout_seconds = nil

  # thread-level settings
  @thread_num = nil
  @thread_queue_name = thread_queue_name
  @thread_queue_size = nil
  @thread_queue_polling_timeout_seconds = nil

  # callback blocks, registered later through the at_* / *process / dispatch methods
  @at_fork = nil
  @at_stop = nil
  @at_stat = nil
  @preprocess = nil
  @postprocess = nil
  @dispatch = nil

  # created by #setup
  @process_dispatcher = nil
end
Instance Attribute Details
#accept_polling_timeout_seconds ⇒ Object
Returns the value of attribute accept_polling_timeout_seconds.
436 437 438 |
# File 'lib/riser/server.rb', line 436

# Reader for @accept_polling_timeout_seconds, the value #start passes to
# server_socket.accept_timeout.
#
# @return [Object] the stored value (nil until assigned)
def accept_polling_timeout_seconds
  @accept_polling_timeout_seconds
end
#process_num ⇒ Object
Returns the value of attribute process_num.
437 438 439 |
# File 'lib/riser/server.rb', line 437

# Reader for @process_num, which #start uses as the number of child
# processes to fork.
#
# @return [Object] the stored value (nil until assigned)
def process_num
  @process_num
end
#process_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute process_queue_polling_timeout_seconds.
439 440 441 |
# File 'lib/riser/server.rb', line 439

# Reader for @process_queue_polling_timeout_seconds, which #start copies to
# the process dispatcher's thread_queue_polling_timeout_seconds.
#
# @return [Object] the stored value (nil until assigned)
def process_queue_polling_timeout_seconds
  @process_queue_polling_timeout_seconds
end
#process_queue_size ⇒ Object
Returns the value of attribute process_queue_size.
438 439 440 |
# File 'lib/riser/server.rb', line 438

# Reader for @process_queue_size, which #start copies to the process
# dispatcher's thread_queue_size.
#
# @return [Object] the stored value (nil until assigned)
def process_queue_size
  @process_queue_size
end
#process_send_io_polling_timeout_seconds ⇒ Object
Returns the value of attribute process_send_io_polling_timeout_seconds.
440 441 442 |
# File 'lib/riser/server.rb', line 440

# Reader for @process_send_io_polling_timeout_seconds, the timeout each child
# passes to child_io.wait_readable in #start.
#
# @return [Object] the stored value (nil until assigned)
def process_send_io_polling_timeout_seconds
  @process_send_io_polling_timeout_seconds
end
#thread_num ⇒ Object
Returns the value of attribute thread_num.
441 442 443 |
# File 'lib/riser/server.rb', line 441

# Reader for @thread_num, which #start copies to each child's thread
# dispatcher.
#
# @return [Object] the stored value (nil until assigned)
def thread_num
  @thread_num
end
#thread_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute thread_queue_polling_timeout_seconds.
443 444 445 |
# File 'lib/riser/server.rb', line 443

# Reader for @thread_queue_polling_timeout_seconds, which #start copies to
# each child's thread dispatcher.
#
# @return [Object] the stored value (nil until assigned)
def thread_queue_polling_timeout_seconds
  @thread_queue_polling_timeout_seconds
end
#thread_queue_size ⇒ Object
Returns the value of attribute thread_queue_size.
442 443 444 |
# File 'lib/riser/server.rb', line 442

# Reader for @thread_queue_size, which #start copies to each child's thread
# dispatcher.
#
# @return [Object] the stored value (nil until assigned)
def thread_queue_size
  @thread_queue_size
end
Instance Method Details
#at_fork(&block) ⇒ Object
:yields:
445 446 447 448 |
# File 'lib/riser/server.rb', line 445

# Stores the given block in @at_fork. #start calls it (with no arguments)
# inside every forked child, just before the child's thread dispatcher starts.
# Calling without a block stores nil.
#
# @return [nil]
def at_fork(&block) # :yields:
  @at_fork = block
  nil
end
#at_stat(&block) ⇒ Object
:yields: stat_info
455 456 457 458 |
# File 'lib/riser/server.rb', line 455

# Stores the given block in @at_stat. #start hands it to the at_stat hook of
# both the process dispatcher and each child's thread dispatcher.
# Calling without a block stores nil.
#
# @return [nil]
def at_stat(&block) # :yields: stat_info
  @at_stat = block
  nil
end
#at_stop(&block) ⇒ Object
:yields: stop_state
450 451 452 453 |
# File 'lib/riser/server.rb', line 450

# Stores the given block in @at_stop. #start hands it to the at_stop hook of
# each child's thread dispatcher.
# Calling without a block stores nil.
#
# @return [nil]
def at_stop(&block) # :yields: stop_state
  @at_stop = block
  nil
end
#dispatch(&block) ⇒ Object
:yields: accept_object
470 471 472 473 |
# File 'lib/riser/server.rb', line 470

# Stores the given block in @dispatch. #start hands it to the dispatch hook
# of each child's thread dispatcher.
# Calling without a block stores nil.
#
# @return [nil]
def dispatch(&block) # :yields: accept_object
  @dispatch = block
  nil
end
#postprocess(&block) ⇒ Object
:yields:
465 466 467 468 |
# File 'lib/riser/server.rb', line 465

# Stores the given block in @postprocess. #start hands it to the postprocess
# hook of each child's thread dispatcher.
# Calling without a block stores nil.
#
# @return [nil]
def postprocess(&block) # :yields:
  @postprocess = block
  nil
end
#preprocess(&block) ⇒ Object
:yields:
460 461 462 463 |
# File 'lib/riser/server.rb', line 460

# Stores the given block in @preprocess. #start hands it to the preprocess
# hook of each child's thread dispatcher.
# Calling without a block stores nil.
#
# @return [nil]
def preprocess(&block) # :yields:
  @preprocess = block
  nil
end
#setup ⇒ Object
after this method call is completed, the object will be ready to accept ‘signal_…’ methods.
501 502 503 504 |
# File 'lib/riser/server.rb', line 501

# Creates the parent-side dispatcher (a SocketThreadDispatcher named after
# @process_queue_name) and keeps it in @process_dispatcher. The signal_*
# methods are no-ops until this has run; #start runs it on demand when
# @process_dispatcher is still unset.
#
# @return [nil]
def setup
  queue_name = @process_queue_name
  @process_dispatcher = SocketThreadDispatcher.new(queue_name)
  nil
end
#signal_stat_get(reset: true) ⇒ Object
should be called from signal(2) handler
488 489 490 491 |
# File 'lib/riser/server.rb', line 488

# should be called from signal(2) handler
#
# Forwards the request to @process_dispatcher.signal_stat_get with the same
# reset flag. Does nothing while @process_dispatcher is unset (before #setup).
#
# @param reset [Object] passed through as the reset: keyword
# @return [nil]
def signal_stat_get(reset: true)
  return unless @process_dispatcher

  @process_dispatcher.signal_stat_get(reset: reset)
  nil
end
#signal_stat_stop ⇒ Object
should be called from signal(2) handler
494 495 496 497 |
# File 'lib/riser/server.rb', line 494

# should be called from signal(2) handler
#
# Forwards to @process_dispatcher.signal_stat_stop. Does nothing while
# @process_dispatcher is unset (before #setup).
#
# @return [nil]
def signal_stat_stop
  return unless @process_dispatcher

  @process_dispatcher.signal_stat_stop
  nil
end
#signal_stop_forced ⇒ Object
should be called from signal(2) handler
482 483 484 485 |
# File 'lib/riser/server.rb', line 482

# should be called from signal(2) handler
#
# Forwards to @process_dispatcher.signal_stop_forced. Does nothing while
# @process_dispatcher is unset (before #setup).
#
# @return [nil]
def signal_stop_forced
  return unless @process_dispatcher

  @process_dispatcher.signal_stop_forced
  nil
end
#signal_stop_graceful ⇒ Object
should be called from signal(2) handler
476 477 478 479 |
# File 'lib/riser/server.rb', line 476

# should be called from signal(2) handler
#
# Forwards to @process_dispatcher.signal_stop_graceful. Does nothing while
# @process_dispatcher is unset (before #setup).
#
# @return [nil]
def signal_stop_graceful
  return unless @process_dispatcher

  @process_dispatcher.signal_stop_graceful
  nil
end
#start(server_socket) ⇒ Object
511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 |
# File 'lib/riser/server.rb', line 511

# Forks @process_num child processes, each running its own
# SocketThreadDispatcher, then runs the parent-side dispatcher that accepts
# connections on +server_socket+ and passes each accepted socket to a child
# over a UNIX socket pair. Returns only after the parent dispatcher has
# finished and every child has been waited for.
#
# Parent/child protocol on each socket pair:
#   parent -> child : SEND_CMD, followed by the socket via send_io
#   child -> parent : RADY_CMD, written from the child's accept_return hook
#
# server_socket:: object that responds to +accept_timeout+; for TCPServer and
#                 UNIXServer the child re-creates received sockets as the
#                 server class's superclass, otherwise as plain IO.
#
# Returns nil. Raises RuntimeError ("internal error: ...") if the latch file
# cannot be locked or if an unexpected command/response is read.
#
# NOTE(review): @at_fork is called unconditionally in the child, so it has to
# be set (via #at_fork) before calling this method -- confirm that callers
# always do so.
def start(server_socket)
  # class that recv_io should instantiate in the children
  case (server_socket)
  when TCPServer, UNIXServer
    socket_class = server_socket.class.superclass
  else
    socket_class = IO
  end

  # Latch: the same temp file is opened twice. The second handle takes an
  # exclusive lock before forking; every child closes its copy of that handle
  # once its signal traps are installed, and the parent later blocks on the
  # first handle until the lock is gone.
  parent_latch_file = Tempfile.open('riser_latch_')
  child_latch_file = File.open(parent_latch_file.path, File::RDWR)
  child_latch_file.flock(File::LOCK_EX | File::LOCK_NB) or raise "internal error: failed to lock latch file: #{parent_latch_file.path}"
  parent_latch_file.unlink

  process_list = []
  @process_num.times do |pos|
    child_io, parent_io = UNIXSocket.socketpair
    pid = Process.fork{
      # child: drop the handles that belong to the parent side, including the
      # parent ends of the socket pairs of the children forked earlier
      parent_latch_file.close
      parent_io.close
      pos.times do |i|
        process_list[i].io.close
      end

      thread_dispatcher = SocketThreadDispatcher.new("#{@thread_queue_name}-#{pos}")
      thread_dispatcher.thread_num = @thread_num
      thread_dispatcher.thread_queue_size = @thread_queue_size
      thread_dispatcher.thread_queue_polling_timeout_seconds = @thread_queue_polling_timeout_seconds

      thread_dispatcher.at_stop(&@at_stop)
      thread_dispatcher.at_stat(&@at_stat)
      thread_dispatcher.at_stat_get(&NO_CALL)
      thread_dispatcher.at_stat_stop(&NO_CALL)
      thread_dispatcher.preprocess(&@preprocess)
      thread_dispatcher.postprocess(&@postprocess)

      # "accept" in the child means: wait (with timeout) for the parent to
      # announce a socket, then receive it; the block yields nil on timeout
      thread_dispatcher.accept{
        if (child_io.wait_readable(@process_send_io_polling_timeout_seconds) != nil) then
          command = child_io.read(SEND_LEN)
          command == SEND_CMD or raise "internal error: unknown command <#{command.inspect}>"
          child_io.recv_io(socket_class)
        end
      }
      # acknowledge to the parent, which is blocked reading RADY_LEN bytes
      thread_dispatcher.accept_return{ child_io.write(RADY_CMD) }
      thread_dispatcher.dispatch(&@dispatch)

      Signal.trap(SIGNAL_STOP_GRACEFUL) { thread_dispatcher.signal_stop_graceful }
      Signal.trap(SIGNAL_STOP_FORCED) { thread_dispatcher.signal_stop_forced }
      Signal.trap(SIGNAL_STAT_GET_AND_RESET) { thread_dispatcher.signal_stat_get(reset: true) }
      Signal.trap(SIGNAL_STAT_GET_NO_RESET) { thread_dispatcher.signal_stat_get(reset: false) }
      Signal.trap(SIGNAL_STAT_STOP) { thread_dispatcher.signal_stat_stop }

      # release flock(2)
      child_latch_file.close

      begin
        @at_fork.call
        thread_dispatcher.start
      ensure
        child_io.close
      end
    }
    # parent: the child end of the pair is no longer needed here
    child_io.close

    process_list << SocketProcess.new(pid, parent_io)
  end

  child_latch_file.close
  parent_latch_file.flock(File::LOCK_EX) # wait to release flock(2) at child processes
  parent_latch_file.close

  setup unless @process_dispatcher
  # the parent dispatcher is configured with one worker per child process
  @process_dispatcher.thread_num = @process_num
  @process_dispatcher.thread_queue_size = @process_queue_size
  @process_dispatcher.thread_queue_polling_timeout_seconds = @process_queue_polling_timeout_seconds

  # stop and stat requests received by the parent are relayed to every child
  # as the corresponding signal
  @process_dispatcher.at_stop{|state|
    case (state)
    when :graceful
      for process in process_list
        Process.kill(SIGNAL_STOP_GRACEFUL, process.pid)
      end
    when :forced
      for process in process_list
        Process.kill(SIGNAL_STOP_FORCED, process.pid)
      end
    end
  }
  @process_dispatcher.at_stat(&@at_stat)
  @process_dispatcher.at_stat_get{|reset|
    if (reset) then
      for process in process_list
        Process.kill(SIGNAL_STAT_GET_AND_RESET, process.pid)
      end
    else
      for process in process_list
        Process.kill(SIGNAL_STAT_GET_NO_RESET, process.pid)
      end
    end
  }
  @process_dispatcher.at_stat_stop{
    for process in process_list
      Process.kill(SIGNAL_STAT_STOP, process.pid)
    end
  }
  @process_dispatcher.preprocess(&NO_CALL)
  @process_dispatcher.postprocess(&NO_CALL)

  @process_dispatcher.accept{
    server_socket.accept_timeout(@accept_polling_timeout_seconds)
  }
  @process_dispatcher.accept_return(&NO_CALL)
  @process_dispatcher.dispatch{|socket|
    # pick the child by the calling thread's :number thread-local
    # (presumably assigned per worker by SocketThreadDispatcher -- confirm)
    process = process_list[Thread.current[:number]]
    process.io.write(SEND_CMD)
    process.io.send_io(socket)
    # block until the child acknowledges
    response = process.io.read(RADY_LEN)
    response == RADY_CMD or raise "internal error: unknown response <#{response.inspect}>"
  }
  @process_dispatcher.start

  # the parent dispatcher has returned: reap the children and close their sockets
  for process in process_list
    Process.wait(process.pid)
    process.io.close
  end

  nil
end