Class: Riser::SocketProcessDispatcher
- Inherits:
-
Object
- Object
- Riser::SocketProcessDispatcher
- Includes:
- ServerSignal
- Defined in:
- lib/riser/server.rb
Constant Summary collapse
- NO_CALL =
:nodoc:
proc{}
Constants included from ServerSignal
Riser::ServerSignal::SIGNAL_RESTART_FORCED, Riser::ServerSignal::SIGNAL_RESTART_GRACEFUL, Riser::ServerSignal::SIGNAL_STAT_GET_AND_RESET, Riser::ServerSignal::SIGNAL_STAT_GET_NO_RESET, Riser::ServerSignal::SIGNAL_STAT_STOP, Riser::ServerSignal::SIGNAL_STOP_FORCED, Riser::ServerSignal::SIGNAL_STOP_GRACEFUL
Instance Attribute Summary collapse
-
#accept_polling_timeout_seconds ⇒ Object
Returns the value of attribute accept_polling_timeout_seconds.
-
#process_num ⇒ Object
Returns the value of attribute process_num.
-
#process_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute process_queue_polling_timeout_seconds.
-
#process_queue_size ⇒ Object
Returns the value of attribute process_queue_size.
-
#process_send_io_polling_timeout_seconds ⇒ Object
Returns the value of attribute process_send_io_polling_timeout_seconds.
-
#thread_num ⇒ Object
Returns the value of attribute thread_num.
-
#thread_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute thread_queue_polling_timeout_seconds.
-
#thread_queue_size ⇒ Object
Returns the value of attribute thread_queue_size.
Instance Method Summary collapse
-
#at_fork(&block) ⇒ Object
:yields:.
-
#at_stat(&block) ⇒ Object
:yields: stat_info.
-
#at_stop(&block) ⇒ Object
:yields: stop_state.
-
#dispatch(&block) ⇒ Object
:yields: accept_object.
-
#initialize(process_queue_name, thread_queue_name) ⇒ SocketProcessDispatcher
constructor
A new instance of SocketProcessDispatcher.
-
#postprocess(&block) ⇒ Object
:yields:.
-
#preprocess(&block) ⇒ Object
:yields:.
-
#setup ⇒ Object
after this method call is completed, the object will be ready to accept ‘signal_…’ methods.
-
#signal_stat_get(reset: true) ⇒ Object
should be called from signal(2) handler.
-
#signal_stat_stop ⇒ Object
should be called from signal(2) handler.
-
#signal_stop_forced ⇒ Object
should be called from signal(2) handler.
-
#signal_stop_graceful ⇒ Object
should be called from signal(2) handler.
- #start(server_socket) ⇒ Object
Constructor Details
#initialize(process_queue_name, thread_queue_name) ⇒ SocketProcessDispatcher
Returns a new instance of SocketProcessDispatcher.
388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 |
# File 'lib/riser/server.rb', line 388 def initialize(process_queue_name, thread_queue_name) @accept_polling_timeout_seconds = nil @process_num = nil @process_queue_name = process_queue_name @process_queue_size = nil @process_queue_polling_timeout_seconds = nil @process_send_io_polling_timeout_seconds = nil @thread_num = nil @thread_queue_name = thread_queue_name @thread_queue_size = nil @thread_queue_polling_timeout_seconds = nil @at_fork= nil @at_stop = nil @at_stat = nil @preprocess = nil @postprocess = nil @dispatch = nil @process_dispatcher = nil end |
Instance Attribute Details
#accept_polling_timeout_seconds ⇒ Object
Returns the value of attribute accept_polling_timeout_seconds.
408 409 410 |
# File 'lib/riser/server.rb', line 408 def accept_polling_timeout_seconds @accept_polling_timeout_seconds end |
#process_num ⇒ Object
Returns the value of attribute process_num.
409 410 411 |
# File 'lib/riser/server.rb', line 409 def process_num @process_num end |
#process_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute process_queue_polling_timeout_seconds.
411 412 413 |
# File 'lib/riser/server.rb', line 411 def process_queue_polling_timeout_seconds @process_queue_polling_timeout_seconds end |
#process_queue_size ⇒ Object
Returns the value of attribute process_queue_size.
410 411 412 |
# File 'lib/riser/server.rb', line 410 def process_queue_size @process_queue_size end |
#process_send_io_polling_timeout_seconds ⇒ Object
Returns the value of attribute process_send_io_polling_timeout_seconds.
412 413 414 |
# File 'lib/riser/server.rb', line 412 def process_send_io_polling_timeout_seconds @process_send_io_polling_timeout_seconds end |
#thread_num ⇒ Object
Returns the value of attribute thread_num.
413 414 415 |
# File 'lib/riser/server.rb', line 413 def thread_num @thread_num end |
#thread_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute thread_queue_polling_timeout_seconds.
415 416 417 |
# File 'lib/riser/server.rb', line 415 def thread_queue_polling_timeout_seconds @thread_queue_polling_timeout_seconds end |
#thread_queue_size ⇒ Object
Returns the value of attribute thread_queue_size.
414 415 416 |
# File 'lib/riser/server.rb', line 414 def thread_queue_size @thread_queue_size end |
Instance Method Details
#at_fork(&block) ⇒ Object
:yields:
417 418 419 420 |
# File 'lib/riser/server.rb', line 417 def at_fork(&block) # :yields: @at_fork = block nil end |
#at_stat(&block) ⇒ Object
:yields: stat_info
427 428 429 430 |
# File 'lib/riser/server.rb', line 427 def at_stat(&block) # :yields: stat_info @at_stat = block nil end |
#at_stop(&block) ⇒ Object
:yields: stop_state
422 423 424 425 |
# File 'lib/riser/server.rb', line 422 def at_stop(&block) # :yields: stop_state @at_stop = block nil end |
#dispatch(&block) ⇒ Object
:yields: accept_object
442 443 444 445 |
# File 'lib/riser/server.rb', line 442 def dispatch(&block) # :yields: accept_object @dispatch = block nil end |
#postprocess(&block) ⇒ Object
:yields:
437 438 439 440 |
# File 'lib/riser/server.rb', line 437 def postprocess(&block) # :yields: @postprocess = block nil end |
#preprocess(&block) ⇒ Object
:yields:
432 433 434 435 |
# File 'lib/riser/server.rb', line 432 def preprocess(&block) # :yields: @preprocess = block nil end |
#setup ⇒ Object
after this method call is completed, the object will be ready to accept ‘signal_…’ methods.
473 474 475 476 |
# File 'lib/riser/server.rb', line 473 def setup @process_dispatcher = SocketThreadDispatcher.new(@process_queue_name) nil end |
#signal_stat_get(reset: true) ⇒ Object
should be called from signal(2) handler
460 461 462 463 |
# File 'lib/riser/server.rb', line 460 def signal_stat_get(reset: true) @process_dispatcher.signal_stat_get(reset: reset) if @process_dispatcher nil end |
#signal_stat_stop ⇒ Object
should be called from signal(2) handler
466 467 468 469 |
# File 'lib/riser/server.rb', line 466 def signal_stat_stop @process_dispatcher.signal_stat_stop if @process_dispatcher nil end |
#signal_stop_forced ⇒ Object
should be called from signal(2) handler
454 455 456 457 |
# File 'lib/riser/server.rb', line 454 def signal_stop_forced @process_dispatcher.signal_stop_forced if @process_dispatcher nil end |
#signal_stop_graceful ⇒ Object
should be called from signal(2) handler
448 449 450 451 |
# File 'lib/riser/server.rb', line 448 def signal_stop_graceful @process_dispatcher.signal_stop_graceful if @process_dispatcher nil end |
#start(server_socket) ⇒ Object
478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 |
# File 'lib/riser/server.rb', line 478 def start(server_socket) case (server_socket) when TCPServer, UNIXServer socket_class = server_socket.class.superclass else socket_class = IO end process_list = [] @process_num.times do |pos| child_io, parent_io = UNIXSocket.socketpair pid = Process.fork{ parent_io.close pos.times do |i| process_list[i].io.close end thread_dispatcher = SocketThreadDispatcher.new("#{@thread_queue_name}-#{pos}") thread_dispatcher.thread_num = @thread_num thread_dispatcher.thread_queue_size = @thread_queue_size thread_dispatcher.thread_queue_polling_timeout_seconds = @thread_queue_polling_timeout_seconds thread_dispatcher.at_stop(&@at_stop) thread_dispatcher.at_stat(&@at_stat) thread_dispatcher.at_stat_get(&NO_CALL) thread_dispatcher.at_stat_stop(&NO_CALL) thread_dispatcher.preprocess(&@preprocess) thread_dispatcher.postprocess(&@postprocess) thread_dispatcher.accept{ if (child_io.wait_readable(@process_send_io_polling_timeout_seconds) != nil) then command = child_io.read(5) command == "SEND\n" or raise "internal error: unknown command <#{command.inspect}>" child_io.recv_io(socket_class) end } thread_dispatcher.accept_return{ child_io.write("RADY\n") } thread_dispatcher.dispatch(&@dispatch) Signal.trap(SIGNAL_STOP_GRACEFUL) { thread_dispatcher.signal_stop_graceful } Signal.trap(SIGNAL_STOP_FORCED) { thread_dispatcher.signal_stop_forced } Signal.trap(SIGNAL_STAT_GET_AND_RESET) { thread_dispatcher.signal_stat_get(reset: true) } Signal.trap(SIGNAL_STAT_GET_NO_RESET) { thread_dispatcher.signal_stat_get(reset: false) } Signal.trap(SIGNAL_STAT_STOP) { thread_dispatcher.signal_stat_stop } begin child_io.write("RADY\n") @at_fork.call thread_dispatcher.start ensure child_io.close end } child_io.close process_list << SocketProcess.new(pid, parent_io) end for process in process_list response = process.io.read(5) response == "RADY\n" or raise "internal error: unknown response <#{response.inspect}>" end setup unless @process_dispatcher @process_dispatcher.thread_num = @process_num @process_dispatcher.thread_queue_size = @process_queue_size @process_dispatcher.thread_queue_polling_timeout_seconds = @process_queue_polling_timeout_seconds @process_dispatcher.at_stop{|state| case (state) when :graceful for process in process_list Process.kill(SIGNAL_STOP_GRACEFUL, process.pid) end when :forced for process in process_list Process.kill(SIGNAL_STOP_FORCED, process.pid) end end } @process_dispatcher.at_stat(&@at_stat) @process_dispatcher.at_stat_get{|reset| if (reset) then for process in process_list Process.kill(SIGNAL_STAT_GET_AND_RESET, process.pid) end else for process in process_list Process.kill(SIGNAL_STAT_GET_NO_RESET, process.pid) end end } @process_dispatcher.at_stat_stop{ for process in process_list Process.kill(SIGNAL_STAT_STOP, process.pid) end } @process_dispatcher.preprocess(&NO_CALL) @process_dispatcher.postprocess(&NO_CALL) @process_dispatcher.accept{ if (server_socket.wait_readable(@accept_polling_timeout_seconds) != nil) then begin server_socket.accept_nonblock rescue IO::WaitReadable nil # to avoid conflicting accept(2) at server restart overlap end end } @process_dispatcher.accept_return(&NO_CALL) @process_dispatcher.dispatch{|socket| process = process_list[Thread.current[:number]] process.io.write("SEND\n") process.io.send_io(socket) response = process.io.read(5) response == "RADY\n" or raise "internal error: unknown response <#{response.inspect}>" } @process_dispatcher.start for process in process_list Process.wait(process.pid) process.io.close end nil end |