Class: Zeromem::Ws::WsHeavy

Inherits:
RJR::Nodes::WS
  • Object
Defined in:
lib/zeromem/ws.rb

Instance Method Summary

Constructor Details

#initialize(args = {}) ⇒ WsHeavy

Initializes the WebSocket node. All options are passed through unchanged to RJR::Nodes::WS#initialize.

Parameters:

  • args (Hash) (defaults to: {})

    the options to create the web socket node with

Options Hash (args):

  • :host (String)

    the hostname or IP address to listen on

  • :port (Integer)

    the port to listen on



# File 'lib/zeromem/ws.rb', line 12

def initialize(args = {})
  super(args)
end

Instance Method Details

#invoke(uri, rpc_method, *args) ⇒ Object

Sends a JSON-RPC request to the node at the given URI, blocks until the response arrives, and returns the result. If the response carries an error, that error is raised instead.

This is a custom implementation of RJR::Node#invoke, written to stay reliable under heavy load (thousands of WebSocket requests per minute).

Do not call this method directly from the EventMachine event loop or from one of its callbacks: the call blocks, and blocking there stalls the message subscription used to receive responses.
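The warning about blocking the event loop can be illustrated without EventMachine. The sketch below is plain Ruby and rests on stated assumptions: `TinyReactor` is a hypothetical stand-in for an event loop (one thread that runs queued blocks in order), and `inbox` stands in for the response subscription. Neither is part of Zeromem or RJR.

```ruby
require 'timeout'

# Hypothetical stand-in for an event loop: a single thread that runs
# queued blocks one after another. Not part of Zeromem or RJR.
class TinyReactor
  def initialize
    @queue  = Queue.new
    @thread = Thread.new { loop { @queue.pop.call } }
  end

  # Queue a block to run on the reactor thread.
  def schedule(&block)
    @queue << block
  end
end

reactor = TinyReactor.new
inbox   = Queue.new # responses are delivered here by the reactor

# Safe: the caller blocks on its own thread while the reactor delivers.
reactor.schedule { inbox << :response }
safe = inbox.pop

# Unsafe: the blocking wait runs on the reactor thread itself, so the
# delivery queued behind it cannot run until the wait gives up.
outcome = Queue.new
reactor.schedule do
  begin
    Timeout.timeout(0.2) { inbox.pop }
    outcome << :received
  rescue Timeout::Error
    outcome << :stalled
  end
end
reactor.schedule { inbox << :late_response }

result = outcome.pop
```

In the unsafe case the wait only ends because of the timeout; a wait with no timeout on the reactor thread would stall indefinitely, which is the situation the warning describes.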

Parameters:

  • uri (String)

    location of node to send request to, should be in format of ws://hostname:port

  • rpc_method (String)

    json-rpc method to invoke on destination

  • args (Array)

    array of arguments to convert to JSON and invoke the remote method with



# File 'lib/zeromem/ws.rb', line 29

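def invoke(uri, rpc_method, *args)
  # Build the JSON-RPC request, carrying over this node's message headers.
  message = RJR::Messages::Request.new :method  => rpc_method,
                                       :args    => args,
                                       :headers => @message_headers

  # Open the client connection and send the request on the event loop.
  @@em.schedule do
    init_client(uri) do |c|
      c.stream { |msg| handle_message(msg.data, c) }

      c.send_msg message.to_s
    end
  end

  # TODO: optional timeout for the response?
  # The stock wait_for_result causes a resource leak, so it is disabled:
  # result = wait_for_result(message)

  # Block the calling thread until the response arrives.
  result = wait_for_result_custom(message)

  # A third element, when present, carries the error to raise;
  # otherwise the return value is at index 1.
  fail result[2] if result.size > 2
  result[1]
end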
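
The implementation of wait_for_result_custom is not shown on this page. As a rough sketch only, a blocking wait with the optional timeout mentioned in the TODO could be built from a Mutex and a ConditionVariable. Everything here is an assumption made for illustration: `ResponseWaiter`, `deliver`, `wait_for`, and `unwrap` are hypothetical names, and the `[id, result, error]` layout is inferred from the size check at the end of #invoke.

```ruby
# Hypothetical helper, not part of Zeromem or RJR.
class ResponseWaiter
  def initialize
    @lock      = Mutex.new
    @cv        = ConditionVariable.new
    @responses = {}
  end

  # Called by the receiving side when a response arrives.
  # Stores [id, result] on success or [id, result, error] on failure.
  def deliver(id, result, error = nil)
    @lock.synchronize do
      @responses[id] = error.nil? ? [id, result] : [id, result, error]
      @cv.broadcast
    end
  end

  # Blocks the calling thread until a response for id arrives,
  # or raises once timeout seconds have passed.
  def wait_for(id, timeout: 5)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      until @responses.key?(id)
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise "timed out waiting for response #{id}" if remaining <= 0
        @cv.wait(@lock, remaining)
      end
      # Remove the entry so finished requests do not accumulate.
      @responses.delete(id)
    end
  end
end

# Mirrors the tail of #invoke: raise when an error element is present,
# otherwise return the result at index 1.
def unwrap(response)
  raise response[2] if response.size > 2
  response[1]
end

# Usage: one thread delivers the response while the caller waits for it.
waiter = ResponseWaiter.new
Thread.new { sleep 0.05; waiter.deliver('msg-1', 42) }
value = unwrap(waiter.wait_for('msg-1', timeout: 1))
```

Deleting each entry as soon as it is claimed keeps the pending table from growing under load, which is one plausible way to avoid the kind of leak the source comment mentions.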