Class: MessageBus::ConnectionManager

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/message_bus/connection_manager.rb

Instance Method Summary collapse

Constructor Details

#initialize(bus = nil) ⇒ ConnectionManager

Returns a new instance of ConnectionManager.



7
8
9
10
11
12
# File 'lib/message_bus/connection_manager.rb', line 7

# Builds an empty connection manager.
#
# Starts with no registered clients and no subscriptions, remembers the bus
# to consult later, and initializes the monitor state supplied by MonitorMixin.
#
# @param bus [Object, nil] bus to use; the MessageBus constant is used when
#   a falsy value is given
def initialize(bus = nil)
  @bus = bus || MessageBus
  @clients, @subscriptions = {}, {}
  mon_initialize
end

Instance Method Details

#add_client(client) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/message_bus/connection_manager.rb', line 60

# Registers +client+ and subscribes it to each of its channels.
#
# If a client with the same client_id is already registered and has a
# strictly higher seq, the incoming client is cancelled and nothing else
# changes. Otherwise any previously registered client with that id is removed
# and cancelled, and the incoming client takes its place.
#
# @param client [Object] must respond to client_id, site_id, seq,
#   subscriptions and cancel
def add_client(client)
  synchronize do
    previous = @clients[client.client_id]

    # A newer connection for this id is already registered: reject this one.
    next client.cancel if previous && previous.seq > client.seq

    if previous
      remove_client(previous)
      previous.cancel
    end

    @clients[client.client_id] = client
    @subscriptions[client.site_id] ||= {}
    client.subscriptions.each { |channel, _| subscribe_client(client, channel) }
  end
end

#client_count ⇒ Object



110
111
112
113
114
# File 'lib/message_bus/connection_manager.rb', line 110

# Number of clients currently registered, read while holding the monitor.
#
# @return [Integer]
def client_count
  synchronize { @clients.size }
end

#lookup_client(client_id) ⇒ Object



93
94
95
96
97
# File 'lib/message_bus/connection_manager.rb', line 93

# Looks up a registered client by its id while holding the monitor.
#
# @param client_id [Object] key the client was registered under
# @return [Object, nil] the registered client, or nil when the id is unknown
def lookup_client(client_id)
  synchronize { @clients[client_id] }
end

#notify_clients(msg) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/message_bus/connection_manager.rb', line 14

# Delivers +msg+ to the clients subscribed to msg.channel on msg.site_id.
#
# For each subscribed client that is still registered and for which
# allowed?(msg) is truthy, the client's filter(msg) result (when truthy) is
# pushed to the client with <<, after which the client is removed from the
# manager. Errors raised while pushing are ignored so that one broken
# connection does not stop delivery to the rest.
#
# When the bus supplies an around filter for the channel, delivery is wrapped
# in it and given the user ids of the subscribed clients. If none of the
# subscribers has a user id (for example, only anonymous clients), delivery
# runs directly instead of being skipped.
#
# Any other error is logged through MessageBus.logger and swallowed.
#
# @param msg [Object] must respond to site_id and channel
def notify_clients(msg)
  synchronize do
    begin
      site_subs = @subscriptions[msg.site_id]
      subscription = site_subs[msg.channel] if site_subs

      return unless subscription

      around_filter = @bus.around_client_batch(msg.channel)

      work = lambda do
        subscription.each do |client_id|
          client = @clients[client_id]
          next unless client && client.allowed?(msg)

          copy = client.filter(msg)
          next unless copy

          begin
            client << copy
          rescue
            # pipe may be broken, move on
          end
          # Deleting from a Set while iterating over it is permitted in Ruby.
          remove_client(client)
        end
      end

      user_ids = []
      if around_filter
        user_ids = subscription.map do |s|
          c = @clients[s]
          c && c.user_id
        end.compact
      end

      if around_filter && !user_ids.empty?
        around_filter.call(msg, user_ids, work)
      else
        # No filter, or no identified users to batch over: deliver directly
        # so subscribers without a user id still receive the message.
        work.call
      end

    rescue => e
      MessageBus.logger.error "notify clients crash #{e} : #{e.backtrace}"
    end
  end
end

#remove_client(c) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/message_bus/connection_manager.rb', line 80

# Unregisters client +c+ and drops its id from every channel of its site.
#
# Safe to call for a client whose site has no subscription table (for
# example, one that was never added): in that case only the registry entry
# and the cleanup timer are handled.
#
# @param c [Object] must respond to client_id, site_id and cleanup_timer
def remove_client(c)
  synchronize do
    @clients.delete c.client_id

    site_subs = @subscriptions[c.site_id]
    site_subs.each_value { |set| set.delete c.client_id } if site_subs

    timer = c.cleanup_timer
    if timer
      begin
        timer.cancel
      rescue StandardError
        # concurrency may cause this to fail; cancelling is best-effort
        nil
      end
    end
  end
end

#stats ⇒ Object



116
117
118
119
120
121
122
123
# File 'lib/message_bus/connection_manager.rb', line 116

# Reports the manager's current state, read while holding the monitor.
#
# @return [Hash] :client_count is the number of registered clients;
#   :subscriptions is the manager's own subscription table (not a copy)
def stats
  synchronize do
    report = {}
    report[:client_count] = @clients.length
    report[:subscriptions] = @subscriptions
    report
  end
end

#subscribe_client(client, channel) ⇒ Object



99
100
101
102
103
104
105
106
107
108
# File 'lib/message_bus/connection_manager.rb', line 99

# Records that +client+ is subscribed to +channel+ on its site.
#
# The per-site table and the per-channel Set are created on first use, so
# this may be called before any client of that site has been added.
#
# @param client [Object] must respond to site_id and client_id
# @param channel [Object] channel key within the client's site
# @return [Set] the set of client ids subscribed to the channel
def subscribe_client(client, channel)
  synchronize do
    site_subs = (@subscriptions[client.site_id] ||= {})
    (site_subs[channel] ||= Set.new) << client.client_id
  end
end