Class: Serfx::Connection
- Inherits:
-
Object
- Object
- Serfx::Connection
- Extended by:
- Forwardable
- Includes:
- Commands
- Defined in:
- lib/serfx/connection.rb
Overview
This class wraps the low level msgpack data transformation and tcp communication for the RPC session. methods in this module are used to implement the actual RPC commands available via [Commands]
Constant Summary collapse
- COMMANDS =
{ handshake: [:header], auth: [:header], event: [:header], force_leave: [:header], join: [:header, :body], members: [:header, :body], members_filtered: [:header, :body], tags: [:header], stream: [:header], monitor: [:header], stop: [:header], leave: [:header], query: [:header], respond: [:header], install_key: [:header, :body], use_key: [:header, :body], remove_key: [:header, :body], list_keys: [:header, :body], stats: [:header, :body], get_coordinate: [:header, :body] }
Instance Attribute Summary collapse
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#seq ⇒ Object
readonly
Returns the value of attribute seq.
Instance Method Summary collapse
-
#check_rpc_error!(header) ⇒ Object
checks if the RPC response header has ‘error` field popular or not raises [RPCError] exception if error string is not empty.
-
#initialize(opts = {}) ⇒ Connection
constructor
A new instance of Connection.
-
#read_data ⇒ Hash
read data from tcp socket and pipe it through msgpack unpacker for deserialization.
-
#read_response(command) ⇒ Response
read data from the tcp socket.
-
#request(command, body = nil) ⇒ Response
make an RPC request against the serf agent.
-
#socket ⇒ TCPSocket
creates a tcp socket if does not exist already, against RPC host/port.
-
#tcp_send(command, body = nil) ⇒ Integer
takes raw RPC command name and an optional request body and convert them to msgpack encoded data and then send over tcp.
-
#unpacker ⇒ MessagePack::Unpacker
creates a MsgPack un-packer object from the tcp socket unless its already present.
Methods included from Commands
#auth, #event, #force_leave, #get_coordinate, #handshake, #install_key, #join, #leave, #list_keys, #members, #members_filtered, #monitor, #query, #remove_key, #respond, #stats, #stop, #stream, #tags, #use_key
Constructor Details
#initialize(opts = {}) ⇒ Connection
Returns a new instance of Connection.
50 51 52 53 54 55 56 57 |
# File 'lib/serfx/connection.rb', line 50 def initialize(opts = {}) @host = opts[:host] || '127.0.0.1' @port = opts[:port] || 7373 @seq = 0 @authkey = opts[:authkey] @requests = {} @responses = {} end |
Instance Attribute Details
#host ⇒ Object (readonly)
Returns the value of attribute host.
43 44 45 |
# File 'lib/serfx/connection.rb', line 43 def host @host end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
43 44 45 |
# File 'lib/serfx/connection.rb', line 43 def port @port end |
#seq ⇒ Object (readonly)
Returns the value of attribute seq.
43 44 45 |
# File 'lib/serfx/connection.rb', line 43 def seq @seq end |
Instance Method Details
#check_rpc_error!(header) ⇒ Object
checks if the RPC response header has ‘error` field popular or not raises [RPCError] exception if error string is not empty
109 110 111 |
# File 'lib/serfx/connection.rb', line 109 def check_rpc_error!(header) fail RPCError, header['Error'] unless header['Error'].nil? || header['Error'].empty? end |
#read_data ⇒ Hash
read data from tcp socket and pipe it through msgpack unpacker for deserialization
77 78 79 |
# File 'lib/serfx/connection.rb', line 77 def read_data unpacker.read end |
#read_response(command) ⇒ Response
read data from the tcp socket. and convert it to a [Response] object
117 118 119 120 121 122 123 124 125 126 |
# File 'lib/serfx/connection.rb', line 117 def read_response(command) header = read_data check_rpc_error!(header) if COMMANDS[command].include?(:body) body = read_data Response.new(header, body) else Response.new(header) end end |
#request(command, body = nil) ⇒ Response
make an RPC request against the serf agent
133 134 135 136 |
# File 'lib/serfx/connection.rb', line 133 def request(command, body = nil) tcp_send(command, body) read_response(command) end |
#socket ⇒ TCPSocket
creates a tcp socket if does not exist already, against RPC host/port
62 63 64 |
# File 'lib/serfx/connection.rb', line 62 def socket @socket ||= TCPSocket.new(host, port) end |
#tcp_send(command, body = nil) ⇒ Integer
takes raw RPC command name and an optional request body and convert them to msgpack encoded data and then send over tcp
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/serfx/connection.rb', line 89 def tcp_send(command, body = nil) @seq += 1 header = { 'Command' => command.to_s.gsub('_', '-'), 'Seq' => seq } Log.info("#{__method__}|Header: #{header.inspect}") buff = MessagePack::Buffer.new buff << header.to_msgpack buff << body.to_msgpack unless body.nil? res = socket.send(buff.to_str, 0) Log.info("#{__method__}|Res: #{res.inspect}") @requests[seq] = { header: header, ack?: false } seq end |
#unpacker ⇒ MessagePack::Unpacker
creates a MsgPack un-packer object from the tcp socket unless its already present
70 71 72 |
# File 'lib/serfx/connection.rb', line 70 def unpacker @unpacker ||= MessagePack::Unpacker.new(socket) end |