Class: MessageBus::ConnectionManager

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/message_bus/connection_manager.rb

Overview

Manages a set of subscribers with active connections to the server, such that messages which are published during the connection may be dispatched.

Instance Method Summary collapse

Constructor Details

#initialize(bus = nil) ⇒ ConnectionManager



12
13
14
15
16
17
# File 'lib/message_bus/connection_manager.rb', line 12

def initialize(bus = nil)
  @clients = {}
  @subscriptions = {}
  @bus = bus || MessageBus
  mon_initialize
end

Instance Method Details

#add_client(client) ⇒ void

This method returns an undefined value.

Keeps track of a client with an active connection



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/message_bus/connection_manager.rb', line 53

def add_client(client)
  synchronize do
    existing = @clients[client.client_id]
    if existing && existing.seq > client.seq
      client.close
    else
      if existing
        remove_client(existing)
        existing.close
      end

      @clients[client.client_id] = client
      @subscriptions[client.site_id] ||= {}
      client.subscriptions.each do |k, _v|
        subscribe_client(client, k)
      end
    end
  end
end

#client_countInteger



99
100
101
102
103
# File 'lib/message_bus/connection_manager.rb', line 99

def client_count
  synchronize do
    @clients.length
  end
end

#lookup_client(client_id) ⇒ MessageBus::Client

Finds a client by ID



92
93
94
95
96
# File 'lib/message_bus/connection_manager.rb', line 92

def lookup_client(client_id)
  synchronize do
    @clients[client_id]
  end
end

#notify_clients(msg) ⇒ void

This method returns an undefined value.

Dispatches a message to any connected clients which are permitted to receive it



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/message_bus/connection_manager.rb', line 22

def notify_clients(msg)
  synchronize do
    begin
      site_subs = @subscriptions[msg.site_id]
      subscription = site_subs[msg.channel] if site_subs

      return unless subscription

      subscription.each do |client_id|
        client = @clients[client_id]
        if client && client.allowed?(msg)
          begin
            client.synchronize do
              client << msg
            end
          rescue
            # pipe may be broken, move on
          end
          # turns out you can delete from a set while itereating
          remove_client(client) if client.closed?
        end
      end
    rescue => e
      @bus.logger.error "notify clients crash #{e} : #{e.backtrace}"
    end
  end
end

#remove_client(c) ⇒ void

This method returns an undefined value.

Removes a client



76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/message_bus/connection_manager.rb', line 76

def remove_client(c)
  synchronize do
    @clients.delete c.client_id
    @subscriptions[c.site_id].each do |_k, set|
      set.delete c.client_id
    end
    if c.cleanup_timer
      # concurrency may cause this to fail
      c.cleanup_timer.cancel rescue nil
    end
  end
end