Class: MessageBus::ConnectionManager

Inherits:
Object
  • Object
show all
Defined in:
lib/message_bus/connection_manager.rb

Defined Under Namespace

Classes: SynchronizedSet

Instance Method Summary collapse

Constructor Details

#initialize(bus = nil) ⇒ ConnectionManager

Returns a new instance of ConnectionManager.



28
29
30
31
32
# File 'lib/message_bus/connection_manager.rb', line 28

def initialize(bus = nil)
  @clients = {}
  @subscriptions = {}
  @bus = bus || MessageBus
end

Instance Method Details

#add_client(client) ⇒ Object



78
79
80
81
82
83
84
# File 'lib/message_bus/connection_manager.rb', line 78

def add_client(client)
  @clients[client.client_id] = client
  @subscriptions[client.site_id] ||= {}
  client.subscriptions.each do |k,v|
    subscribe_client(client, k)
  end
end

#client_countObject



107
108
109
# File 'lib/message_bus/connection_manager.rb', line 107

def client_count
  @clients.length
end

#lookup_client(client_id) ⇒ Object



94
95
96
# File 'lib/message_bus/connection_manager.rb', line 94

def lookup_client(client_id)
  @clients[client_id]
end

#notify_clients(msg) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/message_bus/connection_manager.rb', line 34

def notify_clients(msg)
  begin
    site_subs = @subscriptions[msg.site_id]
    subscription = site_subs[msg.channel] if site_subs

    return unless subscription

    around_filter = @bus.around_client_batch(msg.channel)

    work = lambda do
      subscription.each do |client_id|
        client = @clients[client_id]
        if client && client.allowed?(msg)
          if copy = client.filter(msg)
            begin
              client << copy
            rescue
              # pipe may be broken, move on
            end
            # turns out you can delete from a set while itereating
            remove_client(client)
          end
        end
      end
    end

    if around_filter
      user_ids = subscription.map do |s|
        c = @clients[s]
        c && c.user_id
      end.compact

      if user_ids && user_ids.length > 0
        around_filter.call(msg, user_ids, work)
      end
    else
      work.call
    end

  rescue => e
    MessageBus.logger.error "notify clients crash #{e} : #{e.backtrace}"
  end
end

#remove_client(c) ⇒ Object



86
87
88
89
90
91
92
# File 'lib/message_bus/connection_manager.rb', line 86

def remove_client(c)
  @clients.delete c.client_id
  @subscriptions[c.site_id].each do |k, set|
    set.delete c.client_id
  end
  c.cleanup_timer.cancel if c.cleanup_timer
end

#statsObject



111
112
113
114
115
116
# File 'lib/message_bus/connection_manager.rb', line 111

def stats
  {
    client_count: @clients.length,
    subscriptions: @subscriptions
  }
end

#subscribe_client(client, channel) ⇒ Object



98
99
100
101
102
103
104
105
# File 'lib/message_bus/connection_manager.rb', line 98

def subscribe_client(client,channel)
  set = @subscriptions[client.site_id][channel]
  unless set
    set = SynchronizedSet.new
    @subscriptions[client.site_id][channel] = set
  end
  set << client.client_id
end