Class: Pakyow::Realtime::Server
- Inherits:
-
Object
- Object
- Pakyow::Realtime::Server
- Defined in:
- lib/pakyow/realtime/server.rb,
lib/pakyow/realtime/server/adapters/redis.rb,
lib/pakyow/realtime/server/adapters/memory.rb
Defined Under Namespace
Modules: Adapters
Constant Summary collapse
- HEARTBEAT_INTERVAL =
3
Instance Attribute Summary collapse
-
#adapter ⇒ Object
readonly
Returns the value of attribute adapter.
Instance Method Summary collapse
- #connect ⇒ Object
- #disconnect ⇒ Object
- #find_socket(id_or_socket) {|socket| ... } ⇒ Object
- #find_socket_by_id(socket_id) ⇒ Object
- #find_socket_id(id_or_socket) {|socket_id| ... } ⇒ Object
-
#initialize(adapter = :memory, adapter_config, timeout_config) ⇒ Server
constructor
A new instance of Server.
- #shutdown ⇒ Object
- #socket_connect(id_or_socket) ⇒ Object
- #socket_disconnect(id_or_socket) ⇒ Object
- #socket_subscribe(id_or_socket, *channels) ⇒ Object
- #socket_unsubscribe(*channels) ⇒ Object
- #subscription_broadcast(channel, message) ⇒ Object
-
#transmit_message_to_connection_ids(message, socket_ids, raw: false) ⇒ Object
Called by the adapter, which guarantees that this server has connections for these ids.
Constructor Details
#initialize(adapter = :memory, adapter_config, timeout_config) ⇒ Server
Returns a new instance of Server.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/pakyow/realtime/server.rb', line 18
#
# Builds the server: loads and instantiates the requested adapter, prepares
# the socket list and the executor, then calls #connect.
#
# @param adapter [Symbol, String] adapter name; interpolated into the required
#   path and, capitalized, looked up as a constant under Adapters
# @param adapter_config [Object] handed to the adapter's constructor unchanged
# @param timeout_config [Object] stored in @timeout_config for later use
def initialize(adapter = :memory, adapter_config, timeout_config)
  # Required here rather than at file level so that only the adapter that was
  # actually asked for gets loaded.
  require "pakyow/realtime/server/adapters/#{adapter}"

  @adapter = Adapters.const_get(adapter.to_s.capitalize).new(self, adapter_config)
  @sockets = Concurrent::Array.new
  @timeout_config = timeout_config

  @executor = Concurrent::ThreadPoolExecutor.new(
    auto_terminate: false,
    min_threads: 1,
    max_threads: 10,
    max_queue: 0
  )

  connect
rescue LoadError => e
  # A missing adapter is logged, not re-raised, so construction still returns
  # an instance -- but one on which nothing after the failed `require` ran.
  Pakyow.logger.error "Failed to load data subscriber store adapter named `#{adapter}'"
  Pakyow.logger.error e.message
end
Instance Attribute Details
#adapter ⇒ Object (readonly)
Returns the value of attribute adapter.
14 15 16 |
# File 'lib/pakyow/realtime/server.rb', line 14
#
# Read-only accessor for the adapter instance stored in @adapter.
#
# @return [Object] the value of @adapter
def adapter
  @adapter
end
Instance Method Details
#connect ⇒ Object
43 44 45 46 47 |
# File 'lib/pakyow/realtime/server.rb', line 43
#
# Hands a task to the executor that starts the heartbeat and then connects
# the adapter.
#
# @return [Object] the result of `@executor << task`
def connect
  startup = lambda do
    start_heartbeat
    @adapter.connect
  end

  @executor << startup
end
#disconnect ⇒ Object
49 50 51 52 53 |
# File 'lib/pakyow/realtime/server.rb', line 49
#
# Hands a task to the executor that stops the heartbeat and then disconnects
# the adapter.
#
# @return [Object] the result of `@executor << task`
def disconnect
  teardown = lambda do
    stop_heartbeat
    @adapter.disconnect
  end

  @executor << teardown
end
#find_socket(id_or_socket) {|socket| ... } ⇒ Object
116 117 118 119 120 121 122 123 124 |
# File 'lib/pakyow/realtime/server.rb', line 116
#
# Resolves the argument to a socket and yields it. A WebSocket is used as
# given; anything else is treated as an id and looked up with
# #find_socket_by_id. The block is skipped when no socket is found.
#
# @param id_or_socket [WebSocket, Object] a socket, or the id of one
# @yieldparam socket [WebSocket] the resolved socket
# @return [Object, nil] the block's value, or nil when nothing was resolved
def find_socket(id_or_socket)
  resolved = id_or_socket.is_a?(WebSocket) ? id_or_socket : find_socket_by_id(id_or_socket)
  return unless resolved

  yield resolved
end
#find_socket_by_id(socket_id) ⇒ Object
112 113 114 |
# File 'lib/pakyow/realtime/server.rb', line 112
#
# Looks through @sockets for the first socket whose id equals `socket_id`.
#
# @param socket_id [Object] id to compare against each socket's `id`
# @return [Object, nil] the matching socket, or nil when there is none
def find_socket_by_id(socket_id)
  @sockets.find do |candidate|
    candidate.id == socket_id
  end
end
#find_socket_id(id_or_socket) {|socket_id| ... } ⇒ Object
126 127 128 129 130 131 132 133 134 |
# File 'lib/pakyow/realtime/server.rb', line 126
#
# Resolves the argument to a socket id and yields it. A WebSocket contributes
# its `id`; anything else is taken to already be the id. The block is skipped
# when the resulting id is nil or false.
#
# @param id_or_socket [WebSocket, Object] a socket, or the id of one
# @yieldparam socket_id [Object] the resolved id
# @return [Object, nil] the block's value, or nil when no id was resolved
def find_socket_id(id_or_socket)
  resolved_id = id_or_socket.is_a?(WebSocket) ? id_or_socket.id : id_or_socket
  return unless resolved_id

  yield resolved_id
end
#shutdown ⇒ Object
36 37 38 39 40 41 |
# File 'lib/pakyow/realtime/server.rb', line 36
#
# Stops the server: queues the disconnect, shuts down every known socket,
# then shuts the executor down and waits up to 30 seconds for it to finish.
#
# @return [Object] whatever `@executor.wait_for_termination` returns
def shutdown
  termination_timeout = 30

  disconnect
  @sockets.each { |socket| socket.shutdown }

  @executor.shutdown
  @executor.wait_for_termination(termination_timeout)
end
#socket_connect(id_or_socket) ⇒ Object
55 56 57 58 59 60 61 62 63 |
# File 'lib/pakyow/realtime/server.rb', line 55
#
# Hands a task to the executor that registers a newly connected socket: it is
# appended to @sockets, persisted through the adapter, and marked as the
# current instance for its id.
#
# @param id_or_socket [WebSocket, Object] a socket, or the id of one
# @return [Object] the result of `@executor << task`
def socket_connect(id_or_socket)
  register = lambda do
    find_socket(id_or_socket) do |connected|
      @sockets << connected
      @adapter.persist(connected.id)
      @adapter.current!(connected.id, connected.object_id)
    end
  end

  @executor << register
end
#socket_disconnect(id_or_socket) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/pakyow/realtime/server.rb', line 65
#
# Hands a task to the executor that removes a socket from @sockets and, when
# it is still the current instance for its id, triggers its leave handling
# and schedules expiration using the disconnect timeout.
#
# @param id_or_socket [WebSocket, Object] a socket, or the id of one
# @return [Object] the result of `@executor << task`
def socket_disconnect(id_or_socket)
  deregister = lambda do
    find_socket(id_or_socket) do |departed|
      @sockets.delete(departed)

      # If this isn't the current instance for the socket id, it means that a
      # reconnect probably happened and the new socket connected before we
      # knew that the old one disconnected. Since there's a newer socket,
      # don't trigger leave events or expirations for the old one.
      #
      next unless @adapter.current?(departed.id, departed.object_id)

      departed.leave
      @adapter.expire(departed.id, @timeout_config.disconnect)
    end
  end

  @executor << deregister
end
#socket_subscribe(id_or_socket, *channels) ⇒ Object
83 84 85 86 87 88 89 90 |
# File 'lib/pakyow/realtime/server.rb', line 83
#
# Hands a task to the executor that subscribes a socket id to the given
# channels through the adapter and then sets its expiration from the initial
# timeout.
#
# @param id_or_socket [WebSocket, Object] a socket, or the id of one
# @param channels [Array<Object>] channels to subscribe the socket to
# @return [Object] the result of `@executor << task`
def socket_subscribe(id_or_socket, *channels)
  subscribe = lambda do
    find_socket_id(id_or_socket) do |resolved_id|
      @adapter.socket_subscribe(resolved_id, *channels)
      @adapter.expire(resolved_id, @timeout_config.initial)
    end
  end

  @executor << subscribe
end
#socket_unsubscribe(*channels) ⇒ Object
92 93 94 95 96 |
# File 'lib/pakyow/realtime/server.rb', line 92
#
# Hands a task to the executor that asks the adapter to unsubscribe from the
# given channels.
#
# @param channels [Array<Object>] channels forwarded to the adapter
# @return [Object] the result of `@executor << task`
def socket_unsubscribe(*channels)
  unsubscribe = lambda do
    @adapter.socket_unsubscribe(*channels)
  end

  @executor << unsubscribe
end
#subscription_broadcast(channel, message) ⇒ Object
98 99 100 101 102 |
# File 'lib/pakyow/realtime/server.rb', line 98
#
# Hands a task to the executor that broadcasts `message` on `channel` through
# the adapter. The adapter receives the channel's string form as the target,
# plus a payload carrying the channel's name and the message.
#
# @param channel [#to_s, #name] channel to broadcast on
# @param message [Object] payload delivered under the `message:` key
# @return [Object] the result of `@executor << task`
def subscription_broadcast(channel, message)
  @executor << -> {
    @adapter.subscription_broadcast(channel.to_s, channel: channel.name, message: message)
  }
end
#transmit_message_to_connection_ids(message, socket_ids, raw: false) ⇒ Object
Called by the adapter, which guarantees that this server has connections for these ids.
106 107 108 109 110 |
# File 'lib/pakyow/realtime/server.rb', line 106
#
# Called by the adapter, which guarantees that this server has connections
# for these ids. Transmits `message` to the socket behind each id; an id with
# no matching socket is skipped rather than raising.
#
# @param message [Object] payload passed to each socket's `transmit`
# @param socket_ids [Enumerable] ids of the sockets to transmit to
# @param raw [Boolean] forwarded to each socket's `transmit` as `raw:`
# @return [Enumerable] `socket_ids`, as returned by `each`
def transmit_message_to_connection_ids(message, socket_ids, raw: false)
  socket_ids.each do |socket_id|
    find_socket_by_id(socket_id)&.transmit(message, raw: raw)
  end
end