Class: Sensu::Client::Socket

Inherits:
EM::Connection
  • Object
show all
Includes:
CheckUtils
Defined in:
lib/sensu/client/socket.rb

Overview

EventMachine connection handler for the Sensu client“s socket.

The Sensu client listens on localhost, port 3030 (by default), for UDP and TCP traffic. This allows software running on the host to push check results (that may contain metrics) into Sensu, without needing to know anything about Sensu“s internal implementation.

The socket only accepts 7-bit ASCII-encoded data.

Although the Sensu client accepts UDP and TCP traffic, you must be aware of the UDP protocol limitations. Any data you send over UDP must fit in a single datagram and you will not receive a response (no confirmation).

UDP Protocol ==

If the socket receives a message containing whitespace and the string “ping”, it will ignore it.

The socket assumes all other messages will contain a single, complete, JSON hash. The hash must be a valid JSON check result. Deserialization failures will be logged at the ERROR level by the Sensu client, but the sender of the invalid data will not be notified.

TCP Protocol ==

If the socket receives a message containing whitespace and the string “ping”, it will respond with the message “pong”.

The socket assumes any other stream will be a single, complete, JSON hash. A deserialization failure will be logged at the WARN level by the Sensu client and respond with the message “invalid”. An “ok” response indicates the Sensu client successfully received the JSON hash and will publish the check result.

Streams can be of any length. The socket protocol does not require any headers, instead the socket tries to parse everything it has been sent each time a chunk of data arrives. Once the JSON parses successfully, the Sensu client publishes the result. After WATCHDOG_DELAY (default is 500 msec) since the most recent chunk of data was received, the agent will give up on the sender, and instead respond “invalid” and close the connection.

Constant Summary collapse

WATCHDOG_DELAY =

The number of seconds that may elapse between chunks of data from a sender before it is considered dead, and the connection is close.

0.5
MODE_ACCEPT =

ACCEPT mode. Append chunks of data to a buffer and test to see whether the buffer contents are valid JSON.

:ACCEPT
MODE_REJECT =

REJECT mode. No longer receiving data from sender. Discard chunks of data in this mode, the connection is being closed.

:REJECT
PING_REQUEST =

PING request string, identifying a connection ping request.

"ping".freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from CheckUtils

#process_check_result, #publish_check_result, #validate_check_result

Instance Attribute Details

#loggerObject

Returns the value of attribute logger.



53
54
55
# File 'lib/sensu/client/socket.rb', line 53

def logger
  @logger
end

#protocolObject

Returns the value of attribute protocol.



53
54
55
# File 'lib/sensu/client/socket.rb', line 53

def protocol
  @protocol
end

#settingsObject

Returns the value of attribute settings.



53
54
55
# File 'lib/sensu/client/socket.rb', line 53

def settings
  @settings
end

#transportObject

Returns the value of attribute transport.



53
54
55
# File 'lib/sensu/client/socket.rb', line 53

def transport
  @transport
end

Instance Method Details

#cancel_watchdogObject

Cancel the current connection watchdog.



100
101
102
103
104
# File 'lib/sensu/client/socket.rb', line 100

def cancel_watchdog
  if @watchdog
    @watchdog.cancel
  end
end

#parse_check_result(data) ⇒ Object

Parse one or more JSON check results. For UDP, immediately raise a parser error. For TCP, record parser errors, so the connection watchdog can report them.

Parameters:

  • data (String)

    to parse for a check result.



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/sensu/client/socket.rb', line 124

def parse_check_result(data)
  begin
    object = Sensu::JSON.load(data)
    cancel_watchdog
    if object.is_a?(Array)
      object.each do |check|
        process_check_result(check)
      end
    else
      process_check_result(object)
    end
    respond("ok")
  rescue Sensu::JSON::ParseError, ArgumentError => error
    if @protocol == :tcp
      @parse_error = error.to_s
    else
      raise error
    end
  end
end

#post_initObject

Initialize instance variables that will be used throughout the lifetime of the connection. This method is called when the network connection has been established, and immediately after responding to a sender.



79
80
81
82
83
84
85
# File 'lib/sensu/client/socket.rb', line 79

def post_init
  @protocol ||= :tcp
  @data_buffer = ""
  @parse_error = nil
  @watchdog = nil
  @mode = MODE_ACCEPT
end

#process_data(data) ⇒ Object

Process the data received. This method validates the data encoding, provides ping/pong functionality, and passes potential check results on for further processing.

Parameters:

  • data (String)

    to be processed.



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/sensu/client/socket.rb', line 150

def process_data(data)
  if data.strip == PING_REQUEST
    @logger.debug("socket received ping")
    respond("pong")
  else
    @logger.debug("socket received data", :data => data)
    unless valid_utf8?(data)
      @logger.warn("data from socket is not a valid UTF-8 sequence, processing it anyways", :data => data)
    end
    begin
      parse_check_result(data)
    rescue => error
      @logger.error("failed to process check result from socket", {
        :data => data,
        :error => error.to_s
      })
      respond("invalid")
    end
  end
end

#receive_data(data) ⇒ Object

This method is called whenever data is received. For UDP, it will only be called once, the original data length can be expected. For TCP, this method may be called several times, data received is buffered. TCP connections require a watchdog.

Parameters:

  • data (String)

    received from the sender.



191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/sensu/client/socket.rb', line 191

def receive_data(data)
  unless @mode == MODE_REJECT
    case @protocol
    when :udp
      process_data(data)
    when :tcp
      if EM::reactor_running?
        reset_watchdog
      end
      @data_buffer << data
      process_data(@data_buffer)
    end
  end
end

#reset_watchdogObject

Reset (or start) the connection watchdog.



107
108
109
110
111
112
113
114
115
116
117
# File 'lib/sensu/client/socket.rb', line 107

def reset_watchdog
  cancel_watchdog
  @watchdog = EM::Timer.new(WATCHDOG_DELAY) do
    @mode = MODE_REJECT
    @logger.warn("discarding data buffer for sender and closing connection", {
      :data => @data_buffer,
      :parse_error => @parse_error
    })
    respond("invalid")
  end
end

#respond(data) ⇒ Object

Send a response to the sender, close the connection, and call post_init().

Parameters:

  • data (String)

    to send as a response.



91
92
93
94
95
96
97
# File 'lib/sensu/client/socket.rb', line 91

def respond(data)
  if @protocol == :tcp
    send_data(data)
    close_connection_after_writing
  end
  post_init
end

#valid_utf8?(data) ⇒ Boolean

Tests if the argument (data) is a valid UTF-8 sequence.

Parameters:

  • data (String)

    to be tested.

Returns:

  • (Boolean)


174
175
176
177
178
179
180
181
182
183
# File 'lib/sensu/client/socket.rb', line 174

def valid_utf8?(data)
  utf8_string_pattern = /\A([\x00-\x7f]|
                            [\xc2-\xdf][\x80-\xbf]|
                            \xe0[\xa0-\xbf][\x80-\xbf]|
                            [\xe1-\xef][\x80-\xbf]{2}|
                            \xf0[\x90-\xbf][\x80-\xbf]{2}|
                            [\xf1-\xf7][\x80-\xbf]{3})*\z/nx
  data = data.force_encoding('BINARY') if data.respond_to?(:force_encoding)
  return data =~ utf8_string_pattern
end