Class: Slack::RealTime::Concurrency::Celluloid::Socket
- Inherits:
-
Socket
- Object
- Socket
- Slack::RealTime::Concurrency::Celluloid::Socket
show all
- Extended by:
- Forwardable
- Includes:
- Celluloid::IO, Celluloid::Internals::Logger
- Defined in:
- lib/slack/real_time/concurrency/celluloid.rb
Defined Under Namespace
Classes: Actor
Constant Summary
collapse
- BLOCK_SIZE =
4096
Instance Attribute Summary collapse
Attributes inherited from Socket
#driver, #options, #url
Instance Method Summary
collapse
Methods inherited from Socket
#current_time, #send_data, #start_sync, #time_since_last_message
Constructor Details
#initialize(*args) ⇒ Socket
Returns a new instance of Socket.
23
24
25
|
# File 'lib/slack/real_time/concurrency/celluloid.rb', line 23
def initialize(*args)
super
end
|
Instance Attribute Details
#socket ⇒ Object
Returns the value of attribute socket.
21
22
23
|
# File 'lib/slack/real_time/concurrency/celluloid.rb', line 21
def socket
@socket
end
|
Instance Method Details
#close ⇒ Object
53
54
55
56
|
# File 'lib/slack/real_time/concurrency/celluloid.rb', line 53
def close
@closing = true
super
end
|
#connect! ⇒ Object
27
28
29
30
|
# File 'lib/slack/real_time/concurrency/celluloid.rb', line 27
def connect!
super
run_loop
end
|
#connected? ⇒ Boolean
103
104
105
|
# File 'lib/slack/real_time/concurrency/celluloid.rb', line 103
def connected?
!@connected.nil? && !@driver.nil?
end
|
#disconnect! ⇒ Object
48
49
50
51
|
# File 'lib/slack/real_time/concurrency/celluloid.rb', line 48
def disconnect!
super
@ping_timer&.cancel
end
|
#handle_read(buffer) ⇒ Object
65
66
67
68
|
# File 'lib/slack/real_time/concurrency/celluloid.rb', line 65
def handle_read(buffer)
logger.debug("#{self.class}##{__method__}") { buffer }
driver&.parse buffer
end
|
#read ⇒ Object
58
59
60
61
62
63
|
# File 'lib/slack/real_time/concurrency/celluloid.rb', line 58
def read
buffer = socket.readpartial(BLOCK_SIZE)
raise EOFError unless buffer && !buffer.empty?
async.handle_read(buffer)
end
|
#restart_async(client, new_url) ⇒ Object
96
97
98
99
100
101
|
# File 'lib/slack/real_time/concurrency/celluloid.rb', line 96
def restart_async(client, new_url)
@last_message_at = current_time
@url = new_url
@client = client
Actor.new(future.run_client_loop)
end
|
#run_client_loop ⇒ Object
81
82
83
84
85
86
|
# File 'lib/slack/real_time/concurrency/celluloid.rb', line 81
def run_client_loop
@client.run_loop
rescue StandardError => e
logger.debug("#{self.class}##{__method__}") { e }
raise e
end
|
#run_loop ⇒ Object
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/slack/real_time/concurrency/celluloid.rb', line 32
def run_loop
@closing = false
@socket = build_socket
@connected = @socket.connect
driver.start
loop { read } if socket
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE => e
logger.debug("#{self.class}##{__method__}") { e }
unless @closing
driver.emit(
:close,
WebSocket::Driver::CloseEvent.new(1001, 'server closed connection')
)
end
end
|
#run_ping_loop ⇒ Object
88
89
90
91
92
93
94
|
# File 'lib/slack/real_time/concurrency/celluloid.rb', line 88
def run_ping_loop
return unless @client.run_ping?
@ping_timer = every @client.websocket_ping_timer do
@client.run_ping!
end
end
|
#start_async(client) ⇒ Object
75
76
77
78
79
|
# File 'lib/slack/real_time/concurrency/celluloid.rb', line 75
def start_async(client)
@client = client
Actor.new(future.run_ping_loop)
Actor.new(future.run_client_loop)
end
|
#write(data) ⇒ Object
70
71
72
73
|
# File 'lib/slack/real_time/concurrency/celluloid.rb', line 70
def write(data)
logger.debug("#{self.class}##{__method__}") { data }
socket.write(data)
end
|