Class: Serfx::Connection

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Commands
Defined in:
lib/serfx/connection.rb

Overview

This class wraps the low level msgpack data transformation and tcp communication for the RPC session. methods in this module are used to implement the actual RPC commands available via [Commands]

Constant Summary collapse

COMMANDS =
{
  handshake:        [:header],
  auth:             [:header],
  event:            [:header],
  force_leave:      [:header],
  join:             [:header, :body],
  members:          [:header, :body],
  members_filtered: [:header, :body],
  tags:             [:header],
  stream:           [:header],
  monitor:          [:header],
  stop:             [:header],
  leave:            [:header],
  query:            [:header],
  respond:          [:header],
  install_key:      [:header, :body],
  use_key:          [:header, :body],
  remove_key:       [:header, :body],
  list_keys:        [:header, :body],
  stats:            [:header, :body]
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Commands

#auth, #event, #force_leave, #handshake, #install_key, #join, #leave, #list_keys, #members, #members_filtered, #monitor, #query, #remove_key, #respond, #stats, #stop, #stream, #tags, #use_key

Constructor Details

#initialize(opts = {}) ⇒ Connection

Returns a new instance of Connection.

Parameters:

  • opts (Hash) (defaults to: {})

    Specify the RPC connection details

Options Hash (opts):

  • :host (Symbol)

    ipaddreess of the target serf agent

  • :port (Symbol)

    port of target serf agents RPC

  • :authkey (Symbol)

    encryption key for RPC communication



49
50
51
52
53
54
55
56
# File 'lib/serfx/connection.rb', line 49

def initialize(opts = {})
  @host = opts[:host] || '127.0.0.1'
  @port = opts[:port] || 7373
  @seq = 0
  @authkey = opts[:authkey]
  @requests = {}
  @responses = {}
end

Instance Attribute Details

#hostObject (readonly)

Returns the value of attribute host.



42
43
44
# File 'lib/serfx/connection.rb', line 42

def host
  @host
end

#portObject (readonly)

Returns the value of attribute port.



42
43
44
# File 'lib/serfx/connection.rb', line 42

def port
  @port
end

#seqObject (readonly)

Returns the value of attribute seq.



42
43
44
# File 'lib/serfx/connection.rb', line 42

def seq
  @seq
end

Instance Method Details

#check_rpc_error!(header) ⇒ Object

checks if the RPC response header has ‘error` field popular or not raises [RPCError] exception if error string is not empty

Parameters:

  • header (Hash)

    RPC response header as hash



108
109
110
# File 'lib/serfx/connection.rb', line 108

def check_rpc_error!(header)
  fail RPCError, header['Error'] unless header['Error'].empty?
end

#read_dataHash

read data from tcp socket and pipe it through msgpack unpacker for deserialization

Returns:

  • (Hash)


76
77
78
# File 'lib/serfx/connection.rb', line 76

def read_data
  unpacker.read
end

#read_response(command) ⇒ Response

read data from the tcp socket. and convert it to a [Response] object

Parameters:

  • command (String)

    RPC command name for which response will be read

Returns:



116
117
118
119
120
121
122
123
124
125
# File 'lib/serfx/connection.rb', line 116

def read_response(command)
  header =  read_data
  check_rpc_error!(header)
  if COMMANDS[command].include?(:body)
    body = read_data
    Response.new(header, body)
  else
    Response.new(header)
  end
end

#request(command, body = nil) ⇒ Response

make an RPC request against the serf agent

Parameters:

  • command (String)

    name of the RPC command

  • body (Hash) (defaults to: nil)

    an optional request body for the RPC command

Returns:



132
133
134
135
# File 'lib/serfx/connection.rb', line 132

def request(command, body = nil)
  tcp_send(command, body)
  read_response(command)
end

#socketTCPSocket

creates a tcp socket if does not exist already, against RPC host/port

Returns:

  • (TCPSocket)


61
62
63
# File 'lib/serfx/connection.rb', line 61

def socket
  @socket ||= TCPSocket.new(host, port)
end

#tcp_send(command, body = nil) ⇒ Integer

takes raw RPC command name and an optional request body and convert them to msgpack encoded data and then send over tcp

Parameters:

  • command (String)

    RPC command name

  • body (Hash) (defaults to: nil)

    request body of the RPC command

Returns:

  • (Integer)


88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/serfx/connection.rb', line 88

def tcp_send(command, body = nil)
  @seq += 1
  header = {
    'Command' => command.to_s.gsub('_', '-'),
    'Seq' => seq
  }
  Log.info("#{__method__}|Header: #{header.inspect}")
  buff = MessagePack::Buffer.new
  buff << header.to_msgpack
  buff << body.to_msgpack unless body.nil?
  res = socket.send(buff.to_str, 0)
  Log.info("#{__method__}|Res: #{res.inspect}")
  @requests[seq] = { header: header, ack?: false }
  seq
end

#unpackerMessagePack::Unpacker

creates a MsgPack un-packer object from the tcp socket unless its already present

Returns:

  • (MessagePack::Unpacker)


69
70
71
# File 'lib/serfx/connection.rb', line 69

def unpacker
  @unpacker ||= MessagePack::Unpacker.new(socket)
end