Class: AnyCable::RackServer::Hub
- Inherits:
-
Object
- Object
- AnyCable::RackServer::Hub
- Defined in:
- lib/anycable/rack-server/hub.rb
Overview
Instance Attribute Summary collapse
-
#sockets ⇒ Object
readonly
Returns the value of attribute sockets.
-
#streams ⇒ Object
readonly
Returns the value of attribute streams.
Instance Method Summary collapse
- #add_subscriber(stream, socket, channel) ⇒ Object
- #broadcast(stream, message, coder) ⇒ Object
-
#initialize ⇒ Hub
constructor
A new instance of Hub.
- #remove_channel(socket, channel) ⇒ Object
- #remove_socket(socket) ⇒ Object
- #remove_subscriber(stream, socket, channel) ⇒ Object
Constructor Details
#initialize ⇒ Hub
9 10 11 12 13 14 15 |
# File 'lib/anycable/rack-server/hub.rb', line 9 def initialize @streams = Hash.new do |streams, stream_id| streams[stream_id] = Hash.new { |channels, channel_id| channels[channel_id] = Set.new } end @sockets = Hash.new { |h, k| h[k] = Set.new } @sync = Mutex.new end |
Instance Attribute Details
#sockets ⇒ Object (readonly)
Returns the value of attribute sockets.
7 8 9 |
# File 'lib/anycable/rack-server/hub.rb', line 7 def sockets @sockets end |
#streams ⇒ Object (readonly)
Returns the value of attribute streams.
7 8 9 |
# File 'lib/anycable/rack-server/hub.rb', line 7 def streams @streams end |
Instance Method Details
#add_subscriber(stream, socket, channel) ⇒ Object
17 18 19 20 21 22 |
# File 'lib/anycable/rack-server/hub.rb', line 17 def add_subscriber(stream, socket, channel) @sync.synchronize do @streams[stream][channel] << socket @sockets[socket] << [channel, stream] end end |
#broadcast(stream, message, coder) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/anycable/rack-server/hub.rb', line 56 def broadcast(stream, , coder) list = @sync.synchronize do return unless @streams.key?(stream) @streams[stream].to_a end list.each do |(channel_id, sockets)| decoded = coder.decode() = (channel_id, decoded, coder) sockets.each { |socket| socket.transmit() } end end |
#remove_channel(socket, channel) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/anycable/rack-server/hub.rb', line 32 def remove_channel(socket, channel) list = @sync.synchronize do return unless @sockets.key?(socket) @sockets[socket].dup end list.each do |(channel_id, stream)| remove_subscriber(stream, socket, channel) if channel == channel_id end end |
#remove_socket(socket) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/anycable/rack-server/hub.rb', line 44 def remove_socket(socket) list = @sync.synchronize do return unless @sockets.key?(socket) @sockets[socket].dup end list.each do |(channel_id, stream)| remove_subscriber(stream, socket, channel_id) end end |
#remove_subscriber(stream, socket, channel) ⇒ Object
24 25 26 27 28 29 30 |
# File 'lib/anycable/rack-server/hub.rb', line 24 def remove_subscriber(stream, socket, channel) @sync.synchronize do @streams[stream][channel].delete(socket) @sockets[socket].delete([channel, stream]) cleanup stream, socket, channel end end |