Class: AnyCable::Rack::Hub

Inherits:
Object
  • Object
show all
Defined in:
lib/anycable/rack/hub.rb

Overview

Constant Summary collapse

INTERNAL_STREAM =
:__internal__

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeHub

Returns a new instance of Hub.



13
14
15
16
17
18
19
# File 'lib/anycable/rack/hub.rb', line 13

def initialize
  @streams = Hash.new do |streams, stream_id|
    streams[stream_id] = Hash.new { |channels, channel_id| channels[channel_id] = Set.new }
  end
  @sockets = Hash.new { |h, k| h[k] = Set.new }
  @sync = Mutex.new
end

Instance Attribute Details

#socketsObject (readonly)

Returns the value of attribute sockets.



11
12
13
# File 'lib/anycable/rack/hub.rb', line 11

def sockets
  @sockets
end

#streamsObject (readonly)

Returns the value of attribute streams.



11
12
13
# File 'lib/anycable/rack/hub.rb', line 11

def streams
  @streams
end

Instance Method Details

#add_socket(socket, identifier) ⇒ Object



21
22
23
24
25
# File 'lib/anycable/rack/hub.rb', line 21

def add_socket(socket, identifier)
  @sync.synchronize do
    @streams[INTERNAL_STREAM][identifier] << socket
  end
end

#add_subscriber(stream, socket, channel) ⇒ Object



27
28
29
30
31
32
# File 'lib/anycable/rack/hub.rb', line 27

def add_subscriber(stream, socket, channel)
  @sync.synchronize do
    @streams[stream][channel] << socket
    @sockets[socket] << [channel, stream]
  end
end

#broadcast(stream, message, coder) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/anycable/rack/hub.rb', line 66

def broadcast(stream, message, coder)
  list = @sync.synchronize do
    return unless @streams.key?(stream)

    @streams[stream].to_a
  end

  list.each do |(channel_id, sockets)|
    decoded = coder.decode(message)
    cmessage = channel_message(channel_id, decoded, coder)
    sockets.each { |socket| socket.transmit(cmessage) }
  end
end

#broadcast_all(message) ⇒ Object



80
81
82
# File 'lib/anycable/rack/hub.rb', line 80

def broadcast_all(message)
  sockets.each_key { |socket| socket.transmit(message) }
end

#close_allObject



99
100
101
102
103
104
# File 'lib/anycable/rack/hub.rb', line 99

def close_all
  hub.sockets.dup.each do |socket|
    hub.remove_socket(socket)
    socket.close
  end
end

#disconnect(identifier, reconnect) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/anycable/rack/hub.rb', line 84

def disconnect(identifier, reconnect)
  sockets = @sync.synchronize do
    return unless @streams[INTERNAL_STREAM].key?(identifier)

    @streams[INTERNAL_STREAM][identifier].to_a
  end

  msg = disconnect_message("remote", reconnect)

  sockets.each do |socket|
    socket.transmit(msg)
    socket.close
  end
end

#remove_channel(socket, channel) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
# File 'lib/anycable/rack/hub.rb', line 42

def remove_channel(socket, channel)
  list = @sync.synchronize do
    return unless @sockets.key?(socket)

    @sockets[socket].dup
  end

  list.each do |(channel_id, stream)|
    remove_subscriber(stream, socket, channel) if channel == channel_id
  end
end

#remove_socket(socket) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
# File 'lib/anycable/rack/hub.rb', line 54

def remove_socket(socket)
  list = @sync.synchronize do
    return unless @sockets.key?(socket)

    @sockets[socket].dup
  end

  list.each do |(channel_id, stream)|
    remove_subscriber(stream, socket, channel_id)
  end
end

#remove_subscriber(stream, socket, channel) ⇒ Object



34
35
36
37
38
39
40
# File 'lib/anycable/rack/hub.rb', line 34

def remove_subscriber(stream, socket, channel)
  @sync.synchronize do
    @streams[stream][channel].delete(socket)
    @sockets[socket].delete([channel, stream])
    cleanup stream, socket, channel
  end
end