Class: MessageBus::ConnectionManager

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/message_bus/connection_manager.rb

Overview

Manages a set of subscribers with active connections to the server, such that messages which are published during the connection may be dispatched.

Instance Method Summary collapse

Constructor Details

#initialize(bus = nil) ⇒ ConnectionManager

Returns a new instance of ConnectionManager.

Parameters:



12
13
14
15
16
17
# File 'lib/message_bus/connection_manager.rb', line 12

# Sets up an empty connection manager.
#
# @param bus the bus used for error logging; falls back to the
#   MessageBus module when omitted or nil
def initialize(bus = nil)
  @bus = bus || MessageBus
  # client_id => client
  @clients = {}
  # site_id => { channel => collection of client ids }
  @subscriptions = {}
  # Prepare the monitor that backs #synchronize.
  mon_initialize
end

Instance Method Details

#add_client(client) ⇒ void

This method returns an undefined value.

Keeps track of a client with an active connection

Parameters:



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/message_bus/connection_manager.rb', line 53

# Keeps track of a client with an active connection.
#
# When a client with the same ID is already tracked and carries a higher
# sequence number, the incoming client is closed and nothing else changes.
# Otherwise any previously tracked client with that ID is removed and
# closed, and the incoming client is registered together with each of its
# subscriptions.
#
# @param client the client to track; must respond to +client_id+,
#   +site_id+, +seq+, +subscriptions+ and +close+
# @return [void]
def add_client(client)
  synchronize do
    id = client.client_id
    previous = @clients[id]

    # A newer connection for this ID is already tracked: reject the stale one.
    next client.close if previous && previous.seq > client.seq

    if previous
      remove_client(previous)
      previous.close
    end

    @clients[id] = client
    @subscriptions[client.site_id] ||= {}
    client.subscriptions.each { |channel, _| subscribe_client(client, channel) }
  end
end

#client_count ⇒ Integer

Returns the number of tracked clients.

Returns:

  • (Integer)

    the number of tracked clients



99
100
101
102
103
# File 'lib/message_bus/connection_manager.rb', line 99

# Reports how many clients are currently tracked.
#
# @return [Integer] the number of tracked clients
def client_count
  synchronize { @clients.size }
end

#lookup_client(client_id) ⇒ MessageBus::Client

Finds a client by ID

Parameters:

  • client_id (String)

    the client ID to search by

Returns:

  • (MessageBus::Client)

    the client with the specified ID



92
93
94
95
96
# File 'lib/message_bus/connection_manager.rb', line 92

# Finds a client by ID.
#
# @param client_id [String] the client ID to search by
# @return the tracked client stored under that ID, or nil when the ID is
#   not tracked
def lookup_client(client_id)
  synchronize { @clients[client_id] }
end

#notify_clients(msg) ⇒ void

This method returns an undefined value.

Dispatches a message to any connected clients which are permitted to receive it

Parameters:



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/message_bus/connection_manager.rb', line 22

# Dispatches a message to any connected clients which are permitted to
# receive it.
#
# Looks up the subscriber IDs recorded for the message's site and channel,
# then delivers the message to each tracked client that allows it. Delivery
# errors for a single client are ignored; clients that report themselves
# closed afterwards are removed. Any other error is logged through the bus
# logger rather than raised.
#
# @param msg the message to deliver; must respond to +site_id+ and +channel+
# @return [void]
def notify_clients(msg)
  synchronize do
    begin
      channels = @subscriptions[msg.site_id]
      subscriber_ids = channels && channels[msg.channel]

      # Nobody is subscribed to this site/channel pair.
      return unless subscriber_ids

      subscriber_ids.each do |id|
        recipient = @clients[id]
        next unless recipient && recipient.allowed?(msg)

        begin
          recipient.synchronize { recipient << msg }
        rescue
          # pipe may be broken, move on
        end

        # turns out you can delete from a set while iterating
        remove_client(recipient) if recipient.closed?
      end
    rescue => e
      @bus.logger.error "notify clients crash #{e} : #{e.backtrace}"
    end
  end
end

#remove_client(c) ⇒ void

This method returns an undefined value.

Removes a client

Parameters:



76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/message_bus/connection_manager.rb', line 76

# Removes a client.
#
# Stops tracking the client, deletes its ID from every channel subscription
# collection recorded for its site, and cancels its cleanup timer when one
# is set. A client whose site has no recorded subscriptions (for example,
# one that was never added) is handled without raising.
#
# @param c the client to remove; must respond to +client_id+, +site_id+
#   and +cleanup_timer+
# @return [void]
def remove_client(c)
  synchronize do
    @clients.delete c.client_id

    # The site entry only exists once a client for that site has been
    # added, so it may legitimately be missing here.
    site_subs = @subscriptions[c.site_id]
    site_subs.each_value { |set| set.delete c.client_id } if site_subs

    timer = c.cleanup_timer
    if timer
      begin
        timer.cancel
      rescue StandardError
        # concurrency may cause this to fail; cancellation is best-effort
      end
    end
  end
end