Class: AnyCable::RackServer::Hub

Inherits:
Object
  • Object
show all
Defined in:
lib/anycable/rack-server/hub.rb

Overview

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Hub



9
10
11
12
13
14
15
# File 'lib/anycable/rack-server/hub.rb', line 9

# Sets up the hub's empty bookkeeping structures.
#
# @sockets maps a socket to a Set (add_subscriber stores
# [channel, stream] pairs there); @streams maps a stream id to a Hash of
# channel id => Set (add_subscriber stores sockets there). Both hashes
# create missing entries on first access. @sync is the Mutex the other
# methods use to guard access to these structures.
def initialize
  @sockets = Hash.new { |registry, socket| registry[socket] = Set.new }
  @streams = Hash.new do |by_stream, stream_id|
    by_stream[stream_id] = Hash.new do |by_channel, channel_id|
      by_channel[channel_id] = Set.new
    end
  end
  @sync = Mutex.new
end

Instance Attribute Details

#sockets ⇒ Object (readonly)

Returns the value of attribute sockets.



7
8
9
# File 'lib/anycable/rack-server/hub.rb', line 7

# Read-only accessor for the socket registry held in @sockets.
#
# The hash itself is returned (not a copy) and this reader does not take
# @sync. The hash is built with a default block in #initialize, so
# indexing it with an unknown key creates an empty Set entry; use +key?+
# for non-mutating membership checks.
#
# @return [Hash] socket => Set of [channel, stream] pairs
def sockets
  @sockets
end

#streams ⇒ Object (readonly)

Returns the value of attribute streams.



7
8
9
# File 'lib/anycable/rack-server/hub.rb', line 7

# Read-only accessor for the stream registry held in @streams.
#
# The hash itself is returned (not a copy) and this reader does not take
# @sync. Both nesting levels are built with default blocks in
# #initialize, so indexing with an unknown stream or channel creates an
# empty entry; use +key?+ for non-mutating membership checks.
#
# @return [Hash] stream => { channel => Set of sockets }
def streams
  @streams
end

Instance Method Details

#add_subscriber(stream, socket, channel) ⇒ Object



17
18
19
20
21
22
# File 'lib/anycable/rack-server/hub.rb', line 17

# Registers +socket+ as a subscriber of +stream+ on behalf of +channel+.
#
# Both indexes are updated under @sync: the socket is added to the
# stream's per-channel subscriber set, and the [channel, stream] pair is
# added to the socket's own subscription set.
#
# @param stream [Object] stream identifier
# @param socket [Object] subscribing socket
# @param channel [Object] channel identifier
# @return [Set] the socket's subscription set after the insertion
def add_subscriber(stream, socket, channel)
  @sync.synchronize do
    subscribers = @streams[stream][channel]
    subscribers.add(socket)

    subscriptions = @sockets[socket]
    subscriptions.add([channel, stream])
  end
end

#broadcast(stream, message, coder) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/anycable/rack-server/hub.rb', line 56

# Delivers +message+ to every socket currently subscribed to +stream+.
#
# For each channel that has subscribers on the stream, the message is
# decoded with +coder+, wrapped via +channel_message+ and passed to each
# subscribed socket's +transmit+. Transmission happens outside @sync.
#
# @param stream [Object] stream identifier (key in @streams)
# @param message [Object] encoded payload handed to +coder.decode+
# @param coder [#decode] decoder; also forwarded to +channel_message+
# @return [Array, nil] nil when the stream has no entry; otherwise the
#   array of [channel_id, Set of sockets] pairs that was iterated
def broadcast(stream, message, coder)
  snapshot = @sync.synchronize do
    # key? (rather than []) so an unknown stream does not trigger the
    # default block and leave an empty entry behind.
    return unless @streams.key?(stream)

    # Copy every subscriber Set while the lock is held. The Sets stored in
    # @streams are mutated by add_subscriber/remove_subscriber; iterating
    # the live Sets after the lock is released would let those mutations
    # interleave with the loop below (adding to a Set that is being
    # iterated raises).
    @streams[stream].map { |channel_id, sockets| [channel_id, sockets.dup] }
  end

  snapshot.each do |(channel_id, sockets)|
    # Decoded once per channel, as before: channel_message is defined
    # elsewhere, so sharing one decoded object across channels is not
    # assumed to be safe.
    decoded = coder.decode(message)
    cmessage = channel_message(channel_id, decoded, coder)
    sockets.each { |socket| socket.transmit(cmessage) }
  end
end

#remove_channel(socket, channel) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/anycable/rack-server/hub.rb', line 32

# Drops every subscription that +socket+ holds through +channel+.
#
# A copy of the socket's [channel, stream] pairs is taken under @sync;
# the matching pairs are then removed one by one via remove_subscriber,
# which takes the lock itself.
#
# @param socket [Object] socket whose subscriptions are inspected
# @param channel [Object] channel identifier to match
# @return [Set, nil] nil when the socket has no entry in @sockets;
#   otherwise the copied set of pairs that was scanned
def remove_channel(socket, channel)
  subscriptions = @sync.synchronize do
    # key? keeps an unknown socket from being auto-registered.
    return unless @sockets.key?(socket)

    @sockets[socket].dup
  end

  subscriptions.each do |(channel_id, stream)|
    next unless channel == channel_id

    remove_subscriber(stream, socket, channel)
  end
end

#remove_socket(socket) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/anycable/rack-server/hub.rb', line 44

# Drops every subscription held by +socket+.
#
# A copy of the socket's [channel, stream] pairs is taken under @sync;
# each pair is then removed via remove_subscriber, which takes the lock
# itself.
#
# @param socket [Object] socket being removed
# @return [Set, nil] nil when the socket has no entry in @sockets;
#   otherwise the copied set of pairs that was processed
def remove_socket(socket)
  subscriptions = @sync.synchronize do
    # key? keeps an unknown socket from being auto-registered.
    return unless @sockets.key?(socket)

    @sockets[socket].dup
  end

  subscriptions.each do |pair|
    channel_id, stream = pair
    remove_subscriber(stream, socket, channel_id)
  end
end

#remove_subscriber(stream, socket, channel) ⇒ Object



24
25
26
27
28
29
30
# File 'lib/anycable/rack-server/hub.rb', line 24

# Removes a single subscription of +socket+ to +stream+ via +channel+.
#
# Under @sync, the socket is deleted from the stream's per-channel
# subscriber set and the [channel, stream] pair is deleted from the
# socket's subscription set; +cleanup+ is then invoked with the same
# arguments while the lock is still held.
#
# @param stream [Object] stream identifier
# @param socket [Object] socket to unsubscribe
# @param channel [Object] channel identifier
# @return [Object] whatever +cleanup+ returns
def remove_subscriber(stream, socket, channel)
  @sync.synchronize do
    subscription = [channel, stream]

    @streams[stream][channel].delete(socket)
    @sockets[socket].delete(subscription)

    cleanup(stream, socket, channel)
  end
end