Class: Sensu::Client::Socket
- Inherits:
- EM::Connection
- Inheritance chain: Object, EM::Connection, Sensu::Client::Socket
- Defined in:
- lib/sensu/client/socket.rb
Overview
EventMachine connection handler for the Sensu client's socket.
The Sensu client listens on localhost, port 3030 (by default), for UDP and TCP traffic. This allows software running on the host to push check results (that may contain metrics) into Sensu, without needing to know anything about Sensu's internal implementation.
The socket only accepts 7-bit ASCII-encoded data.
Although the Sensu client accepts UDP and TCP traffic, you must be aware of the UDP protocol limitations. Any data you send over UDP must fit in a single datagram and you will not receive a response (no confirmation).
UDP Protocol
If the socket receives a message consisting only of the string “ping”, optionally surrounded by whitespace, it ignores the message.
The socket assumes all other messages will contain a single, complete, JSON hash. The hash must be a valid JSON check result. Deserialization failures will be logged at the ERROR level by the Sensu client, but the sender of the invalid data will not be notified.
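The UDP behavior described above can be exercised with a short sender. This is a minimal sketch using only Ruby's standard library, assuming the default address of 127.0.0.1 and port 3030; the helper name send_check_result_udp and the example check are hypothetical and not part of Sensu.

```ruby
require "json"
require "socket"

# Hypothetical helper: send one check result to the client socket over UDP.
# Returns the number of bytes sent. No reply is expected over UDP, and the
# serialized JSON must fit in a single datagram.
def send_check_result_udp(check, host: "127.0.0.1", port: 3030)
  payload = JSON.generate(check)
  socket = UDPSocket.new
  begin
    socket.send(payload, 0, host, port)
  ensure
    socket.close
  end
end

# Hypothetical check result; the validator shown later on this page
# requires a name and a string output.
EXAMPLE_UDP_CHECK = { name: "app_heartbeat", output: "OK: alive", status: 0 }

# Usage (requires a Sensu client listening locally):
#   send_check_result_udp(EXAMPLE_UDP_CHECK)
```

Because the sender is never notified of failures, any problem with the payload only shows up in the Sensu client log.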
TCP Protocol
If the socket receives a message consisting only of the string “ping”, optionally surrounded by whitespace, it responds with the message “pong”.
The socket assumes any other stream will be a single, complete JSON hash. If deserialization fails, the Sensu client logs the failure at the WARN level and responds with the message “invalid”. An “ok” response indicates the Sensu client successfully received the JSON hash and will publish the check result.
Streams can be of any length. The socket protocol does not require any headers; instead, the socket tries to parse everything it has been sent each time a chunk of data arrives. Once the JSON parses successfully, the Sensu client publishes the result. If WATCHDOG_DELAY (500 ms by default) elapses after the most recent chunk of data without the buffer parsing successfully, the client gives up on the sender, responds “invalid”, and closes the connection.
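The TCP exchange described above could be driven from a client along these lines. This is a sketch under stated assumptions: it uses Ruby's standard socket library, the default 127.0.0.1:3030 address, and two hypothetical helper names (sensu_socket_request and send_check_result_tcp) that do not exist in Sensu.

```ruby
require "json"
require "socket"

# Hypothetical helper: write one message, then read until the server closes
# the connection. Per the protocol above, the reply is expected to be
# "pong", "ok", or "invalid".
def sensu_socket_request(message, host: "127.0.0.1", port: 3030)
  socket = TCPSocket.new(host, port)
  begin
    socket.write(message)
    socket.read # blocks until the remote end closes the connection
  ensure
    socket.close
  end
end

# Hypothetical helper: serialize a check result and send it over TCP.
def send_check_result_tcp(check, host: "127.0.0.1", port: 3030)
  sensu_socket_request(JSON.generate(check), host: host, port: port)
end

# Usage (requires a Sensu client listening locally):
#   sensu_socket_request("ping")
#   send_check_result_tcp({ name: "app_heartbeat", output: "OK: alive" })
```

Reading until end of stream works here because the socket closes the connection after writing its response, so no length header or delimiter is needed.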
Defined Under Namespace
Classes: DataError
Constant Summary
- WATCHDOG_DELAY = 0.5
The number of seconds that may elapse between chunks of data from a sender before it is considered dead, and the connection is closed.
- MODE_ACCEPT = :ACCEPT
ACCEPT mode. Append chunks of data to a buffer and test to see whether the buffer contents are valid JSON.
- MODE_REJECT = :REJECT
REJECT mode. No longer receiving data from the sender. Chunks of data are discarded in this mode because the connection is being closed.
- PING_REQUEST = "ping".freeze
PING request string, identifying a connection ping request.
Instance Attribute Summary
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#protocol ⇒ Object
Returns the value of attribute protocol.
-
#settings ⇒ Object
Returns the value of attribute settings.
-
#transport ⇒ Object
Returns the value of attribute transport.
Instance Method Summary
-
#cancel_watchdog ⇒ Object
Cancel the current connection watchdog.
-
#parse_check_result(data) ⇒ Object
Parse a JSON check result.
-
#post_init ⇒ Object
Initialize instance variables that will be used throughout the lifetime of the connection.
-
#process_check_result(check) ⇒ Object
Process a check result.
-
#process_data(data) ⇒ Object
Process the data received.
-
#publish_check_result(check) ⇒ Object
Publish a check result to the Sensu transport.
-
#receive_data(data) ⇒ Object
This method is called whenever data is received.
-
#reset_watchdog ⇒ Object
Reset (or start) the connection watchdog.
-
#respond(data) ⇒ Object
Send a response to the sender, close the connection, and call post_init().
-
#valid_utf8?(data) ⇒ Boolean
Tests if the argument (data) is a valid UTF-8 sequence.
-
#validate_check_result(check) ⇒ Object
Validate check result attributes.
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
# File 'lib/sensu/client/socket.rb', line 52
def logger
  @logger
end
#protocol ⇒ Object
Returns the value of attribute protocol.
# File 'lib/sensu/client/socket.rb', line 52
def protocol
  @protocol
end
#settings ⇒ Object
Returns the value of attribute settings.
# File 'lib/sensu/client/socket.rb', line 52
def settings
  @settings
end
#transport ⇒ Object
Returns the value of attribute transport.
# File 'lib/sensu/client/socket.rb', line 52
def transport
  @transport
end
Instance Method Details
#cancel_watchdog ⇒ Object
Cancel the current connection watchdog.
# File 'lib/sensu/client/socket.rb', line 99
def cancel_watchdog
  if @watchdog
    @watchdog.cancel
  end
end
#parse_check_result(data) ⇒ Object
Parse a JSON check result. For UDP, immediately raise a parser error. For TCP, record parser errors, so the connection watchdog can report them.
# File 'lib/sensu/client/socket.rb', line 170
def parse_check_result(data)
  begin
    check = Sensu::JSON.load(data)
    cancel_watchdog
    process_check_result(check)
  rescue Sensu::JSON::ParseError, ArgumentError => error
    if @protocol == :tcp
      @parse_error = error.to_s
    else
      raise error
    end
  end
end
#post_init ⇒ Object
Initialize instance variables that will be used throughout the lifetime of the connection. This method is called when the network connection has been established, and immediately after responding to a sender.
# File 'lib/sensu/client/socket.rb', line 78
def post_init
  @protocol ||= :tcp
  @data_buffer = ""
  @parse_error = nil
  @watchdog = nil
  @mode = MODE_ACCEPT
end
#process_check_result(check) ⇒ Object
Process a check result. Set check result attribute defaults, validate the attributes, publish the check result to the Sensu transport, and respond to the sender with the message “ok”.
# File 'lib/sensu/client/socket.rb', line 157
def process_check_result(check)
  check[:status] ||= 0
  check[:executed] ||= Time.now.to_i
  validate_check_result(check)
  publish_check_result(check)
  respond("ok")
end
#process_data(data) ⇒ Object
Process the data received. This method validates the data encoding, provides ping/pong functionality, and passes potential check results on for further processing.
# File 'lib/sensu/client/socket.rb', line 189
def process_data(data)
  if data.strip == PING_REQUEST
    @logger.debug("socket received ping")
    respond("pong")
  else
    @logger.debug("socket received data", :data => data)
    unless valid_utf8?(data)
      @logger.warn("data from socket is not a valid UTF-8 sequence, processing it anyways", :data => data)
    end
    begin
      parse_check_result(data)
    rescue => error
      @logger.error("failed to process check result from socket", {
        :data => data,
        :error => error.to_s
      })
      respond("invalid")
    end
  end
end
#publish_check_result(check) ⇒ Object
Publish a check result to the Sensu transport.
# File 'lib/sensu/client/socket.rb', line 142
def publish_check_result(check)
  payload = {
    :client => @settings[:client][:name],
    :check => check.merge(:issued => Time.now.to_i)
  }
  @logger.info("publishing check result", :payload => payload)
  @transport.publish(:direct, "results", Sensu::JSON.dump(payload))
end
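To make the shape of the published payload concrete, the sketch below combines the defaults set in process_check_result with the payload built in publish_check_result. It is an illustration only: build_result_payload, the client name "i-424242", and the fixed timestamp are hypothetical, and the standard library JSON module stands in for Sensu::JSON.

```ruby
require "json"

# Hypothetical helper mirroring the documented steps: default :status to 0,
# default :executed to the current time, then wrap the check with the client
# name and merge in an :issued timestamp.
def build_result_payload(client_name, check, now: Time.now.to_i)
  check = check.dup # avoid mutating the caller's hash
  check[:status] ||= 0
  check[:executed] ||= now
  { :client => client_name, :check => check.merge(:issued => now) }
end

payload = build_result_payload(
  "i-424242",
  { :name => "app_heartbeat", :output => "OK: alive" },
  now: 1_700_000_000
)
puts JSON.generate(payload)
```

Passing the timestamp in as a keyword argument is a choice made for this sketch so the result is reproducible; the source calls Time.now.to_i directly.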
#receive_data(data) ⇒ Object
This method is called whenever data is received. For UDP, it is called only once per datagram, with the complete original data. For TCP, it may be called several times, so the received data is buffered. TCP connections require a watchdog.
# File 'lib/sensu/client/socket.rb', line 230
def receive_data(data)
  unless @mode == MODE_REJECT
    case @protocol
    when :udp
      process_data(data)
    when :tcp
      if EM.reactor_running?
        reset_watchdog
      end
      @data_buffer << data
      process_data(@data_buffer)
    end
  end
end
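The TCP branch above re-parses the whole buffer every time a chunk arrives. The following standalone sketch imitates that approach without EventMachine or the watchdog timer; the ChunkBuffer class is hypothetical, and the standard library JSON module is assumed as a stand-in for Sensu::JSON.

```ruby
require "json"

# Hypothetical stand-in for the TCP buffering logic: append each chunk to a
# buffer and attempt to parse the entire buffer on every call.
class ChunkBuffer
  attr_reader :parse_error

  def initialize
    @buffer = +""
    @parse_error = nil
  end

  # Returns the parsed Hash once the buffer holds complete JSON, else nil.
  # The most recent parser error is kept so a timeout handler could report it.
  def receive(chunk)
    @buffer << chunk
    result = JSON.parse(@buffer, symbolize_names: true)
    @parse_error = nil
    result
  rescue JSON::ParserError => error
    @parse_error = error.message
    nil
  end
end

buffer = ChunkBuffer.new
chunks = ['{"name": "app_hea', 'rtbeat", "output"', ': "OK"}']
results = chunks.map { |chunk| buffer.receive(chunk) }
```

Only the final chunk completes the JSON document, so the first two calls return nil and record a parser error. In the real class, a parse failure that persists for WATCHDOG_DELAY seconds after the last chunk triggers the “invalid” response.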
#reset_watchdog ⇒ Object
Reset (or start) the connection watchdog.
# File 'lib/sensu/client/socket.rb', line 106
def reset_watchdog
  cancel_watchdog
  @watchdog = EM::Timer.new(WATCHDOG_DELAY) do
    @mode = MODE_REJECT
    @logger.warn("discarding data buffer for sender and closing connection", {
      :data => @data_buffer,
      :parse_error => @parse_error
    })
    respond("invalid")
  end
end
#respond(data) ⇒ Object
Send a response to the sender, close the connection, and call post_init().
# File 'lib/sensu/client/socket.rb', line 90
def respond(data)
  if @protocol == :tcp
    send_data(data)
    close_connection_after_writing
  end
  post_init
end
#valid_utf8?(data) ⇒ Boolean
Tests if the argument (data) is a valid UTF-8 sequence.
# File 'lib/sensu/client/socket.rb', line 213
def valid_utf8?(data)
  utf8_string_pattern = /\A([\x00-\x7f]|
                           [\xc2-\xdf][\x80-\xbf]|
                           \xe0[\xa0-\xbf][\x80-\xbf]|
                           [\xe1-\xef][\x80-\xbf]{2}|
                           \xf0[\x90-\xbf][\x80-\xbf]{2}|
                           [\xf1-\xf7][\x80-\xbf]{3})*\z/nx
  data = data.force_encoding('BINARY') if data.respond_to?(:force_encoding)
  return data =~ utf8_string_pattern
end
#validate_check_result(check) ⇒ Object
Validate check result attributes.
# File 'lib/sensu/client/socket.rb', line 121
def validate_check_result(check)
  unless check[:name] =~ /\A[\w\.-]+\z/
    raise DataError, "check name must be a string and cannot contain spaces or special characters"
  end
  unless check[:source].nil? || check[:source] =~ /\A[\w\.-]+\z/
    raise DataError, "check source must be a string and cannot contain spaces or special characters"
  end
  unless check[:output].is_a?(String)
    raise DataError, "check output must be a string"
  end
  unless check[:status].is_a?(Integer)
    raise DataError, "check status must be an integer"
  end
  unless check[:executed].is_a?(Integer)
    raise DataError, "check executed timestamp must be an integer"
  end
end
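A sender can apply the same rules before submitting a check result. The sketch below is a hypothetical variation, not the library's method: instead of raising DataError on the first failure, check_result_errors collects the names of all offending attributes. It reuses the name pattern from the source and assumes :status and :executed have already been defaulted.

```ruby
# Same pattern the source applies to check names and sources.
CHECK_NAME_PATTERN = /\A[\w\.-]+\z/

# Hypothetical helper: return the attribute names that would fail validation.
def check_result_errors(check)
  errors = []
  name_ok = check[:name].is_a?(String) && CHECK_NAME_PATTERN.match?(check[:name])
  source = check[:source]
  source_ok = source.nil? || (source.is_a?(String) && CHECK_NAME_PATTERN.match?(source))
  errors << :name unless name_ok
  errors << :source unless source_ok
  errors << :output unless check[:output].is_a?(String)
  errors << :status unless check[:status].is_a?(Integer)
  errors << :executed unless check[:executed].is_a?(Integer)
  errors
end

valid_check = { name: "app_heartbeat", output: "OK", status: 0, executed: 1_700_000_000 }
invalid_check = { name: "app heartbeat", output: nil, status: "0", executed: 1_700_000_000 }

valid_errors = check_result_errors(valid_check)
invalid_errors = check_result_errors(invalid_check)
```

Here the second check fails on its name (it contains a space), its output (not a string), and its status (a string rather than an integer).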