Class: MessageBus::ConnectionManager

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/message_bus/connection_manager.rb

Instance Method Summary collapse

Constructor Details

#initialize(bus = nil) ⇒ ConnectionManager

Returns a new instance of ConnectionManager.



8
9
10
11
12
13
# File 'lib/message_bus/connection_manager.rb', line 8

def initialize(bus = nil)
  @clients = {}
  @subscriptions = {}
  @bus = bus || MessageBus
  mon_initialize
end

Instance Method Details

#add_client(client) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/message_bus/connection_manager.rb', line 44

def add_client(client)
  synchronize do
    existing = @clients[client.client_id]
    if existing && existing.seq > client.seq
      client.cancel
    else
      if existing
        remove_client(existing)
        existing.cancel
      end

      @clients[client.client_id] = client
      @subscriptions[client.site_id] ||= {}
      client.subscriptions.each do |k, v|
        subscribe_client(client, k)
      end
    end
  end
end

#client_countObject



94
95
96
97
98
# File 'lib/message_bus/connection_manager.rb', line 94

def client_count
  synchronize do
    @clients.length
  end
end

#lookup_client(client_id) ⇒ Object



77
78
79
80
81
# File 'lib/message_bus/connection_manager.rb', line 77

def lookup_client(client_id)
  synchronize do
    @clients[client_id]
  end
end

#notify_clients(msg) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/message_bus/connection_manager.rb', line 15

def notify_clients(msg)
  synchronize do
    begin
      site_subs = @subscriptions[msg.site_id]
      subscription = site_subs[msg.channel] if site_subs

      return unless subscription

      subscription.each do |client_id|
        client = @clients[client_id]
        if client && client.allowed?(msg)
          begin
            client.synchronize do
              client << msg
            end
          rescue
            # pipe may be broken, move on
          end
          # turns out you can delete from a set while itereating
          remove_client(client) if client.closed?
        end
      end

    rescue => e
      MessageBus.logger.error "notify clients crash #{e} : #{e.backtrace}"
    end
  end
end

#remove_client(c) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/message_bus/connection_manager.rb', line 64

def remove_client(c)
  synchronize do
    @clients.delete c.client_id
    @subscriptions[c.site_id].each do |k, set|
      set.delete c.client_id
    end
    if c.cleanup_timer
      # concurrency may cause this to fail
      c.cleanup_timer.cancel rescue nil
    end
  end
end

#statsObject



100
101
102
103
104
105
106
107
# File 'lib/message_bus/connection_manager.rb', line 100

def stats
  synchronize do
    {
      client_count: @clients.length,
      subscriptions: @subscriptions
    }
  end
end

#subscribe_client(client, channel) ⇒ Object



83
84
85
86
87
88
89
90
91
92
# File 'lib/message_bus/connection_manager.rb', line 83

def subscribe_client(client, channel)
  synchronize do
    set = @subscriptions[client.site_id][channel]
    unless set
      set = Set.new
      @subscriptions[client.site_id][channel] = set
    end
    set << client.client_id
  end
end