Class: ZMachine::ConnectionManager

Inherits:
Object
  • Object
show all
Defined in:
lib/zmachine/connection_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(selector) ⇒ ConnectionManager

Returns a new instance of ConnectionManager.



11
12
13
14
15
16
17
18
# File 'lib/zmachine/connection_manager.rb', line 11

# Builds a manager with no connections yet.
#
# Four separate, initially empty lists are created: registered connections,
# the subset tracked as ZMQ connections, connections waiting to be
# registered, and connections waiting to be unbound.
#
# @param selector the object stored in @selector; add_new_connections passes
#   it to each connection's #register
def initialize(selector)
  if ZMachine.debug
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}")
  end

  @selector = selector

  # Each list must be its own Array instance; they are mutated independently.
  @connections = []
  @zmq_connections = []
  @new_connections = []
  @unbound_connections = []
end

Instance Attribute Details

#connections ⇒ Object (readonly)

Returns the value of attribute connections.



9
10
11
# File 'lib/zmachine/connection_manager.rb', line 9

# Reader for the list of connections held in @connections.
#
# @return [Array] the @connections array itself (not a copy), so changes made
#   by the caller are visible to the manager
def connections
  @connections
end

Instance Method Details

#add_new_connections ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/zmachine/connection_manager.rb', line 80

# Registers every pending connection with the selector.
#
# nil entries in @new_connections are skipped. A connection that registers
# successfully is appended to @connections, and also to @zmq_connections when
# its channel is a ZMQChannel. If registration raises ClosedChannelException
# the error is logged and the connection is queued in @unbound_connections
# instead. The pending list is emptied afterwards in every case.
#
# @return [Array] the (now empty) @new_connections array
def add_new_connections
  pending = @new_connections.compact
  pending.each do |connection|
    if ZMachine.debug
      ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection)
    end

    begin
      connection.register(@selector)
      @connections.push(connection)
      @zmq_connections.push(connection) if connection.channel.is_a?(ZMQChannel)
    rescue ClosedChannelException => error
      ZMachine.logger.exception(error, "failed to add connection")
      @unbound_connections.push(connection)
    end
  end
  @new_connections.clear
end

#bind(address, port_or_type, handler, *args, &block) ⇒ Object



31
32
33
34
35
36
37
# File 'lib/zmachine/connection_manager.rb', line 31

# Builds a connection for +handler+, binds it to the given endpoint and queues
# it in @new_connections.
#
# If the connection's #bind raises, the error propagates and nothing is queued.
#
# @param address passed through to the connection's #bind
# @param port_or_type passed through to the connection's #bind
# @param handler forwarded, with +args+ and +block+, to build_connection
# @return the connection returned by build_connection
def bind(address, port_or_type, handler, *args, &block)
  if ZMachine.debug
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", address: address, port_or_type: port_or_type)
  end

  build_connection(handler, *args, &block).tap do |listener|
    listener.bind(address, port_or_type)
    @new_connections.push(listener)
  end
end

#cleanup ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/zmachine/connection_manager.rb', line 95

# Unbinds and closes every connection queued in @unbound_connections, then
# empties the queue. Does nothing when the queue is empty.
#
# Each queued entry is either a connection or an Array whose first element is
# the connection (any further elements are ignored here). The connection is
# removed from @connections and @zmq_connections before it is unbound.
#
# #close is attempted even when removing or unbinding the connection raised:
# by that point the connection may already be gone from @connections, so
# skipping #close would leave its channel open with nothing left to close it.
#
# Failures are swallowed so one bad connection cannot stop the others from
# being cleaned up; they are logged only when ZMachine.debug is set.
#
# @return [Array, nil] the emptied @unbound_connections, or nil if there was
#   nothing to do
def cleanup
  return if @unbound_connections.empty?
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
  @unbound_connections.each do |entry|
    connection = entry.is_a?(Array) ? entry.first : entry
    begin
      @connections.delete(connection)
      @zmq_connections.delete(connection)
      connection.unbind
    # NOTE(review): Exception (not StandardError) is kept from the original
    # code; narrowing it may change which Java exceptions are caught under
    # JRuby -- confirm before changing.
    rescue Exception => e
      ZMachine.logger.exception(e, "failed to unbind connection") if ZMachine.debug
    ensure
      begin
        connection.close
      rescue Exception => e
        ZMachine.logger.exception(e, "failed to close connection") if ZMachine.debug
      end
    end
  end
  @unbound_connections.clear
end

#close_connection(connection) ⇒ Object



75
76
77
78
# File 'lib/zmachine/connection_manager.rb', line 75

# Queues +connection+ in @unbound_connections. Nothing is unbound or closed
# here; the connection is only appended to the queue.
#
# @param connection the connection to queue
# @return [Array] the @unbound_connections array after the append
def close_connection(connection)
  if ZMachine.debug
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection)
  end
  @unbound_connections.push(connection)
end

#connect(address, port_or_type, handler, *args, &block) ⇒ Object



39
40
41
42
43
44
45
46
47
48
# File 'lib/zmachine/connection_manager.rb', line 39

# Builds a connection for +handler+, connects it to the given endpoint and
# queues it in @new_connections.
#
# When a block is given it is called with the connection after the connection
# has been queued.
# NOTE(review): the same block is also forwarded to build_connection; confirm
# that build_connection does not invoke it as well.
#
# @param address passed through to the connection's #connect
# @param port_or_type passed through to the connection's #connect
# @param handler forwarded, with +args+ and +block+, to build_connection
# @yield [connection] the newly queued connection
# @return the connection returned by build_connection
# @raise [ZMachine::ConnectionError] when a
#   java.nio.channels.UnresolvedAddressException is raised inside this method
def connect(address, port_or_type, handler, *args, &block)
  if ZMachine.debug
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", address: address, port_or_type: port_or_type)
  end

  outbound = build_connection(handler, *args, &block)
  outbound.connect(address, port_or_type)
  @new_connections.push(outbound)
  yield(outbound) if block_given?
  outbound
rescue java.nio.channels.UnresolvedAddressException
  raise ZMachine::ConnectionError, 'unable to resolve server address'
end

#idle? ⇒ Boolean

Returns:

  • (Boolean)


20
21
22
23
# File 'lib/zmachine/connection_manager.rb', line 20

# Tells whether the manager has nothing waiting to be handled.
#
# It is idle when no connections are waiting in @new_connections and no
# connection in @zmq_connections reports channel.has_more?. The ZMQ channels
# are asked directly for the reason given in the comment in #process.
#
# @return [Boolean]
def idle?
  return false unless @new_connections.empty?

  @zmq_connections.none? { |connection| connection.channel.has_more? }
end

#process ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/zmachine/connection_manager.rb', line 50

# Runs one processing pass: registers pending connections, handles every
# selected key of the selector, then pokes ZMQ connections that still have
# data waiting.
#
# Each selected key's attachment is handed to process_connection, and the key
# is removed from the selected set through the iterator afterwards.
#
# @return [Array] the @zmq_connections array
def process
  if ZMachine.debug
    ZMachine.logger.debug("zmachine:connection_manager:#{__method__}")
  end

  add_new_connections

  keys = @selector.selected_keys.iterator
  while keys.has_next
    key = keys.next
    process_connection(key.attachment)
    keys.remove
  end

  # Ugly but necessary: ZMQ only triggers the FD once every message has been
  # read from the socket. Under load there are always new messages in the
  # mailbox between the last recv and the next select, so the FD would never
  # be triggered again. The only mitigation found so far is to walk all ZMQ
  # channels and mark the ones that still have data as readable.
  @zmq_connections.each do |zmq_connection|
    zmq_connection.readable! if zmq_connection.channel.has_more?
  end
end

#process_connection(connection) ⇒ Object



68
69
70
71
72
73
# File 'lib/zmachine/connection_manager.rb', line 68

# Lets +connection+ handle its pending events.
#
# If process_events returns a truthy value it is treated as a new connection
# and appended to @new_connections. If an IOException is raised, the
# connection is passed to close_connection instead.
#
# @param connection the connection whose events should be processed
# @return [Array, nil] @new_connections when something was appended, nil when
#   process_events returned nothing; on IOException, whatever close_connection
#   returns
def process_connection(connection)
  if (spawned = connection.process_events)
    @new_connections.push(spawned)
  end
rescue IOException
  close_connection(connection)
end

#shutdown ⇒ Object



25
26
27
28
29
# File 'lib/zmachine/connection_manager.rb', line 25

# Queues every connection in @connections for unbinding and runs cleanup.
#
# Connections that are already waiting in @unbound_connections are not queued
# a second time; otherwise cleanup would call #unbind and #close on them
# twice. A queued entry may be a bare connection or an Array whose first
# element is the connection (the two shapes cleanup accepts), so both are
# checked. Matching is by object identity.
#
# @return whatever cleanup returns
def shutdown
  ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
  already_queued = @unbound_connections.map { |entry| entry.is_a?(Array) ? entry.first : entry }
  remaining = @connections.reject do |connection|
    already_queued.any? { |queued| queued.equal?(connection) }
  end
  @unbound_connections += remaining
  cleanup
end