Class: MessageBus::ConnectionManager
- Inherits:
  - Object
- Inheritance chain:
  - Object
  - MessageBus::ConnectionManager
- Includes:
- MonitorMixin
- Defined in:
- lib/message_bus/connection_manager.rb
Instance Method Summary
- #add_client(client) ⇒ Object
- #client_count ⇒ Object
- #initialize(bus = nil) ⇒ ConnectionManager (constructor): A new instance of ConnectionManager.
- #lookup_client(client_id) ⇒ Object
- #notify_clients(msg) ⇒ Object
- #remove_client(c) ⇒ Object
- #stats ⇒ Object
- #subscribe_client(client, channel) ⇒ Object
Constructor Details
#initialize(bus = nil) ⇒ ConnectionManager
Returns a new instance of ConnectionManager.
8 9 10 11 12 13 |
# File 'lib/message_bus/connection_manager.rb', line 8
#
# Creates a manager that starts with no registered clients and no
# subscriptions.
#
# @param bus [Object, nil] bus to associate with this manager; the
#   MessageBus constant is used when none is given
def initialize(bus = nil)
  @bus = bus || MessageBus
  @subscriptions = {}
  @clients = {}
  # Prepare the monitor state that MonitorMixin#synchronize relies on.
  mon_initialize
end
Instance Method Details
#add_client(client) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/message_bus/connection_manager.rb', line 44
#
# Registers +client+ under its client_id and subscribes it to every channel
# listed in client.subscriptions.
#
# If a client with the same id is already registered and has a higher seq,
# the incoming client is cancelled and nothing else changes. Otherwise any
# previously registered client with that id is removed and cancelled before
# the new one takes its place.
#
# @param client [Object] must respond to client_id, site_id, seq,
#   subscriptions and cancel
def add_client(client)
  synchronize do
    previous = @clients[client.client_id]

    # A newer connection for this id already exists: reject the incoming one.
    next client.cancel if previous && previous.seq > client.seq

    if previous
      remove_client(previous)
      previous.cancel
    end

    @clients[client.client_id] = client
    # Make sure the per-site channel table exists before subscribing.
    @subscriptions[client.site_id] ||= {}
    client.subscriptions.each do |channel, _|
      subscribe_client(client, channel)
    end
  end
end
#client_count ⇒ Object
94 95 96 97 98 |
# File 'lib/message_bus/connection_manager.rb', line 94
#
# @return [Integer] number of clients currently registered, read while
#   holding the manager's monitor
def client_count
  synchronize { @clients.size }
end
#lookup_client(client_id) ⇒ Object
77 78 79 80 81 |
# File 'lib/message_bus/connection_manager.rb', line 77
#
# Finds a registered client by id while holding the manager's monitor.
#
# @param client_id [Object] key the client was registered under
# @return [Object, nil] the registered client, or nil when the id is unknown
def lookup_client(client_id)
  synchronize { @clients[client_id] }
end
#notify_clients(msg) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/message_bus/connection_manager.rb', line 15
#
# Delivers +msg+ to every client subscribed to msg.channel on msg.site_id.
#
# Only clients for which client.allowed?(msg) is truthy receive the message.
# Errors raised while writing to a single client are swallowed so the
# remaining subscribers are still served; a client that reports closed?
# afterwards is removed. Any other error is logged through the logger of the
# bus this manager was built with, and is not re-raised.
#
# @param msg [Object] must respond to site_id and channel
def notify_clients(msg)
  synchronize do
    begin
      site_subs = @subscriptions[msg.site_id]
      subscription = site_subs[msg.channel] if site_subs

      # Nobody is listening on this channel for this site.
      return unless subscription

      subscription.each do |client_id|
        client = @clients[client_id]
        next unless client && client.allowed?(msg)

        begin
          client.synchronize do
            client << msg
          end
        rescue
          # pipe may be broken, move on
        end

        # turns out you can delete from a set while iterating
        remove_client(client) if client.closed?
      end
    rescue => e
      # Log via the injected bus (which defaults to MessageBus) rather than
      # the global constant, so a custom bus sees its own failures.
      @bus.logger.error "notify clients crash #{e} : #{e.backtrace}"
    end
  end
end
#remove_client(c) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/message_bus/connection_manager.rb', line 64
#
# Unregisters +c+: drops it from the client table, removes its id from every
# channel set of its site, and cancels its cleanup timer when it has one.
#
# Removing a client whose site has no subscription table (for example one
# that was never added) is a no-op for the subscription part instead of an
# error.
#
# @param c [Object] must respond to client_id, site_id and cleanup_timer
def remove_client(c)
  synchronize do
    @clients.delete c.client_id

    site_subs = @subscriptions[c.site_id]
    if site_subs
      site_subs.each_value do |client_ids|
        client_ids.delete c.client_id
      end
    end

    timer = c.cleanup_timer
    if timer
      begin
        timer.cancel
      rescue StandardError
        # concurrency may cause this to fail; cancellation is best-effort
        nil
      end
    end
  end
end
#stats ⇒ Object
100 101 102 103 104 105 106 107 |
# File 'lib/message_bus/connection_manager.rb', line 100
#
# Snapshot of the manager's bookkeeping, taken while holding the monitor.
#
# @return [Hash] :client_count is the number of registered clients;
#   :subscriptions is the manager's own subscription table (the same
#   object, not a copy)
def stats
  synchronize do
    registered = @clients.length
    { client_count: registered, subscriptions: @subscriptions }
  end
end
#subscribe_client(client, channel) ⇒ Object
83 84 85 86 87 88 89 90 91 92 |
# File 'lib/message_bus/connection_manager.rb', line 83
#
# Adds +client+ to the set of subscribers of +channel+ on the client's site.
#
# The per-site table and the per-channel set are created on demand, so this
# also works for a site that has not been seen before.
#
# @param client [Object] must respond to site_id and client_id
# @param channel [Object] channel key to subscribe the client to
# @return [Set] the channel's subscriber id set, including this client
def subscribe_client(client, channel)
  synchronize do
    site_subs = (@subscriptions[client.site_id] ||= {})
    subscribers = (site_subs[channel] ||= Set.new)
    subscribers << client.client_id
  end
end