Class: Serfx::Connection
- Inherits:
-
Object
- Object
- Serfx::Connection
- Extended by:
- Forwardable
- Includes:
- Commands
- Defined in:
- lib/serfx/connection.rb
Overview
This class wraps the low-level msgpack data transformation and TCP communication for the RPC session. Methods in this class are used to implement the actual RPC commands available via [Commands].
Constant Summary
- COMMANDS =
{
  handshake: [:header],
  auth: [:header],
  event: [:header],
  force_leave: [:header],
  join: [:header, :body],
  members: [:header, :body],
  members_filtered: [:header, :body],
  tags: [:header],
  stream: [:header],
  monitor: [:header],
  stop: [:header],
  leave: [:header],
  query: [:header],
  respond: [:header]
}
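The table above records which parts each command's response carries. As a minimal sketch of how such a table can be queried, the following uses a trimmed copy of it; `expects_body?` is a hypothetical helper introduced for illustration and is not part of Serfx.

```ruby
# Trimmed copy of the COMMANDS table shown above.
COMMANDS = {
  handshake: [:header],
  join: [:header, :body],
  members: [:header, :body],
  leave: [:header]
}.freeze

# Hypothetical helper (not part of Serfx): true when the response to
# +command+ carries a body after its header.
def expects_body?(command)
  COMMANDS.fetch(command).include?(:body)
end

members_has_body = expects_body?(:members)
handshake_has_body = expects_body?(:handshake)
```

Using `fetch` here makes an unknown command name raise a `KeyError` instead of failing later on `nil`.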
Instance Attribute Summary
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#seq ⇒ Object
readonly
Returns the value of attribute seq.
Instance Method Summary
-
#check_rpc_error!(header) ⇒ Object
checks whether the RPC response header has its `Error` field populated; raises an [RPCError] exception if the error string is not empty.
-
#initialize(opts = {}) ⇒ Connection
constructor
A new instance of Connection.
-
#read_data ⇒ Hash
read data from tcp socket and pipe it through msgpack unpacker for deserialization.
-
#read_response(command) ⇒ Response
read data from the tcp socket and convert it to a [Response] object.
-
#request(command, body = nil) ⇒ Response
make an RPC request against the serf agent.
-
#socket ⇒ TCPSocket
creates a tcp socket against the RPC host/port if one does not exist already.
-
#tcp_send(command, body = nil) ⇒ Integer
takes a raw RPC command name and an optional request body, converts them to msgpack-encoded data, and sends the result over tcp.
-
#unpacker ⇒ MessagePack::Unpacker
creates a MsgPack unpacker object from the tcp socket unless it is already present.
Methods included from Commands
#auth, #event, #force_leave, #handshake, #join, #leave, #members, #members_filtered, #monitor, #query, #respond, #stop, #stream, #tags
Constructor Details
#initialize(opts = {}) ⇒ Connection
Returns a new instance of Connection.
# File 'lib/serfx/connection.rb', line 43
def initialize(opts = {})
  @host = opts[:host] || '127.0.0.1'
  @port = opts[:port] || 7373
  @seq = 0
  @authkey = opts[:authkey]
  @requests = {}
  @responses = {}
end
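To illustrate how the option defaults in the constructor resolve, here is a small sketch. `OptionDefaults` is a stand-in class written for this example, not `Serfx::Connection` itself; it copies only the option handling, and the host and key values are made up.

```ruby
# Stand-in class (not Serfx::Connection) that mirrors only the option
# handling of the constructor above.
class OptionDefaults
  attr_reader :host, :port, :authkey

  def initialize(opts = {})
    @host = opts[:host] || '127.0.0.1'
    @port = opts[:port] || 7373
    @authkey = opts[:authkey]
  end
end

# No options: falls back to the local agent on the default port.
local = OptionDefaults.new

# Partial options: only the supplied keys override the defaults.
remote = OptionDefaults.new(host: '10.0.0.5', authkey: 'secret')
```

Because the defaults are applied with `||`, passing `nil` for `:host` or `:port` gives the same result as omitting the key.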
Instance Attribute Details
#host ⇒ Object (readonly)
Returns the value of attribute host.
# File 'lib/serfx/connection.rb', line 36
def host
  @host
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
# File 'lib/serfx/connection.rb', line 36
def port
  @port
end
#seq ⇒ Object (readonly)
Returns the value of attribute seq.
# File 'lib/serfx/connection.rb', line 36
def seq
  @seq
end
Instance Method Details
#check_rpc_error!(header) ⇒ Object
checks whether the RPC response header has its `Error` field populated; raises an [RPCError] exception if the error string is not empty
# File 'lib/serfx/connection.rb', line 103
def check_rpc_error!(header)
  fail RPCError, header['Error'] unless header['Error'].empty?
end
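The behaviour of this check can be tried in isolation. The sketch below defines its own `RPCError` stand-in so that it runs without the gem, and the header hashes and error text are made-up sample data rather than captured agent output.

```ruby
# Stand-in for Serfx's RPCError so the sketch is self-contained.
class RPCError < StandardError; end

# Same logic as the method above.
def check_rpc_error!(header)
  fail RPCError, header['Error'] unless header['Error'].empty?
end

# An empty error string means success: nothing is raised.
ok_result = check_rpc_error!('Seq' => 1, 'Error' => '')

# A non-empty error string is surfaced as an exception message.
begin
  check_rpc_error!('Seq' => 2, 'Error' => 'example failure')
rescue RPCError => e
  error_message = e.message
end
```

Note that, as written, a header with no `'Error'` key at all would raise `NoMethodError` rather than `RPCError`, because `nil` does not respond to `empty?`.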
#read_data ⇒ Hash
read data from tcp socket and pipe it through msgpack unpacker for deserialization
# File 'lib/serfx/connection.rb', line 71
def read_data
  unpacker.read
end
#read_response(command) ⇒ Response
read data from the tcp socket and convert it to a [Response] object
# File 'lib/serfx/connection.rb', line 112
def read_response(command)
  header = read_data
  check_rpc_error!(header)
  if COMMANDS[command].include?(:body)
    body = read_data
    Response.new(header, body)
  else
    Response.new(header)
  end
end
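The header-then-optional-body read order can be sketched without a socket. In the example below an array of already-decoded hashes stands in for the msgpack stream; `Reply`, `WITH_BODY`, and `read_reply` are hypothetical names introduced here, and the frame contents are illustrative only.

```ruby
# Hypothetical stand-ins; none of these names come from Serfx.
Reply = Struct.new(:header, :body)
WITH_BODY = %i[join members members_filtered].freeze

# Takes already-decoded frames off +frames+ in the order a socket would
# deliver them: header first, then a body only for commands that have one.
def read_reply(command, frames)
  header = frames.shift
  body = WITH_BODY.include?(command) ? frames.shift : nil
  Reply.new(header, body)
end

frames = [
  { 'Seq' => 1, 'Error' => '' }, # header for :members
  { 'Members' => [] },           # body for :members
  { 'Seq' => 2, 'Error' => '' }  # header for :leave (no body follows)
]

members_reply = read_reply(:members, frames)
leave_reply = read_reply(:leave, frames)
```

Reading a body only when the command table calls for one keeps the stream aligned: consuming a frame that is not there would swallow the header of the next response.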
#request(command, body = nil) ⇒ Response
make an RPC request against the serf agent
# File 'lib/serfx/connection.rb', line 128
def request(command, body = nil)
  tcp_send(command, body)
  read_response(command)
end
#socket ⇒ TCPSocket
creates a tcp socket against the RPC host/port if one does not exist already
# File 'lib/serfx/connection.rb', line 55
def socket
  @socket ||= TCPSocket.new(host, port)
end
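The `||=` idiom used here builds the object on first access and reuses it afterwards. A minimal sketch of that behaviour follows, with a hypothetical `LazyResource` class and a plain `Object` in place of a real `TCPSocket` so that no network is needed.

```ruby
# Hypothetical stand-in that counts how often the resource is built.
class LazyResource
  attr_reader :builds

  def initialize
    @builds = 0
  end

  # Built on the first call, then reused, in the same way as
  # `@socket ||= TCPSocket.new(host, port)`.
  def resource
    @resource ||= build
  end

  private

  def build
    @builds += 1
    Object.new
  end
end

lazy = LazyResource.new
builds_before = lazy.builds
first_access = lazy.resource
second_access = lazy.resource
```

One consequence visible in the listings on this page is that the constructor does not open a connection; the socket is created the first time `socket` is called.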
#tcp_send(command, body = nil) ⇒ Integer
takes a raw RPC command name and an optional request body, converts them to msgpack-encoded data, and sends the result over tcp
# File 'lib/serfx/connection.rb', line 83
def tcp_send(command, body = nil)
  @seq += 1
  header = {
    'Command' => command.to_s.gsub('_', '-'),
    'Seq' => seq
  }
  Log.info("#{__method__}|Header: #{header.inspect}")
  buff = MessagePack::Buffer.new
  buff << header.to_msgpack
  buff << body.to_msgpack unless body.nil?
  res = socket.send(buff.to_str, 0)
  Log.info("#{__method__}|Res: #{res.inspect}")
  @requests[seq] = { header: header, ack?: false }
  seq
end
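The header construction in `tcp_send` does two things: it increments the sequence number and it rewrites the Ruby-style command name with hyphens. The sketch below isolates just that step; `HeaderBuilder` and `next_header` are hypothetical names, and the msgpack encoding and socket write are deliberately left out.

```ruby
# Hypothetical stand-in that mirrors only the header construction
# performed by tcp_send above.
class HeaderBuilder
  attr_reader :seq

  def initialize
    @seq = 0
  end

  # Bumps the sequence number, then maps :force_leave to "force-leave".
  def next_header(command)
    @seq += 1
    { 'Command' => command.to_s.gsub('_', '-'), 'Seq' => seq }
  end
end

builder = HeaderBuilder.new
handshake_header = builder.next_header(:handshake)
force_leave_header = builder.next_header(:force_leave)
```

Since the counter starts at 0 and is incremented before use, the first request on a connection carries `Seq` 1.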
#unpacker ⇒ MessagePack::Unpacker
creates a MsgPack unpacker object from the tcp socket unless it is already present
# File 'lib/serfx/connection.rb', line 63
def unpacker
  @unpacker ||= MessagePack::Unpacker.new(socket)
end