Class: AmfSocket::AmfRpcConnection

Inherits:
AmfConnection
  • Object
show all
Defined in:
lib/amf_socket/amf_rpc_connection.rb

Constant Summary collapse

PING_INTERVAL =

Seconds.

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from AmfConnection

#serializer

Instance Attribute Details

#latencyObject (readonly)

Returns the value of attribute latency.



4
5
6
# File 'lib/amf_socket/amf_rpc_connection.rb', line 4

def latency
  @latency
end

Instance Method Details

#heartbeatObject



94
95
96
97
# File 'lib/amf_socket/amf_rpc_connection.rb', line 94

def heartbeat
  timeout_requests
  ping
end

#post_initObject

EM Callbacks.



24
25
26
27
28
29
30
31
# File 'lib/amf_socket/amf_rpc_connection.rb', line 24

def post_init
  super

  @sent_requests = {}
  @next_ping = Time.now.utc

  AmfSocket.heartbeat.add(self)
end

#receive_message(message) ⇒ Object

Override this method in your subclass.



76
77
# File 'lib/amf_socket/amf_rpc_connection.rb', line 76

def receive_message(message)
end

#receive_object(object) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/amf_socket/amf_rpc_connection.rb', line 33

def receive_object(object)
  begin
    raise AmfSocket::InvalidObject unless object.is_a?(Hash) && object[:type]

    case object[:type]
    when 'rpcRequest'
      request = AmfSocket::RpcReceivedRequest.new(object, self)
      receive_request(request)
    when 'rpcResponse'
      receive_response(object)
    when 'rpcMessage'
      message = AmfSocket::RpcReceivedMessage.new(object, self)
      receive_message(message)
    else
      raise AmfSocket::InvalidObject
    end
  rescue AmfSocket::InvalidObject
    close_connection
  end
end

#receive_request(request) ⇒ Object

Override this method in your subclass.



71
72
73
# File 'lib/amf_socket/amf_rpc_connection.rb', line 71

def receive_request(request)
  request.reply("You should override #{self.class.to_s}#receive_request.")
end

#receive_response(response_object) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/amf_socket/amf_rpc_connection.rb', line 79

def receive_response(response_object)
  raise AmfSocket::InvalidObject unless (message_id = response_object[:response][:messageId])

  return unless (request = @sent_requests[message_id]) # Ignore timed out requests.

  response = AmfSocket::RpcResponse.new(request, response_object)
  @sent_requests.delete(message_id)

  AmfSocket.try do
    if request.succeeded_callback.is_a?(Proc)
      request.succeeded_callback.call(response)
    end
  end
end

#send_message(command, params = {}) ⇒ Object



14
15
16
17
18
# File 'lib/amf_socket/amf_rpc_connection.rb', line 14

def send_message(command, params = {})
  message = AmfSocket::RpcMessage.new(command, params)
  send_object(message.to_hash)
  message.send(:sent)
end

#send_request(command, params = {}, &block) ⇒ Object



6
7
8
9
10
11
12
# File 'lib/amf_socket/amf_rpc_connection.rb', line 6

def send_request(command, params = {}, &block)
  request = AmfSocket::RpcRequest.new(command, params)
  block.call(request)
  @sent_requests[request.message_id] = request
  send_object(request.to_hash)
  request.send(:sent)
end

#unbindObject



54
55
56
57
58
59
60
61
62
63
64
# File 'lib/amf_socket/amf_rpc_connection.rb', line 54

def unbind
  AmfSocket.heartbeat.remove(self)

  @sent_requests.each do |message_id, request|
    if request.failed_callback.is_a?(Proc)
      request.failed_callback.call('disconnected')
    end
  end

  @sent_requests.clear
end