Class: ZMachine::ConnectionManager
- Inherits:
-
Object
- Object
- ZMachine::ConnectionManager
- Defined in:
- lib/zmachine/connection_manager.rb
Instance Attribute Summary collapse
-
#connections ⇒ Object
readonly
Returns the value of attribute connections.
Instance Method Summary collapse
- #add_new_connections ⇒ Object
- #bind(address, port_or_type, handler, *args, &block) ⇒ Object
- #cleanup ⇒ Object
- #close_connection(connection, after_writing = false, reason = nil) ⇒ Object
- #connect(address, port_or_type, handler, *args, &block) ⇒ Object
- #idle? ⇒ Boolean
-
#initialize(selector) ⇒ ConnectionManager
constructor
A new instance of ConnectionManager.
- #is_connected?(connection) ⇒ Boolean
- #process ⇒ Object
- #process_connection(connection) ⇒ Object
- #shutdown ⇒ Object
- #unbind_connection(connection) ⇒ Object
Constructor Details
#initialize(selector) ⇒ ConnectionManager
Returns a new instance of ConnectionManager.
12 13 14 15 16 17 18 19 |
# File 'lib/zmachine/connection_manager.rb', line 12
# Creates a manager that starts out with no connections at all.
#
# @param selector [Object] kept as-is in @selector; it is later passed to
#   each connection's #register and asked for its selected keys
def initialize(selector)
  if ZMachine.debug
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}")
  end

  @selector = selector
  # Registered connections, the ZMQ-backed subset of them, and connections
  # created but not yet registered with the selector.
  @connections = Set.new
  @zmq_connections = Set.new
  @new_connections = Set.new
  # Queue of pending close requests, drained by #cleanup.
  @closing_connections = []
end
Instance Attribute Details
#connections ⇒ Object (readonly)
Returns the value of attribute connections.
10 11 12 |
# File 'lib/zmachine/connection_manager.rb', line 10
# Read-only accessor for the registered connections.
#
# @return [Set] the manager's own collection (not a copy)
def connections
  @connections
end
Instance Method Details
#add_new_connections ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/zmachine/connection_manager.rb', line 84
# Registers every queued connection with the selector and moves it into the
# set of live connections.
#
# ZMQ-backed connections are additionally tracked in @zmq_connections and
# get #connection_completed invoked right away. A connection whose channel
# is already closed (ClosedChannelException) is queued for closing with the
# exception as the reason.
#
# @return [Set] the queue of connections that were added while this pass
#   was running (empty unless a callback queued more)
def add_new_connections
  # Swap the queue out before iterating, the same way #cleanup does.
  # Code run from inside the loop (e.g. a connection_completed callback that
  # calls #connect or #bind) appends to @new_connections; iterating the live
  # Set would raise on that insert, and the trailing clear would have thrown
  # the late additions away. They are now kept for the next pass.
  pending = @new_connections
  @new_connections = Set.new

  pending.each do |connection|
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection) if ZMachine.debug
    begin
      connection.register(@selector)
      @connections << connection
      if connection.channel.is_a?(ZMQChannel)
        @zmq_connections << connection
        connection.connection_completed
      end
    rescue ClosedChannelException => e
      @closing_connections << [connection, false, e]
    end
  end

  @new_connections
end
#bind(address, port_or_type, handler, *args, &block) ⇒ Object
32 33 34 35 36 37 38 |
# File 'lib/zmachine/connection_manager.rb', line 32
# Builds a connection for +handler+, binds it to the given endpoint and
# queues it in @new_connections; it is registered with the selector later,
# by #add_new_connections.
#
# @param address [Object] forwarded unchanged to the connection's #bind
# @param port_or_type [Object] forwarded unchanged to the connection's #bind
# @param handler [Object] forwarded to build_connection together with +args+
# @param args [Array] extra arguments for build_connection
# @param block [Proc] forwarded to the connection's #bind
# @return [Object] the newly built connection
def bind(address, port_or_type, handler, *args, &block)
  if ZMachine.debug
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", address: address, port_or_type: port_or_type)
  end

  build_connection(handler, *args).tap do |server|
    server.bind(address, port_or_type, &block)
    @new_connections << server
  end
end
#cleanup ⇒ Object
105 106 107 108 109 110 111 112 113 |
# File 'lib/zmachine/connection_manager.rb', line 105
# Drains the queue of pending close requests, handing each entry to
# #unbind_connection. Does nothing (and logs nothing) when the queue is empty.
#
# @return [Array, nil] the entries that were processed, or nil when there
#   was nothing to do
def cleanup
  return if @closing_connections.empty?

  if ZMachine.debug
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}")
  end

  # Detach the queue before walking it, so close requests issued while
  # unbinding land in a fresh array instead of the one being iterated.
  drained, @closing_connections = @closing_connections, []
  drained.each { |entry| unbind_connection(entry) }
end
#close_connection(connection, after_writing = false, reason = nil) ⇒ Object
79 80 81 82 |
# File 'lib/zmachine/connection_manager.rb', line 79
# Queues a close request; the connection is actually unbound/closed the next
# time #cleanup runs.
#
# @param connection [Object] the connection to close
# @param after_writing [Boolean] stored with the request and consulted by
#   #unbind_connection
# @param reason [Object, nil] stored with the request and passed to the
#   connection's #unbind when that method accepts an argument
# @return [Array] the queue of pending close requests
def close_connection(connection, after_writing = false, reason = nil)
  if ZMachine.debug
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection, after_writing: after_writing, reason: reason.inspect)
  end

  @closing_connections.push([connection, after_writing, reason])
end
#connect(address, port_or_type, handler, *args, &block) ⇒ Object
40 41 42 43 44 45 46 47 48 |
# File 'lib/zmachine/connection_manager.rb', line 40
# Builds a connection for +handler+, starts connecting it to the given
# endpoint and queues it in @new_connections; it is registered with the
# selector later, by #add_new_connections.
#
# @param address [Object] forwarded unchanged to the connection's #connect
# @param port_or_type [Object] forwarded unchanged to the connection's #connect
# @param handler [Object] forwarded to build_connection together with +args+
# @param args [Array] extra arguments for build_connection
# @param block [Proc] forwarded to the connection's #connect
# @return [Object] the newly built connection
# @raise [ZMachine::ConnectionError] when java.nio reports an
#   UnresolvedAddressException
def connect(address, port_or_type, handler, *args, &block)
  if ZMachine.debug
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", address: address, port_or_type: port_or_type)
  end

  build_connection(handler, *args).tap do |client|
    client.connect(address, port_or_type, &block)
    @new_connections << client
  end
rescue java.nio.channels.UnresolvedAddressException
  # Translate the Java-level failure into the library's own error type.
  raise ZMachine::ConnectionError.new('unable to resolve server address')
end
#idle? ⇒ Boolean
21 22 23 24 |
# File 'lib/zmachine/connection_manager.rb', line 21
# Tells whether the manager has nothing to do right now: no connection is
# waiting to be registered and no ZMQ connection has data ready to receive.
#
# The ZMQ check exists for the reason explained in the comment in #process.
#
# @return [Boolean]
def idle?
  # `&&` instead of the low-precedence `and`, `empty?` instead of `size == 0`.
  @new_connections.empty? && @zmq_connections.none? { |c| c.channel.can_recv? }
end
#is_connected?(connection) ⇒ Boolean
101 102 103 |
# File 'lib/zmachine/connection_manager.rb', line 101
# Checks whether +connection+ is currently among the registered connections.
#
# @param connection [Object] the connection to look up
# @return [Boolean]
def is_connected?(connection)
  @connections.member?(connection)
end
#process ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/zmachine/connection_manager.rb', line 50
# Runs one processing pass: registers queued connections, dispatches every
# selected key to #process_connection, then wakes up ZMQ connections that
# still have messages waiting.
#
# @return [Set] the set of ZMQ connections
def process
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
  add_new_connections

  key_iterator = @selector.selected_keys.iterator
  while key_iterator.has_next
    # The accessor after `next.` was missing (a syntax error). `attachment`
    # is the java.nio SelectionKey accessor for the object attached at
    # registration time.
    # NOTE(review): assumes the connection attaches itself to its key in
    # #register -- confirm against the connection class.
    process_connection(key_iterator.next.attachment)
    # Selected keys must be removed explicitly or they are reported again.
    key_iterator.remove
  end

  # Super ugly, but ZMQ triggers the FD if and only if every message has
  # been read from the socket. Under load, however, there will always be new
  # messages in the mailbox between the last recv and the next select, which
  # causes the FD never to be triggered again.
  # The only mitigation strategy found so far is iterating over all ZMQ
  # channels. The performance impact shouldn't be too large, since ZMQ takes
  # care of all the multiplexing and the reactor only holds a small number
  # of ZMQ connections.
  @zmq_connections.each do |connection|
    connection.readable! if connection.channel.can_recv?
  end
end
#process_connection(connection) ⇒ Object
72 73 74 75 76 77 |
# File 'lib/zmachine/connection_manager.rb', line 72
# Lets +connection+ handle its pending events. If that produces a new
# connection (any truthy return value), it is queued in @new_connections.
# An IOException raised while processing queues the connection for closing
# with the exception as the reason.
#
# @param connection [Object] the connection whose events should be processed
def process_connection(connection)
  spawned = connection.process_events
  return unless spawned

  @new_connections << spawned
rescue IOException => e
  close_connection(connection, false, e)
end
#shutdown ⇒ Object
26 27 28 29 30 |
# File 'lib/zmachine/connection_manager.rb', line 26
# Queues every registered connection for closing and immediately runs
# #cleanup. The connections are queued bare (not as a
# [connection, after_writing, reason] triple), so #unbind_connection applies
# its defaults: after_writing false, reason nil.
def shutdown
  if ZMachine.debug
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}")
  end

  @closing_connections = @closing_connections + @connections.to_a
  cleanup
end
#unbind_connection(connection) ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/zmachine/connection_manager.rb', line 115
# Notifies a connection that it is going away and then either closes it or,
# when it was asked to close after writing and can still send, re-queues the
# close through ZMachine.close_connection.
#
# Any exception raised along the way is swallowed; it is logged only when
# ZMachine.debug is on.
#
# @param connection [Object, Array] either a bare connection or a
#   [connection, after_writing, reason] triple as queued by #close_connection
def unbind_connection(connection)
  after_writing = false
  reason = nil
  # Unpack a queued close request; a bare connection keeps the defaults above.
  connection, after_writing, reason = connection if connection.is_a?(Array)

  # Handlers may define #unbind with or without a reason parameter.
  if connection.method(:unbind).arity.zero?
    connection.unbind
  else
    connection.unbind(reason)
  end

  if ZMachine.debug
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection, after_writing: after_writing, can_send: connection.can_send?)
  end

  if after_writing && connection.can_send?
    # NOTE(review): the reason is not forwarded here, and #unbind has already
    # run for this attempt -- confirm both are intended.
    ZMachine.close_connection(connection, true)
  else
    connection.close!
    @connections.delete(connection)
    @zmq_connections.delete(connection)
  end
rescue Exception => e
  # NOTE(review): rescuing Exception is very broad; presumably deliberate so
  # that Java exceptions are caught under JRuby -- verify before narrowing.
  ZMachine.logger.exception(e, "failed to unbind connection") if ZMachine.debug
end