Class: Marvin::Distributed::Protocol

Inherits:
EventMachine::Protocols::LineAndTextProtocol
  • Object
show all
Defined in:
lib/marvin/distributed/protocol.rb

Direct Known Subclasses

Client::EMConnection, Server

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#callbacks ⇒ Object

Returns the value of attribute callbacks.



9
10
11
# File 'lib/marvin/distributed/protocol.rb', line 9

# Reader for the +@callbacks+ instance variable.
#
# @return [Object] whatever is currently stored in +@callbacks+
def callbacks
  @callbacks
end

Instance Method Details

#handle_enable_ssl(opts = {}) ⇒ Object



57
58
59
60
# File 'lib/marvin/distributed/protocol.rb', line 57

# Replies with an +:enabled_ssl+ message (via send_message_reply) and then
# calls +enable_ssl+ on this connection.
#
# NOTE(review): presumably invoked when the peer sends an "enable_ssl"
# message, and the reply is sent first so that it goes out before SSL is
# switched on -- confirm against the dispatcher and +enable_ssl+.
#
# @param opts [Hash] message options; not used by this handler
def handle_enable_ssl(opts = {})
  send_message_reply(:enabled_ssl)
  enable_ssl
end

#handle_enabled_ssl(opts = {}) ⇒ Object



62
63
64
# File 'lib/marvin/distributed/protocol.rb', line 62

# Calls +enable_ssl+ on this connection.
#
# NOTE(review): presumably invoked when the peer answers with an
# "enabled_ssl" message (see handle_enable_ssl) -- confirm against the
# dispatcher.
#
# @param opts [Hash] message options; not used by this handler
def handle_enabled_ssl(opts = {})
  enable_ssl
end

#handle_noop(opts = {}) ⇒ Object



66
67
68
69
# File 'lib/marvin/distributed/protocol.rb', line 66

# Handler that deliberately performs no work; it only leaves a debug
# trace in the log.
#
# @param opts [Hash] message options; ignored
def handle_noop(opts = {})
  logger.debug("no-op")
end

#handle_response(response) ⇒ Object



40
41
42
43
44
45
46
47
48
# File 'lib/marvin/distributed/protocol.rb', line 40

# Dispatches one decoded message received from the peer.
#
# Anything that is not a Hash containing a "message" key is ignored.
# While the message is being processed, +@callback_id+ holds the
# "callback-id" value removed from +response+ (send_message_reply reads
# it). It is always cleared again afterwards, even when processing raises,
# so a failed message cannot leak its callback id into later replies.
#
# @param response [Hash] decoded message; its "callback-id" key is deleted
# @return [nil]
def handle_response(response)
  logger.debug "Handling response in distributed protocol (response: #{response.inspect})"
  return unless response.is_a?(Hash) && response.key?("message")

  options = response["options"] || {}
  @callback_id = response.delete("callback-id")
  begin
    process_callback(options)
    process_response_message(response["message"], options)
  ensure
    # Reset unconditionally: receive_line swallows errors and keeps the
    # connection alive, so a stale id would be attached to unrelated replies.
    @callback_id = nil
  end
  nil
end

#host_with_port ⇒ Object



50
51
52
53
54
55
# File 'lib/marvin/distributed/protocol.rb', line 50

# Returns the peer address formatted as "ip:port".
#
# The value is derived from +get_peername+ on first use and cached in
# +@host_with_port+ for subsequent calls.
#
# @return [String]
def host_with_port
  return @host_with_port if @host_with_port

  peer_port, peer_ip = Socket.unpack_sockaddr_in(get_peername)
  @host_with_port = [peer_ip, peer_port].join(":")
end

#post_connect ⇒ Object

Called after the connection is made and/or SSL is enabled.



72
73
# File 'lib/marvin/distributed/protocol.rb', line 72

# Hook with an empty default body; ssl_handshake_completed calls it once
# the connection has been marked as connected.
#
# NOTE(review): presumably meant to be overridden by subclasses -- confirm.
def post_connect
end

#receive_line(line) ⇒ Object



11
12
13
14
15
16
17
18
19
20
# File 'lib/marvin/distributed/protocol.rb', line 11

# Parses one line received from the peer as JSON and passes the decoded
# value to handle_response.
#
# Invalid JSON is logged at debug level and dropped. Any other
# StandardError is reported to Marvin::ExceptionTracker instead of being
# raised.
#
# @param line [String] raw line as received; it is not modified
def receive_line(line)
  # Non-destructive strip: leaves the caller's string untouched and also
  # accepts frozen strings, which +strip!+ would reject.
  line = line.strip
  logger.debug "<< #{line}"
  handle_response(JSON.parse(line))
rescue JSON::ParserError
  logger.debug "JSON parsing error for #{line.inspect}"
rescue StandardError => e
  # StandardError rather than Exception, so that signals and exit requests
  # (Interrupt, SystemExit, ...) still propagate.
  Marvin::ExceptionTracker.log(e)
end

#send_message(name, arguments = {}, &callback) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/marvin/distributed/protocol.rb', line 22

# Serialises a message as a single JSON line and writes it to the peer.
#
# The JSON object carries "message" (the name as a string), "options"
# (the given arguments) and "sent-at" (the current time), followed by
# whatever +options_for_callback+ returns for the given block.
#
# @param name [#to_s] message name
# @param arguments [Hash] value sent under the "options" key
# @yield optional callback, handed to +options_for_callback+
def send_message(name, arguments = {}, &callback)
  logger.debug "Sending #{name.inspect} to #{host_with_port}"
  envelope = { "message" => name.to_s, "options" => arguments, "sent-at" => Time.now }
  envelope.update(options_for_callback(callback))
  encoded = JSON.dump(envelope)
  logger.debug ">> #{encoded}"
  send_data(encoded + "\n")
end

#send_message_reply(name, arguments = {}) ⇒ Object



35
36
37
38
# File 'lib/marvin/distributed/protocol.rb', line 35

# Sends a message via send_message, adding the current +@callback_id+ to
# the arguments under "callback-id" when one is present.
#
# @param name [#to_s] message name
# @param arguments [Hash] message arguments; never modified
def send_message_reply(name, arguments = {})
  # Merge into a new hash instead of writing into +arguments+: the caller's
  # hash may be frozen or reused for other messages, and must not pick up
  # this reply's callback id.
  arguments = arguments.merge("callback-id" => @callback_id) if @callback_id.present?
  send_message(name, arguments)
end

#ssl_handshake_completed ⇒ Object



75
76
77
78
79
80
81
# File 'lib/marvin/distributed/protocol.rb', line 75

# Logs that the SSL handshake finished and, unless +connected?+ is already
# true, sets +@connected+ and runs the +post_connect+ hook.
def ssl_handshake_completed
  logger.debug("SSL Handshake completed")
  return if connected?

  @connected = true
  post_connect
end