Class: Sensu::Socket

Inherits:
EM::Connection
  • Object
Defined in:
lib/sensu/socket.rb

Overview

EventMachine connection handler for the Sensu client’s socket.

The Sensu client listens on localhost, port 3030 (by default), for UDP and TCP traffic. This allows software running on the host to push check results (that may contain metrics) into Sensu, without needing to know anything about Sensu’s internal implementation.

The socket only accepts 7-bit ASCII-encoded data.

Although the Sensu client accepts UDP and TCP traffic, you must be aware of the UDP protocol limitations. Any data you send over UDP must fit in a single datagram and you will not receive a response (no confirmation).

UDP Protocol

If the socket receives a message consisting only of the string ‘ping’, optionally surrounded by whitespace, it will ignore it.

The socket assumes all other messages will contain a single, complete, JSON hash. The hash must be a valid JSON check result. Deserialization failures will be logged at the ERROR level by the Sensu client, but the sender of the invalid data will not be notified.
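As a minimal sketch of the sender side, the following pushes a check result to the client socket over UDP using only Ruby's standard library. The helper name `send_check_result_udp` and the example check values are assumptions made for illustration; the host and port defaults follow the description above. Because UDP senders receive no response, the helper only returns the number of bytes handed to the network stack.

```ruby
require 'socket'
require 'json'

# Hypothetical helper (not part of Sensu): send one check result as a
# single UDP datagram. No response is read, because none is sent.
def send_check_result_udp(check, host = '127.0.0.1', port = 3030)
  payload = JSON.generate(check)
  # The socket only accepts 7-bit ASCII, so fail early on anything else.
  raise ArgumentError, 'payload must be 7-bit ASCII' unless payload.ascii_only?
  socket = UDPSocket.new
  begin
    socket.send(payload, 0, host, port) # returns the number of bytes sent
  ensure
    socket.close
  end
end
```

A call might look like `send_check_result_udp({ 'name' => 'app_heartbeat', 'output' => 'OK', 'status' => 0 })`. A malformed payload would only show up in the Sensu client's log, never at the sender.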

TCP Protocol

If the socket receives a message consisting only of the string ‘ping’, optionally surrounded by whitespace, it will respond with the message ‘pong’.

The socket assumes any other stream will be a single, complete, JSON hash. A deserialization failure will be logged at the WARN level by the Sensu client, which will respond with the message ‘invalid’. An ‘ok’ response indicates the Sensu client successfully received the JSON hash and will publish the check result.
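The TCP exchange can be sketched from the sender's point of view as follows. The helper name `send_check_result_tcp` and the example values are assumptions for illustration, not part of Sensu. The sketch relies on the behaviour described here: the client socket answers with a short message and then closes the connection, so reading until end-of-file yields the whole response.

```ruby
require 'socket'
require 'json'

# Hypothetical helper (not part of Sensu): send one check result over TCP
# and return the response text (for example 'ok' or 'invalid').
def send_check_result_tcp(check, host = '127.0.0.1', port = 3030)
  socket = TCPSocket.new(host, port)
  begin
    socket.write(JSON.generate(check))
    # The connection is closed by the receiver after it responds, so
    # reading until EOF returns the complete response.
    socket.read
  ensure
    socket.close
  end
end
```

A caller would typically compare the return value against `'ok'` and treat anything else as a rejected result.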

Streams can be of any length. The socket protocol does not require any headers; instead, the socket tries to parse everything it has been sent each time a chunk of data arrives. Once the JSON parses successfully, the Sensu client publishes the result. If WATCHDOG_DELAY (0.5 seconds by default) elapses after the most recent chunk of data was received and the buffer still has not parsed, the client gives up on the sender, responds with ‘invalid’, and closes the connection.
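The accumulate-and-reparse behaviour described above can be illustrated with a small stand-alone class. This is a sketch under stated assumptions, not the Sensu implementation: `ChunkBuffer` is a hypothetical name, the standard library's `JSON` stands in for MultiJson, and the watchdog timer is left out.

```ruby
require 'json'

# Illustrative stand-in for the socket's TCP buffering (not Sensu code).
class ChunkBuffer
  attr_reader :parse_error

  def initialize
    @buffer = +''
    @parse_error = nil
  end

  # Append a chunk and try to parse everything received so far.
  # Returns the parsed document once the buffer holds complete JSON,
  # otherwise nil.
  def push(chunk)
    @buffer << chunk
    JSON.parse(@buffer)
  rescue JSON::ParserError => error
    @parse_error = error.to_s # kept so a watchdog could report it later
    nil
  end
end
```

Feeding `'{"name": "app_heart'` and then `'beat", "output": "OK"}'` returns nil for the first chunk and the parsed hash for the second. Re-parsing the whole buffer on every chunk is simple and needs no length header, at the cost of repeated work on large streams.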

Defined Under Namespace

Classes: DataError

Constant Summary

WATCHDOG_DELAY =

The number of seconds that may elapse between chunks of data from a sender before it is considered dead and the connection is closed.

0.5
MODE_ACCEPT =

ACCEPT mode. Append chunks of data to a buffer and test to see whether the buffer contents are valid JSON.

:ACCEPT
MODE_REJECT =

REJECT mode. No longer receiving data from sender. Chunks of data are discarded in this mode because the connection is being closed.

:REJECT


Instance Attribute Details

#logger ⇒ Object

Returns the value of attribute logger.



# File 'lib/sensu/socket.rb', line 51

def logger
  @logger
end

#protocol ⇒ Object

Returns the value of attribute protocol.



# File 'lib/sensu/socket.rb', line 51

def protocol
  @protocol
end

#settings ⇒ Object

Returns the value of attribute settings.



# File 'lib/sensu/socket.rb', line 51

def settings
  @settings
end

#transport ⇒ Object

Returns the value of attribute transport.



# File 'lib/sensu/socket.rb', line 51

def transport
  @transport
end

Instance Method Details

#cancel_watchdog ⇒ Object

Cancel the current connection watchdog.



# File 'lib/sensu/socket.rb', line 95

def cancel_watchdog
  if @watchdog
    @watchdog.cancel
  end
end

#parse_check_result(data) ⇒ Object

Parse a JSON check result. For UDP, immediately raise a parser error. For TCP, record parser errors, so the connection watchdog can report them.

Parameters:

  • data (String)

    to parse for a check result.



# File 'lib/sensu/socket.rb', line 161

def parse_check_result(data)
  begin
    check = MultiJson.load(data)
    cancel_watchdog
    process_check_result(check)
  rescue MultiJson::ParseError, ArgumentError => error
    if @protocol == :tcp
      @parse_error = error.to_s
    else
      raise error
    end
  end
end

#post_init ⇒ Object

Initialize instance variables that will be used throughout the lifetime of the connection. This method is called when the network connection has been established, and immediately after responding to a sender.



# File 'lib/sensu/socket.rb', line 74

def post_init
  @protocol ||= :tcp
  @data_buffer = ''
  @parse_error = nil
  @watchdog = nil
  @mode = MODE_ACCEPT
end

#process_check_result(check) ⇒ Object

Process a check result. Set check result attribute defaults, validate the attributes, publish the check result to the Sensu transport, and respond to the sender with the message ‘ok’.

Parameters:

  • check (Hash)

    result to be validated and published.

Raises:

  • (DataError)

    if a check result attribute fails validation.



# File 'lib/sensu/socket.rb', line 149

def process_check_result(check)
  check[:status] ||= 0
  validate_check_result(check)
  publish_check_result(check)
  respond('ok')
end

#process_data(data) ⇒ Object

Process the data received. This method validates the data encoding, provides ping/pong functionality, and passes potential check results on for further processing.

Parameters:

  • data (String)

    to be processed.



# File 'lib/sensu/socket.rb', line 180

def process_data(data)
  if data.bytes.find { |char| char > 0x80 }
    @logger.warn('socket received non-ascii characters')
    respond('invalid')
  elsif data.strip == 'ping'
    @logger.debug('socket received ping')
    respond('pong')
  else
    @logger.debug('socket received data', {
      :data => data
    })
    begin
      parse_check_result(data)
    rescue => error
      @logger.error('failed to process check result from socket', {
        :data => data,
        :error => error.to_s
      })
      respond('invalid')
    end
  end
end
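To make the encoding test concrete, the sketch below compares the comparison used in the listing above with a strict 7-bit test. Both helper names are hypothetical and exist only for this illustration. One detail worth knowing when reading the listing: `char > 0x80` flags bytes 0x81 through 0xFF, so a lone 0x80 byte is not flagged by that comparison, whereas a strict 7-bit test would flag it.

```ruby
# Illustrative helpers, not Sensu code.

# The comparison used in the listing above: flags bytes 0x81..0xFF.
def flagged_by_listing?(data)
  !data.bytes.find { |char| char > 0x80 }.nil?
end

# A strict 7-bit test: flags every byte above 0x7F.
def outside_7bit_ascii?(data)
  data.bytes.any? { |byte| byte > 0x7f }
end

ping        = " ping\n"    # strips down to 'ping'
boundary    = "\x80".b     # a single byte with value 0x80
utf8_output = "caf\u00e9"  # the last character encodes as bytes 0xC3 0xA9
```

With these definitions, `ping` passes both tests, `utf8_output` is flagged by both, and `boundary` is flagged only by `outside_7bit_ascii?`. Multi-byte UTF-8 text still gets caught by the listing's comparison in practice, since every such sequence starts with a lead byte of 0xC2 or higher.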

#publish_check_result(check) ⇒ Object

Publish a check result to the Sensu transport.

Parameters:

  • check (Hash)

    result.



# File 'lib/sensu/socket.rb', line 132

def publish_check_result(check)
  payload = {
    :client => @settings[:client][:name],
    :check => check.merge(:issued => Time.now.to_i)
  }
  @logger.info('publishing check result', {
    :payload => payload
  })
  @transport.publish(:direct, 'results', MultiJson.dump(payload))
end
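The shape of the published payload can be seen by building one outside the socket. In this sketch the settings hash, the client name `i-424242`, and the check values are all made up for illustration, and the standard library's `JSON.generate` stands in for `MultiJson.dump`.

```ruby
require 'json'

# Hypothetical settings and check values, for illustration only.
settings = { :client => { :name => 'i-424242' } }
check = { :name => 'app_heartbeat', :output => 'OK', :status => 0 }

# Same shape as the payload built in the listing above.
payload = {
  :client => settings[:client][:name],
  :check => check.merge(:issued => Time.now.to_i)
}

# JSON.generate stands in for MultiJson.dump here.
message = JSON.generate(payload)
```

Because `Hash#merge` returns a new hash, the original `check` is left without an `:issued` key; only the payload carries the timestamp.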

#receive_data(data) ⇒ Object

This method is called whenever data is received. For UDP, it is called once per datagram, with the complete datagram as data. For TCP, it may be called several times as chunks arrive, so the data received is buffered. TCP connections require a watchdog.

Parameters:

  • data (String)

    received from the sender.



# File 'lib/sensu/socket.rb', line 209

def receive_data(data)
  unless @mode == MODE_REJECT
    case @protocol
    when :udp
      process_data(data)
    when :tcp
      if EM.reactor_running?
        reset_watchdog
      end
      @data_buffer << data
      process_data(@data_buffer)
    end
  end
end

#reset_watchdog ⇒ Object

Reset (or start) the connection watchdog.



# File 'lib/sensu/socket.rb', line 102

def reset_watchdog
  cancel_watchdog
  @watchdog = EM::Timer.new(WATCHDOG_DELAY) do
    @mode = MODE_REJECT
    @logger.warn('discarding data buffer for sender and closing connection', {
      :data => @data_buffer,
      :parse_error => @parse_error
    })
    respond('invalid')
  end
end

#respond(data) ⇒ Object

Send a response to the sender and close the connection (TCP only; UDP senders receive no response), then call post_init() to reset the connection state.

Parameters:

  • data (String)

    to send as a response.



# File 'lib/sensu/socket.rb', line 86

def respond(data)
  if @protocol == :tcp
    send_data(data)
    close_connection_after_writing
  end
  post_init
end

#validate_check_result(check) ⇒ Object

Validate check result attributes.

Parameters:

  • check (Hash)

    result to validate.



# File 'lib/sensu/socket.rb', line 117

def validate_check_result(check)
  unless check[:name] =~ /^[\w\.-]+$/
    raise DataError, 'check name must be a string and cannot contain spaces or special characters'
  end
  unless check[:output].is_a?(String)
    raise DataError, 'check output must be a string'
  end
  unless check[:status].is_a?(Integer)
    raise DataError, 'check status must be an integer'
  end
end
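The three validation rules can be exercised in isolation with a stand-alone copy of the method. In this sketch the top-level `DataError` class is a stand-in for `Sensu::Socket::DataError`, and `validation_error` is a hypothetical helper added so that the outcome can be inspected without a rescue block at every call site.

```ruby
# Stand-in for Sensu::Socket::DataError, so the sketch runs on its own.
class DataError < StandardError; end

# The same three checks as the listing above, as a free-standing method.
def validate_check_result(check)
  unless check[:name] =~ /^[\w\.-]+$/
    raise DataError, 'check name must be a string and cannot contain spaces or special characters'
  end
  unless check[:output].is_a?(String)
    raise DataError, 'check output must be a string'
  end
  unless check[:status].is_a?(Integer)
    raise DataError, 'check status must be an integer'
  end
end

# Hypothetical helper: return the error message, or nil when valid.
def validation_error(check)
  validate_check_result(check)
  nil
rescue DataError => error
  error.message
end
```

For example, `validation_error(:name => 'cpu.load-1', :output => 'OK', :status => 0)` returns nil, while a name such as `'bad name'` or a status of `'0'` (a string rather than an integer) yields the corresponding message.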