Class: Marvin::Distributed::Protocol
- Inherits:
-
EventMachine::Protocols::LineAndTextProtocol
- Object
- EventMachine::Protocols::LineAndTextProtocol
- Marvin::Distributed::Protocol
show all
- Defined in:
- lib/marvin/distributed/protocol.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
Instance Attribute Details
#callbacks ⇒ Object
Returns the value of attribute callbacks.
9
10
11
|
# File 'lib/marvin/distributed/protocol.rb', line 9
def callbacks
@callbacks
end
|
Instance Method Details
#handle_enable_ssl(opts = {}) ⇒ Object
57
58
59
60
|
# File 'lib/marvin/distributed/protocol.rb', line 57
def handle_enable_ssl(opts = {})
send_message_reply(:enabled_ssl)
enable_ssl
end
|
#handle_enabled_ssl(opts = {}) ⇒ Object
62
63
64
|
# File 'lib/marvin/distributed/protocol.rb', line 62
def handle_enabled_ssl(opts = {})
enable_ssl
end
|
#handle_noop(opts = {}) ⇒ Object
66
67
68
69
|
# File 'lib/marvin/distributed/protocol.rb', line 66
def handle_noop(opts = {})
logger.debug "no-op"
end
|
#handle_response(response) ⇒ Object
40
41
42
43
44
45
46
47
48
|
# File 'lib/marvin/distributed/protocol.rb', line 40
def handle_response(response)
logger.debug "Handling response in distributed protocol (response: #{response.inspect})"
return unless response.is_a?(Hash) && response.has_key?("message")
options = response["options"] || {}
@callback_id = response.delete("callback-id")
process_callback(options)
process_response_message(response["message"], options)
@callback_id = nil
end
|
#host_with_port ⇒ Object
50
51
52
53
54
55
|
# File 'lib/marvin/distributed/protocol.rb', line 50
def host_with_port
@host_with_port ||= begin
port, ip = Socket.unpack_sockaddr_in(get_peername)
"#{ip}:#{port}"
end
end
|
#post_connect ⇒ Object
After the connection is made and / or ssl is enabled.
72
73
|
# File 'lib/marvin/distributed/protocol.rb', line 72
def post_connect
end
|
#receive_line(line) ⇒ Object
11
12
13
14
15
16
17
18
19
20
|
# File 'lib/marvin/distributed/protocol.rb', line 11
def receive_line(line)
line.strip!
logger.debug "<< #{line}"
response = JSON.parse(line)
handle_response(response)
rescue JSON::ParserError
logger.debug "JSON parsing error for #{line.inspect}"
rescue Exception => e
Marvin::ExceptionTracker.log(e)
end
|
#send_message(name, arguments = {}, &callback) ⇒ Object
22
23
24
25
26
27
28
29
30
31
32
33
|
# File 'lib/marvin/distributed/protocol.rb', line 22
def send_message(name, arguments = {}, &callback)
logger.debug "Sending #{name.inspect} to #{self.host_with_port}"
payload = {
"message" => name.to_s,
"options" => arguments,
"sent-at" => Time.now
}
payload.merge!(options_for_callback(callback))
payload = JSON.dump(payload)
logger.debug ">> #{payload}"
send_data "#{payload}\n"
end
|
#send_message_reply(name, arguments = {}) ⇒ Object
35
36
37
38
|
# File 'lib/marvin/distributed/protocol.rb', line 35
def send_message_reply(name, arguments = {})
arguments["callback-id"] = @callback_id if @callback_id.present?
send_message(name, arguments)
end
|
#ssl_handshake_completed ⇒ Object
75
76
77
78
79
80
81
|
# File 'lib/marvin/distributed/protocol.rb', line 75
def ssl_handshake_completed
logger.debug "SSL Handshake completed"
if !connected?
@connected = true
post_connect
end
end
|