Class: Serfx::Connection
- Inherits:
-
Object
- Object
- Serfx::Connection
- Extended by:
- Forwardable
- Includes:
- Commands
- Defined in:
- lib/serfx/connection.rb
Overview
This class wraps the low level msgpack data transformation and tcp communication for the RPC session. methods in this module are used to implement the actual RPC commands available via [Commands]
Constant Summary collapse
- COMMANDS =
{ handshake: [:header], auth: [:header], event: [:header], force_leave: [:header], join: [:header, :body], members: [:header, :body], members_filtered: [:header, :body], tags: [:header], stream: [:header], monitor: [:header], stop: [:header], leave: [:header], query: [:header], respond: [:header], install_key: [:header, :body], use_key: [:header, :body], remove_key: [:header, :body], list_keys: [:header, :body], stats: [:header, :body] }
Instance Attribute Summary collapse
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#seq ⇒ Object
readonly
Returns the value of attribute seq.
Instance Method Summary collapse
-
#check_rpc_error!(header) ⇒ Object
checks if the RPC response header has ‘error` field popular or not raises [RPCError] exception if error string is not empty.
-
#initialize(opts = {}) ⇒ Connection
constructor
A new instance of Connection.
-
#read_data ⇒ Hash
read data from tcp socket and pipe it through msgpack unpacker for deserialization.
-
#read_response(command) ⇒ Response
read data from the tcp socket.
-
#request(command, body = nil) ⇒ Response
make an RPC request against the serf agent.
-
#socket ⇒ TCPSocket
creates a tcp socket if does not exist already, against RPC host/port.
-
#tcp_send(command, body = nil) ⇒ Integer
takes raw RPC command name and an optional request body and convert them to msgpack encoded data and then send over tcp.
-
#unpacker ⇒ MessagePack::Unpacker
creates a MsgPack un-packer object from the tcp socket unless its already present.
Methods included from Commands
#auth, #event, #force_leave, #handshake, #install_key, #join, #leave, #list_keys, #members, #members_filtered, #monitor, #query, #remove_key, #respond, #stats, #stop, #stream, #tags, #use_key
Constructor Details
#initialize(opts = {}) ⇒ Connection
Returns a new instance of Connection.
49 50 51 52 53 54 55 56 |
# File 'lib/serfx/connection.rb', line 49 def initialize(opts = {}) @host = opts[:host] || '127.0.0.1' @port = opts[:port] || 7373 @seq = 0 @authkey = opts[:authkey] @requests = {} @responses = {} end |
Instance Attribute Details
#host ⇒ Object (readonly)
Returns the value of attribute host.
42 43 44 |
# File 'lib/serfx/connection.rb', line 42 def host @host end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
42 43 44 |
# File 'lib/serfx/connection.rb', line 42 def port @port end |
#seq ⇒ Object (readonly)
Returns the value of attribute seq.
42 43 44 |
# File 'lib/serfx/connection.rb', line 42 def seq @seq end |
Instance Method Details
#check_rpc_error!(header) ⇒ Object
checks if the RPC response header has ‘error` field popular or not raises [RPCError] exception if error string is not empty
108 109 110 |
# File 'lib/serfx/connection.rb', line 108 def check_rpc_error!(header) fail RPCError, header['Error'] unless header['Error'].empty? end |
#read_data ⇒ Hash
read data from tcp socket and pipe it through msgpack unpacker for deserialization
76 77 78 |
# File 'lib/serfx/connection.rb', line 76 def read_data unpacker.read end |
#read_response(command) ⇒ Response
read data from the tcp socket. and convert it to a [Response] object
116 117 118 119 120 121 122 123 124 125 |
# File 'lib/serfx/connection.rb', line 116 def read_response(command) header = read_data check_rpc_error!(header) if COMMANDS[command].include?(:body) body = read_data Response.new(header, body) else Response.new(header) end end |
#request(command, body = nil) ⇒ Response
make an RPC request against the serf agent
132 133 134 135 |
# File 'lib/serfx/connection.rb', line 132 def request(command, body = nil) tcp_send(command, body) read_response(command) end |
#socket ⇒ TCPSocket
creates a tcp socket if does not exist already, against RPC host/port
61 62 63 |
# File 'lib/serfx/connection.rb', line 61 def socket @socket ||= TCPSocket.new(host, port) end |
#tcp_send(command, body = nil) ⇒ Integer
takes raw RPC command name and an optional request body and convert them to msgpack encoded data and then send over tcp
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/serfx/connection.rb', line 88 def tcp_send(command, body = nil) @seq += 1 header = { 'Command' => command.to_s.gsub('_', '-'), 'Seq' => seq } Log.info("#{__method__}|Header: #{header.inspect}") buff = MessagePack::Buffer.new buff << header.to_msgpack buff << body.to_msgpack unless body.nil? res = socket.send(buff.to_str, 0) Log.info("#{__method__}|Res: #{res.inspect}") @requests[seq] = { header: header, ack?: false } seq end |
#unpacker ⇒ MessagePack::Unpacker
creates a MsgPack un-packer object from the tcp socket unless its already present
69 70 71 |
# File 'lib/serfx/connection.rb', line 69 def unpacker @unpacker ||= MessagePack::Unpacker.new(socket) end |