Class: Serfx::Connection

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Commands
Defined in:
lib/serfx/connection.rb

Overview

This class wraps the low level msgpack data transformation and tcp communication for the RPC session. methods in this module are used to implement the actual RPC commands available via [Commands]

Constant Summary collapse

COMMANDS =
{
handshake:        [:header],
auth:             [:header],
event:            [:header],
force_leave:      [:header],
join:             [:header, :body],
members:          [:header, :body],
members_filtered: [:header, :body],
tags:             [:header],
stream:           [:header],
monitor:          [:header],
stop:             [:header],
leave:            [:header],
query:            [:header],
respond:          [:header]
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Commands

#auth, #event, #force_leave, #handshake, #join, #leave, #members, #members_filtered, #monitor, #query, #respond, #stop, #stream, #tags

Constructor Details

#initialize(opts = {}) ⇒ Connection

Returns a new instance of Connection.

Options Hash (opts):

  • :host (Symbol)

    ipaddreess of the target serf agent

  • :port (Symbol)

    port of target serf agents RPC

  • :authkey (Symbol)

    encryption key for RPC communication



43
44
45
46
47
48
49
50
# File 'lib/serfx/connection.rb', line 43

def initialize(opts = {})
  @host = opts[:host] || '127.0.0.1'
  @port = opts[:port] || 7373
  @seq = 0
  @authkey = opts[:authkey]
  @requests = {}
  @responses = {}
end

Instance Attribute Details

#hostObject (readonly)

Returns the value of attribute host.



36
37
38
# File 'lib/serfx/connection.rb', line 36

def host
  @host
end

#portObject (readonly)

Returns the value of attribute port.



36
37
38
# File 'lib/serfx/connection.rb', line 36

def port
  @port
end

#seqObject (readonly)

Returns the value of attribute seq.



36
37
38
# File 'lib/serfx/connection.rb', line 36

def seq
  @seq
end

Instance Method Details

#check_rpc_error!(header) ⇒ Object

checks if the RPC response header has ‘error` field popular or not raises [RPCError] exception if error string is not empty



103
104
105
# File 'lib/serfx/connection.rb', line 103

def check_rpc_error!(header)
  fail RPCError, header['Error'] unless header['Error'].empty?
end

#read_dataHash

read data from tcp socket and pipe it through msgpack unpacker for deserialization



71
72
73
# File 'lib/serfx/connection.rb', line 71

def read_data
  unpacker.read
end

#read_response(command) ⇒ Response

read data from the tcp socket. and convert it to a [Response] object



112
113
114
115
116
117
118
119
120
121
# File 'lib/serfx/connection.rb', line 112

def read_response(command)
  header =  read_data
  check_rpc_error!(header)
  if COMMANDS[command].include?(:body)
    body = read_data
    Response.new(header, body)
  else
    Response.new(header)
  end
end

#request(command, body = nil) ⇒ Response

make an RPC request against the serf agent



128
129
130
131
# File 'lib/serfx/connection.rb', line 128

def request(command, body = nil)
  tcp_send(command, body)
  read_response(command)
end

#socketTCPSocket

creates a tcp socket if does not exist already, against RPC host/port



55
56
57
# File 'lib/serfx/connection.rb', line 55

def socket
  @socket ||= TCPSocket.new(host, port)
end

#tcp_send(command, body = nil) ⇒ Integer

takes raw RPC command name and an optional request body and convert them to msgpack encoded data and then send over tcp



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/serfx/connection.rb', line 83

def tcp_send(command, body = nil)
  @seq += 1
  header = {
    'Command' => command.to_s.gsub('_', '-'),
    'Seq' => seq
    }
  Log.info("#{__method__}|Header: #{header.inspect}")
  buff = MessagePack::Buffer.new
  buff << header.to_msgpack
  buff << body.to_msgpack unless body.nil?
  res = socket.send(buff.to_str, 0)
  Log.info("#{__method__}|Res: #{res.inspect}")
  @requests[seq] = { header: header, ack?: false }
  seq
end

#unpackerMessagePack::Unpacker

creates a MsgPack un-packer object from the tcp socket unless its already present



63
64
65
# File 'lib/serfx/connection.rb', line 63

def unpacker
  @unpacker ||= MessagePack::Unpacker.new(socket)
end