Class: Riemann::Client::TCP

Inherits:
Riemann::Client show all
Defined in:
lib/riemann/client/tcp.rb

Constant Summary

Constants inherited from Riemann::Client

HOST, PORT, TIMEOUT

Class Attribute Summary collapse

Instance Attribute Summary collapse

Attributes inherited from Riemann::Client

#tcp, #udp

Instance Method Summary collapse

Methods inherited from Riemann::Client

#<<, #[], #bulk_send, #connect, #normalize_events, #query, #timeout

Constructor Details

#initialize(options = {}) ⇒ TCP

rubocop:disable Lint/MissingSuper



29
30
31
32
33
34
# File 'lib/riemann/client/tcp.rb', line 29

# Prepare client state; no connection is opened here.
#
# @param options [Hash] kept as-is in @options; #socket later passes it to
#   the class-level socket factory
def initialize(options = {}) # rubocop:disable Lint/MissingSuper
  @options = options
  @locket  = Monitor.new # lock taken by the other methods via @locket.synchronize
  @socket  = nil         # assigned by #socket on first use
  @pid     = nil         # set by #socket to the pid that created the socket
end

Class Attribute Details

.socket_factory ⇒ Object

Public: Return a socket factory



19
20
21
22
23
24
25
26
27
# File 'lib/riemann/client/tcp.rb', line 19

# Public: Return the socket factory.
#
# The factory is a callable that receives the connection options and
# returns the result of `connect` on the chosen transport. If nothing is
# stored in @socket_factory yet, a default is built and memoized: it uses
# SSLSocket when options[:ssl] is truthy and TcpSocket otherwise.
#
# Returns the callable factory.
def self.socket_factory
  @socket_factory ||= proc do |options|
    transport = options[:ssl] ? SSLSocket : TcpSocket
    transport.connect(options)
  end
end

Instance Attribute Details

#host ⇒ Object

Returns the value of attribute host.



10
11
12
# File 'lib/riemann/client/tcp.rb', line 10

# Reader for the host attribute.
#
# NOTE(review): the constructor shown in this file does not assign @host, so
# this presumably returns nil until a writer defined elsewhere sets it — confirm.
#
# @return [Object, nil] the current value of @host
def host
  @host
end

#port ⇒ Object

Returns the value of attribute port.



10
11
12
# File 'lib/riemann/client/tcp.rb', line 10

# Reader for the port attribute.
#
# NOTE(review): the constructor shown in this file does not assign @port, so
# this presumably returns nil until a writer defined elsewhere sets it — confirm.
#
# @return [Object, nil] the current value of @port
def port
  @port
end

Instance Method Details

#close ⇒ Object



49
50
51
52
53
54
# File 'lib/riemann/client/tcp.rb', line 49

# Close the current socket, if there is one, and forget it.
#
# The socket reference is cleared even when closing raises, so a failed
# close cannot leave a broken socket cached for later calls; the error
# itself still propagates to the caller.
#
# @return [nil]
def close
  @locket.synchronize do
    @socket.close if connected?
    nil # keep the return value independent of what the socket's #close returns
  ensure
    @socket = nil
  end
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


56
57
58
59
60
# File 'lib/riemann/client/tcp.rb', line 56

# Whether a socket is currently held and has not been closed.
#
# The check runs while holding @locket.
#
# @return [Boolean] false when @socket is nil or reports closed?, true otherwise
def connected?
  @locket.synchronize { !(@socket.nil? || @socket.closed?) }
end

#read_message(socket) ⇒ Object

Read a message from a stream



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/riemann/client/tcp.rb', line 63

# Read one length-prefixed message from a stream.
#
# The frame is a 4-byte big-endian unsigned length followed by that many
# bytes of payload; the payload is passed to Riemann::Message.decode.
#
# @param socket [#read] stream to read from
# @return [Object] the decoded message, returned only when its `ok` is truthy
# @raise [InvalidResponse] if the stream ends before the header or the whole
#   payload has been read
# @raise [ServerError] if the decoded message is not ok (carries message.error)
def read_message(socket)
  header = socket.read(4)
  raise InvalidResponse, 'unexpected EOF' unless header && header.bytesize == 4

  length = header.unpack1('N')
  begin
    str = socket.read(length)
    # A nil or short read means the stream ended mid-message. Report it the
    # same way as a truncated header instead of decoding a partial payload.
    raise InvalidResponse, 'unexpected EOF' unless str && str.bytesize == length

    message = Riemann::Message.decode(str)
  rescue StandardError
    puts "Message was #{str.inspect}"
    raise
  end

  unless message.ok
    puts 'Failed'
    raise ServerError, message.error
  end

  message
end

#send_recv(message) ⇒ Object Also known as: send_maybe_recv



85
86
87
88
89
90
# File 'lib/riemann/client/tcp.rb', line 85

# Send a message and read the reply on the same connection.
#
# Both the write and the read happen inside with_connection, on the socket
# it yields; the message is encoded inside that block as well.
#
# @param message [#encode_with_length] message to put on the wire
# @return [Object] the value of the with_connection call
def send_recv(message)
  with_connection do |conn|
    frame = message.encode_with_length
    conn.write(frame)
    read_message(conn)
  end
end

#socket ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/riemann/client/tcp.rb', line 36

# Return the current socket, creating one through the class-level socket
# factory when none is usable.
#
# If the socket was created under a different pid than the current one, it
# is closed first and a new one is made. Runs while holding @locket.
#
# @return [Object] the socket held in @socket
def socket
  @locket.synchronize do
    created_elsewhere = @pid && @pid != Process.pid
    close if created_elsewhere

    unless connected?
      @socket = self.class.socket_factory.call(@options)
      @pid    = Process.pid
    end

    @socket
  end
end

#with_connection ⇒ Object

Yields a connection in the block.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/riemann/client/tcp.rb', line 95

# Yields a connection in the block.
#
# The whole call runs while holding @locket. When the block (or obtaining
# the socket) raises one of the listed connection-level errors, the socket
# is closed and the block is run again; the error is re-raised once the
# attempt counter exceeds 3, i.e. after the fourth failed attempt. Any
# other StandardError closes the socket and is re-raised immediately.
#
# @yieldparam socket [Object] the value returned by #socket
# @return [Object] the block's value
def with_connection
  attempts = 0

  @locket.synchronize do
    begin
      attempts += 1
      yield(socket)
    rescue IOError, Errno::EPIPE, Errno::ECONNREFUSED, InvalidResponse, Timeout::Error,
           Riemann::Client::TcpSocket::Error
      close
      retry if attempts <= 3

      raise
    rescue StandardError
      close
      raise
    end
  end
end