Class: Serfx::Connection

Inherits:
Object
  • Object
Extended by:
Forwardable
Includes:
Commands
Defined in:
lib/serfx/connection.rb

Overview

This class wraps the low-level msgpack data transformation and TCP communication for the RPC session. Methods in this class are used to implement the actual RPC commands available via [Commands].

Constant Summary

COMMANDS =
{
  handshake:        [:header],
  auth:             [:header],
  event:            [:header],
  force_leave:      [:header],
  join:             [:header, :body],
  members:          [:header, :body],
  members_filtered: [:header, :body],
  tags:             [:header],
  stream:           [:header],
  monitor:          [:header],
  stop:             [:header],
  leave:            [:header],
  query:            [:header],
  respond:          [:header],
  install_key:      [:header, :body],
  use_key:          [:header, :body],
  remove_key:       [:header, :body],
  list_keys:        [:header, :body],
  stats:            [:header, :body],
  get_coordinate:   [:header, :body]
}
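
The table above records, per RPC command, which response parts the client expects to read. As a minimal sketch of how such a lookup can be used, the following works on a trimmed copy of the table; `SAMPLE_COMMANDS` and `expects_body?` are hypothetical names introduced only for illustration and are not part of Serfx.

```ruby
# Trimmed, illustrative copy of the table above; not the full constant.
SAMPLE_COMMANDS = {
  handshake: [:header],
  join:      [:header, :body],
  members:   [:header, :body]
}.freeze

# Hypothetical helper: true when a response body follows the header.
def expects_body?(command)
  SAMPLE_COMMANDS.fetch(command).include?(:body)
end

puts expects_body?(:handshake)
puts expects_body?(:join)
```

Note that the keys are symbols, so a lookup with a String key would not find an entry.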

Instance Attribute Summary

Instance Method Summary

Methods included from Commands

#auth, #event, #force_leave, #get_coordinate, #handshake, #install_key, #join, #leave, #list_keys, #members, #members_filtered, #monitor, #query, #remove_key, #respond, #stats, #stop, #stream, #tags, #use_key

Constructor Details

#initialize(opts = {}) ⇒ Connection

Returns a new instance of Connection.

Parameters:

  • opts (Hash) (defaults to: {})

    Specify the RPC connection details

Options Hash (opts):

  • :host (String)

    IP address of the target Serf agent (defaults to '127.0.0.1')

  • :port (Integer)

    RPC port of the target Serf agent (defaults to 7373)

  • :authkey (Symbol)

    encryption key for RPC communication



# File 'lib/serfx/connection.rb', line 50

def initialize(opts = {})
  @host = opts[:host] || '127.0.0.1'
  @port = opts[:port] || 7373
  @seq = 0
  @authkey = opts[:authkey]
  @requests = {}
  @responses = {}
end
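
The constructor falls back to '127.0.0.1' and 7373 when `:host` or `:port` is missing. A minimal sketch of that behavior follows; `SketchConnection` is a hypothetical stand-in that copies only the constructor shown above, so the example runs without the Serfx gem or a Serf agent.

```ruby
# Stand-in that copies only the constructor shown above; it is not Serfx::Connection.
class SketchConnection
  attr_reader :host, :port, :seq

  def initialize(opts = {})
    @host = opts[:host] || '127.0.0.1'
    @port = opts[:port] || 7373
    @seq = 0
    @authkey = opts[:authkey]
  end
end

default_conn = SketchConnection.new
custom_conn  = SketchConnection.new(host: '10.0.0.5', port: 7400)
puts "#{default_conn.host}:#{default_conn.port}"
puts "#{custom_conn.host}:#{custom_conn.port}"
```

Because the defaults are applied with `||`, an explicit `nil` for `:host` or `:port` also falls back to the default.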

Instance Attribute Details

#host ⇒ Object (readonly)

Returns the value of attribute host.



# File 'lib/serfx/connection.rb', line 43

def host
  @host
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



# File 'lib/serfx/connection.rb', line 43

def port
  @port
end

#seq ⇒ Object (readonly)

Returns the value of attribute seq.



# File 'lib/serfx/connection.rb', line 43

def seq
  @seq
end

Instance Method Details

#check_rpc_error!(header) ⇒ Object

Checks whether the `Error` field of the RPC response header is populated, and raises an [RPCError] exception if the error string is not empty.

Parameters:

  • header (Hash)

    RPC response header as hash



# File 'lib/serfx/connection.rb', line 109

def check_rpc_error!(header)
  fail RPCError, header['Error'] unless header['Error'].nil? || header['Error'].empty?
end
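
To illustrate the check, here is a minimal sketch that can be run on its own. `SketchRPCError` is a hypothetical stand-in for the library's RPCError class, and the header hashes are made-up examples rather than captured agent responses.

```ruby
# Stand-in error class; the real RPCError is defined elsewhere in Serfx.
class SketchRPCError < StandardError; end

# Same guard as above: a nil or empty 'Error' value means success.
def check_rpc_error!(header)
  error = header['Error']
  raise SketchRPCError, error unless error.nil? || error.empty?
end

check_rpc_error!({ 'Seq' => 1, 'Error' => '' }) # passes silently

begin
  check_rpc_error!({ 'Seq' => 2, 'Error' => 'Invalid command' })
rescue SketchRPCError => e
  puts "RPC failed: #{e.message}"
end
```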

#read_data ⇒ Hash

Reads data from the TCP socket and pipes it through the msgpack unpacker for deserialization.

Returns:

  • (Hash)


# File 'lib/serfx/connection.rb', line 77

def read_data
  unpacker.read
end

#read_response(command) ⇒ Response

Reads data from the TCP socket and converts it to a [Response] object.

Parameters:

  • command (Symbol)

    RPC command name for which the response will be read; it is looked up as a key in COMMANDS

Returns:

  • (Response)


# File 'lib/serfx/connection.rb', line 117

def read_response(command)
  header = read_data
  check_rpc_error!(header)
  if COMMANDS[command].include?(:body)
    body = read_data
    Response.new(header, body)
  else
    Response.new(header)
  end
end
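
The header-then-optional-body flow above can be sketched without a socket. In the following, frames are pre-decoded hashes held in an array; `SKETCH_COMMANDS`, `SketchResponse`, and `sketch_read_response` are hypothetical stand-ins, and the frame contents are illustrative rather than captured from a real agent.

```ruby
# Hypothetical stand-ins: frames are pre-decoded hashes, not real socket data.
SKETCH_COMMANDS = { leave: [:header], members: [:header, :body] }.freeze
SketchResponse = Struct.new(:header, :body)

def sketch_read_response(command, frames)
  header = frames.shift
  # Mirror the error check: a non-empty 'Error' aborts before any body is read.
  raise header['Error'] unless header['Error'].to_s.empty?
  # Read a second frame only for commands whose entry includes :body.
  body = SKETCH_COMMANDS.fetch(command).include?(:body) ? frames.shift : nil
  SketchResponse.new(header, body)
end

frames = [
  { 'Seq' => 1, 'Error' => '' },
  { 'Members' => [{ 'Name' => 'node-a' }] }
]
response = sketch_read_response(:members, frames)
puts response.body['Members'].first['Name']
```

The sketch uses `fetch` so that an unknown command fails with a KeyError instead of calling `include?` on nil.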

#request(command, body = nil) ⇒ Response

Makes an RPC request against the Serf agent and reads its response.

Parameters:

  • command (Symbol)

    name of the RPC command; it must be a key of COMMANDS

  • body (Hash) (defaults to: nil)

    an optional request body for the RPC command

Returns:

  • (Response)


# File 'lib/serfx/connection.rb', line 133

def request(command, body = nil)
  tcp_send(command, body)
  read_response(command)
end

#socket ⇒ TCPSocket

Creates a TCP socket against the RPC host/port if one does not already exist.

Returns:

  • (TCPSocket)


# File 'lib/serfx/connection.rb', line 62

def socket
  @socket ||= TCPSocket.new(host, port)
end

#tcp_send(command, body = nil) ⇒ Integer

Takes a raw RPC command name and an optional request body, converts them to msgpack-encoded data, and sends them over TCP. Returns the sequence number assigned to the request.

Parameters:

  • command (String)

    RPC command name

  • body (Hash) (defaults to: nil)

    request body of the RPC command

Returns:

  • (Integer)


# File 'lib/serfx/connection.rb', line 89

def tcp_send(command, body = nil)
  @seq += 1
  header = {
    'Command' => command.to_s.gsub('_', '-'),
    'Seq' => seq
  }
  Log.info("#{__method__}|Header: #{header.inspect}")
  buff = MessagePack::Buffer.new
  buff << header.to_msgpack
  buff << body.to_msgpack unless body.nil?
  res = socket.send(buff.to_str, 0)
  Log.info("#{__method__}|Res: #{res.inspect}")
  @requests[seq] = { header: header, ack?: false }
  seq
end
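
The header construction in tcp_send converts the Ruby-style command name to the dash-separated form before it is sent. A minimal sketch of just that step follows; `sketch_header` is a hypothetical helper, and the msgpack encoding and socket write are deliberately left out.

```ruby
# Hypothetical helper isolating the header construction from tcp_send.
def sketch_header(command, seq)
  { 'Command' => command.to_s.gsub('_', '-'), 'Seq' => seq }
end

puts sketch_header(:force_leave, 1).inspect
puts sketch_header(:members_filtered, 2).inspect
```

Because the name passes through `to_s`, this step accepts either a Symbol or a String.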

#unpacker ⇒ MessagePack::Unpacker

Creates a MessagePack unpacker object from the TCP socket unless one is already present.

Returns:

  • (MessagePack::Unpacker)


# File 'lib/serfx/connection.rb', line 70

def unpacker
  @unpacker ||= MessagePack::Unpacker.new(socket)
end