Class: ZMachine::ConnectionManager
- Inherits:
-
Object
- Object
- ZMachine::ConnectionManager
- Defined in:
- lib/zmachine/connection_manager.rb
Instance Attribute Summary collapse
-
#connections ⇒ Object
readonly
Returns the value of attribute connections.
Instance Method Summary collapse
- #add_new_connections ⇒ Object
- #bind(address, port_or_type, handler, *args, &block) ⇒ Object
- #cleanup ⇒ Object
- #close_connection(connection) ⇒ Object
- #connect(address, port_or_type, handler, *args, &block) ⇒ Object
- #idle? ⇒ Boolean
-
#initialize(selector) ⇒ ConnectionManager
constructor
A new instance of ConnectionManager.
- #process ⇒ Object
- #process_connection(connection) ⇒ Object
- #shutdown ⇒ Object
Constructor Details
#initialize(selector) ⇒ ConnectionManager
Returns a new instance of ConnectionManager.
11 12 13 14 15 16 17 18 |
# File 'lib/zmachine/connection_manager.rb', line 11
# Creates a connection manager around +selector+ with empty bookkeeping lists.
#
# @param selector [Object] stored in @selector; connections are registered
#   with it in #add_new_connections and its selected keys are drained in #process
def initialize(selector)
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
  @selector = selector
  @connections = []         # connections that registered successfully
  @zmq_connections = []     # subset of @connections whose channel is a ZMQChannel
  @new_connections = []     # queued until the next #add_new_connections
  @unbound_connections = [] # queued for teardown by #cleanup
end
Instance Attribute Details
#connections ⇒ Object (readonly)
Returns the value of attribute connections.
9 10 11 |
# File 'lib/zmachine/connection_manager.rb', line 9
# Returns the value of attribute connections.
#
# @return [Array] the array held in @connections itself (not a copy)
def connections
  @connections
end
Instance Method Details
#add_new_connections ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/zmachine/connection_manager.rb', line 80
# Registers every queued connection with the selector, then empties the queue.
#
# nil entries in @new_connections are skipped. Each remaining connection is
# registered with @selector and appended to @connections (and to
# @zmq_connections when its channel is a ZMQChannel). If a
# ClosedChannelException is raised along the way, it is logged and the
# connection is queued in @unbound_connections for later teardown.
#
# @return [Array] the now-empty @new_connections array
def add_new_connections
  @new_connections.compact.each do |connection|
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection) if ZMachine.debug
    begin
      connection.register(@selector)
      @connections << connection
      @zmq_connections << connection if connection.channel.is_a?(ZMQChannel)
    rescue ClosedChannelException => e
      ZMachine.logger.exception(e, "failed to add connection")
      @unbound_connections << connection
    end
  end
  @new_connections.clear
end
#bind(address, port_or_type, handler, *args, &block) ⇒ Object
31 32 33 34 35 36 37 |
# File 'lib/zmachine/connection_manager.rb', line 31
# Builds a connection for +handler+, binds it, and queues it for registration.
#
# @param address [Object] forwarded to the connection's #bind
# @param port_or_type [Object] forwarded to the connection's #bind
# @param handler [Object] forwarded to build_connection
# @param args [Array] extra arguments forwarded to build_connection
# @param block [Proc] forwarded to build_connection
# @return [Object] the connection returned by build_connection
def bind(address, port_or_type, handler, *args, &block)
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", address: address, port_or_type: port_or_type) if ZMachine.debug
  connection = build_connection(handler, *args, &block)
  connection.bind(address, port_or_type)
  # Registration with the selector is deferred to #add_new_connections.
  @new_connections << connection
  connection
end
#cleanup ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/zmachine/connection_manager.rb', line 95
# Tears down every connection queued in @unbound_connections.
#
# Each entry is either a connection or an Array whose first element is the
# connection (any further elements are ignored here). The connection is
# removed from @connections and @zmq_connections, then unbound and closed.
# Any exception raised while doing so is swallowed; it is only logged when
# ZMachine.debug is set.
#
# @return [Array, nil] the emptied @unbound_connections, or nil when there
#   was nothing to clean up
def cleanup
  return if @unbound_connections.empty?
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
  @unbound_connections.each do |entry|
    connection = entry.is_a?(Array) ? entry.first : entry
    begin
      @connections.delete(connection)
      @zmq_connections.delete(connection)
      # NOTE(review): if #unbind raises, #close is skipped for this connection —
      # confirm that is intended.
      connection.unbind
      connection.close
    rescue Exception => e # deliberately broad: teardown is best-effort
      ZMachine.logger.exception(e, "failed to unbind connection") if ZMachine.debug
    end
  end
  @unbound_connections.clear
end
#close_connection(connection) ⇒ Object
75 76 77 78 |
# File 'lib/zmachine/connection_manager.rb', line 75
# Queues +connection+ for teardown; the actual unbind/close happens in #cleanup.
#
# @param connection [Object] the connection to tear down later
# @return [Array] @unbound_connections, with +connection+ appended
def close_connection(connection)
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection) if ZMachine.debug
  @unbound_connections << connection
end
#connect(address, port_or_type, handler, *args, &block) ⇒ Object
39 40 41 42 43 44 45 46 47 48 |
# File 'lib/zmachine/connection_manager.rb', line 39
# Builds a connection for +handler+, connects it, and queues it for
# registration.
#
# A given block is forwarded to build_connection and is also yielded the
# connection after it has been queued.
#
# @param address [Object] forwarded to the connection's #connect
# @param port_or_type [Object] forwarded to the connection's #connect
# @param handler [Object] forwarded to build_connection
# @param args [Array] extra arguments forwarded to build_connection
# @yieldparam connection [Object] the newly queued connection
# @return [Object] the connection returned by build_connection
# @raise [ZMachine::ConnectionError] when a
#   java.nio.channels.UnresolvedAddressException is raised in the body
def connect(address, port_or_type, handler, *args, &block)
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", address: address, port_or_type: port_or_type) if ZMachine.debug
  connection = build_connection(handler, *args, &block)
  connection.connect(address, port_or_type)
  # Registration with the selector is deferred to #add_new_connections.
  @new_connections << connection
  yield connection if block_given?
  connection
rescue java.nio.channels.UnresolvedAddressException
  raise ZMachine::ConnectionError, 'unable to resolve server address'
end
#idle? ⇒ Boolean
20 21 22 23 |
# File 'lib/zmachine/connection_manager.rb', line 20
# Reports whether there is no pending work: no connections are waiting to be
# registered and no ZMQ channel reports more data via has_more?.
# See the comment in #process for why ZMQ channels are polled explicitly.
#
# @return [Boolean]
def idle?
  @new_connections.empty? && @zmq_connections.none? { |c| c.channel.has_more? }
end
#process ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/zmachine/connection_manager.rb', line 50
# Runs one processing pass: registers queued connections, dispatches every
# selected key to #process_connection, then nudges ZMQ connections that still
# have data waiting.
#
# @return [Array] @zmq_connections (the receiver of the final #each)
def process
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
  add_new_connections
  keys = @selector.selected_keys.iterator
  while keys.has_next
    # The object attached to the selection key is handed to process_connection.
    # NOTE(review): presumably the connection is attached to the key when it
    # registers with the selector — confirm against the channel's register code.
    process_connection(keys.next.attachment)
    # Selected keys are not cleared automatically; drop each one once handled.
    keys.remove
  end
  # super ugly, but ZMQ only triggers the FD if and only if you have read
  # every message from the socket. under load however there will always be
  # new messages in the mailbox between last recv and next select, which
  # causes the FD never to be triggered again.
  # the only mitigation strategy i came up with is iterating over all channels :(
  @zmq_connections.each do |connection|
    connection.readable! if connection.channel.has_more?
  end
end
#process_connection(connection) ⇒ Object
68 69 70 71 72 73 |
# File 'lib/zmachine/connection_manager.rb', line 68
# Lets +connection+ handle its pending events. A truthy result from
# process_events is treated as a new connection and queued for registration.
# If an IOException is raised, the connection is scheduled for teardown via
# #close_connection.
#
# @param connection [Object] must respond to process_events
# @return [Object] not meaningful; callers in this file ignore it
def process_connection(connection)
  new_connection = connection.process_events
  @new_connections << new_connection if new_connection
rescue IOException
  close_connection(connection)
end
#shutdown ⇒ Object
25 26 27 28 29 |
# File 'lib/zmachine/connection_manager.rb', line 25
# Schedules every tracked connection for teardown and runs #cleanup right away.
#
# @return [Object] whatever #cleanup returns
def shutdown
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
  # += builds a new array, so #cleanup can delete from @connections while
  # iterating the teardown queue.
  @unbound_connections += @connections
  cleanup
end