Class: Sensu::Client::Process

Inherits:
Object
  • Object
show all
Includes:
Daemon
Defined in:
lib/sensu/client/process.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Daemon

#load_extensions, #load_settings, #log_concerns, #setup_logger, #setup_process, #setup_redis, #setup_signal_traps, #setup_transport

Methods included from Utilities

#deep_merge, #random_uuid, #redact_sensitive, #retry_until_true, #testing?

Constructor Details

#initialize(options = {}) ⇒ Process

Override Daemon initialize() to support Sensu client check execution safe mode and checks in progress.

Parameters:

  • options (Hash) (defaults to: {})


28
29
30
31
32
# File 'lib/sensu/client/process.rb', line 28

def initialize(options={})
  super
  @safe_mode = @settings[:client][:safe_mode] || false
  @checks_in_progress = []
end

Instance Attribute Details

#safe_modeObject

Returns the value of attribute safe_mode.



9
10
11
# File 'lib/sensu/client/process.rb', line 9

def safe_mode
  @safe_mode
end

Class Method Details

.run(options = {}) ⇒ Object

Create an instance of the Sensu client process, start the client within the EventMachine event loop, and set up client process signal traps (for stopping).

Parameters:

  • options (Hash) (defaults to: {})


16
17
18
19
20
21
22
# File 'lib/sensu/client/process.rb', line 16

def self.run(options={})
  client = self.new(options)
  EM::run do
    client.start
    client.setup_signal_traps
  end
end

Instance Method Details

#bootstrapObject

Bootstrap the Sensu client, setting up client keepalives, subscriptions, and standalone check executions. This method sets the process/daemon ‘@state` to `:running`.



358
359
360
361
362
363
# File 'lib/sensu/client/process.rb', line 358

def bootstrap
  setup_keepalives
  setup_subscriptions
  setup_standalone
  @state = :running
end

#calculate_execution_splay(check) ⇒ Object

Calculate a check execution splay, taking into account the current time and the execution interval to ensure it’s consistent between process restarts.

Parameters:

  • check (Hash)

    definition.



268
269
270
271
272
273
# File 'lib/sensu/client/process.rb', line 268

def calculate_execution_splay(check)
  key = [@settings[:client][:name], check[:name]].join(":")
  splay_hash = Digest::MD5.digest(key).unpack("Q<").first
  current_time = (Time.now.to_f * 1000).to_i
  (splay_hash - current_time) % (check[:interval] * 1000) / 1000.0
end

#complete_checks_in_progress(&callback) ⇒ Object

Call a callback (Ruby block) when there are no longer check executions in progress. This method is used when stopping the Sensu client. The ‘retry_until_true` helper method is used to check the condition every 0.5 seconds until `true` is returned.

Parameters:

  • callback (Proc)

    called when there are no check executions in progress.



345
346
347
348
349
350
351
352
353
# File 'lib/sensu/client/process.rb', line 345

def complete_checks_in_progress(&callback)
  @logger.info("completing checks in progress", :checks_in_progress => @checks_in_progress)
  retry_until_true do
    if @checks_in_progress.empty?
      callback.call
      true
    end
  end
end

#execute_check_command(check) ⇒ Object

Execute a check command, capturing its output (STDOUT/ERR), exit status code, execution duration, timestamp, and publish the result. This method guards against multiple executions for the same check. Check command tokens are substituted with the associated client attribute values. If there are unmatched check command tokens, the check command will not be executed, instead a check result will be published reporting the unmatched tokens.

Parameters:

  • check (Hash)


144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/sensu/client/process.rb', line 144

def execute_check_command(check)
  @logger.debug("attempting to execute check command", :check => check)
  unless @checks_in_progress.include?(check[:name])
    @checks_in_progress << check[:name]
    command, unmatched_tokens = substitute_check_command_tokens(check)
    if unmatched_tokens.empty?
      check[:executed] = Time.now.to_i
      started = Time.now.to_f
      Spawn.process(command, :timeout => check[:timeout]) do |output, status|
        check[:duration] = ("%.3f" % (Time.now.to_f - started)).to_f
        check[:output] = output
        check[:status] = status
        publish_check_result(check)
        @checks_in_progress.delete(check[:name])
      end
    else
      check[:output] = "Unmatched command tokens: " + unmatched_tokens.join(", ")
      check[:status] = 3
      check[:handle] = false
      publish_check_result(check)
      @checks_in_progress.delete(check[:name])
    end
  else
    @logger.warn("previous check command execution in progress", :check => check)
  end
end

#find_client_attribute(tree, path, default) ⇒ Object

Traverse the Sensu client definition (hash) for an attribute value, with a fallback default value if nil.

Parameters:

  • tree (Hash)

    to traverse.

  • path (Array)

    of attribute keys.

  • default (Object)

    value if attribute value is nil.

Returns:

  • (Object)

    attribute or fallback default value.



105
106
107
108
109
110
111
112
# File 'lib/sensu/client/process.rb', line 105

def find_client_attribute(tree, path, default)
  attribute = tree[path.shift]
  if attribute.is_a?(Hash)
    find_client_attribute(attribute, path, default)
  else
    attribute.nil? ? default : attribute
  end
end

#keepalive_payloadHash

Create a Sensu client keepalive payload, to be sent over the transport for processing. A client keepalive is composed of its settings definition, the Sensu version, and a timestamp. Sensitive information is redacted from the keepalive payload.

Returns:

  • (Hash)

    keepalive payload



40
41
42
43
44
45
46
# File 'lib/sensu/client/process.rb', line 40

def keepalive_payload
  payload = @settings[:client].merge({
    :version => VERSION,
    :timestamp => Time.now.to_i
  })
  redact_sensitive(payload, @settings[:client][:redact])
end

#pauseObject

Pause the Sensu client process, unless it is being paused or has already been paused. The process/daemon ‘@state` is first set to `:pausing`, to indicate that it’s in progress. All run timers are cancelled, and the references are cleared. The Sensu client will unsubscribe from all transport subscriptions, then set the process/daemon ‘@state` to `:paused`.



381
382
383
384
385
386
387
388
389
390
391
# File 'lib/sensu/client/process.rb', line 381

def pause
  unless @state == :pausing || @state == :paused
    @state = :pausing
    @timers[:run].each do |timer|
      timer.cancel
    end
    @timers[:run].clear
    @transport.unsubscribe
    @state = :paused
  end
end

#process_check_request(check) ⇒ Object

Process a check request. If a check request has a check command, it will be executed. A standard check request will be merged with a local check definition, if present. Client safe mode is enforced in this method, requiring a local check definition in order to execute the check command. If a local check definition does not exist when operating with client safe mode, a check result will be published to report the missing check definition. A check request without a command indicates a check extension run. The check request may contain ‘:extension`, the name of the extension to run. If `:extension` is not present, the check name is used for the extension name. If a check extension does not exist for a name, a check result will be published to report the unknown check extension.

Parameters:

  • check (Hash)


210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/sensu/client/process.rb', line 210

def process_check_request(check)
  @logger.debug("processing check", :check => check)
  if @settings.check_exists?(check[:name])
    check.merge!(@settings[:checks][check[:name]])
  end
  if check.has_key?(:command)
    if @safe_mode && !@settings.check_exists?(check[:name])
      check[:output] = "Check is not locally defined (safe mode)"
      check[:status] = 3
      check[:handle] = false
      check[:executed] = Time.now.to_i
      publish_check_result(check)
    else
      execute_check_command(check)
    end
  else
    extension_name = check[:extension] || check[:name]
    if @extensions.check_exists?(extension_name)
      run_check_extension(check)
    else
      @logger.warn("unknown check extension", :check => check)
    end
  end
end

#publish_check_result(check) ⇒ Object

Publish a check result to the transport for processing. A check result is composed of a client (name) and a check definition, containing check ‘:output` and `:status`. JSON serialization is used when publishing the check result payload to the transport pipe. Transport errors are logged.

Parameters:

  • check (Hash)


82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/sensu/client/process.rb', line 82

def publish_check_result(check)
  payload = {
    :client => @settings[:client][:name],
    :check => check
  }
  @logger.info("publishing check result", :payload => payload)
  @transport.publish(:direct, "results", MultiJson.dump(payload)) do |info|
    if info[:error]
      @logger.error("failed to publish check result", {
        :payload => payload,
        :error => info[:error].to_s
      })
    end
  end
end

#publish_keepaliveObject

Publish a Sensu client keepalive to the transport for processing. JSON serialization is used for transport messages.



50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/sensu/client/process.rb', line 50

def publish_keepalive
  payload = keepalive_payload
  @logger.debug("publishing keepalive", :payload => payload)
  @transport.publish(:direct, "keepalives", MultiJson.dump(payload)) do |info|
    if info[:error]
      @logger.error("failed to publish keepalive", {
        :payload => payload,
        :error => info[:error].to_s
      })
    end
  end
end

#resumeObject

Resume the Sensu client process if it is currently or will soon be paused. The ‘retry_until_true` helper method is used to determine if the process is paused and if the transport is connected. If the conditions are met, `bootstrap()` will be called and true is returned to stop `retry_until_true`.



398
399
400
401
402
403
404
405
406
407
# File 'lib/sensu/client/process.rb', line 398

def resume
  retry_until_true(1) do
    if @state == :paused
      if @transport.connected?
        bootstrap
        true
      end
    end
  end
end

#run_check_extension(check) ⇒ Object

Run a check extension and publish the result. The Sensu client loads check extensions, checks that run within the Sensu Ruby VM and the EventMachine event loop, using the Sensu Extension API. If a check definition includes ‘:extension`, use it’s value for the extension name, otherwise use the check name. The check definition is passed to the extension ‘safe_run()` method as a parameter, the extension may utilize it.

github.com/sensu/sensu-extension

Parameters:

  • check (Hash)


182
183
184
185
186
187
188
189
190
191
192
# File 'lib/sensu/client/process.rb', line 182

def run_check_extension(check)
  @logger.debug("attempting to run check extension", :check => check)
  check[:executed] = Time.now.to_i
  extension_name = check[:extension] || check[:name]
  extension = @extensions[:checks][extension_name]
  extension.safe_run(check) do |output, status|
    check[:output] = output
    check[:status] = status
    publish_check_result(check)
  end
end

#schedule_checks(checks) ⇒ Object

Schedule check executions, using EventMachine periodic timers, using a calculated execution splay. The timers are stored in the timers hash under ‘:run`, so they can be cancelled etc. Check definitions are duplicated before processing them, in case they are mutated. The check `:issued` timestamp is set here, to mimic check requests issued by a Sensu server.

Parameters:

  • checks (Array)

    of definitions.



283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/sensu/client/process.rb', line 283

def schedule_checks(checks)
  checks.each do |check|
    execute_check = Proc.new do
      check[:issued] = Time.now.to_i
      process_check_request(check.dup)
    end
    execution_splay = testing? ? 0 : calculate_execution_splay(check)
    interval = testing? ? 0.5 : check[:interval]
    @timers[:run] << EM::Timer.new(execution_splay) do
      execute_check.call
      @timers[:run] << EM::PeriodicTimer.new(interval, &execute_check)
    end
  end
end

#setup_keepalivesObject

Schedule Sensu client keepalives. Immediately publish a keepalive to register the client, then publish a keepalive every 20 seconds. Sensu client keepalives are used to determine client (& machine) health.



67
68
69
70
71
72
73
# File 'lib/sensu/client/process.rb', line 67

def setup_keepalives
  @logger.debug("scheduling keepalives")
  publish_keepalive
  @timers[:run] << EM::PeriodicTimer.new(20) do
    publish_keepalive
  end
end

#setup_socketsObject

Setup the Sensu client socket, for external check result input. By default, the client socket is bound to localhost on TCP & UDP port 3030. The socket can be configured via the client definition, ‘:socket` with `:bind` and `:port`. The current instance of the Sensu logger, settings, and transport are passed to the socket handler, `Sensu::Client::Socket`.



319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/sensu/client/process.rb', line 319

def setup_sockets
  options = @settings[:client][:socket] || Hash.new
  options[:bind] ||= "127.0.0.1"
  options[:port] ||= 3030
  @logger.debug("binding client tcp and udp sockets", :options => options)
  EM::start_server(options[:bind], options[:port], Socket) do |socket|
    socket.logger = @logger
    socket.settings = @settings
    socket.transport = @transport
  end
  EM::open_datagram_socket(options[:bind], options[:port], Socket) do |socket|
    socket.logger = @logger
    socket.settings = @settings
    socket.transport = @transport
    socket.protocol = :udp
  end
end

#setup_standaloneObject

Setup standalone check executions, scheduling standard check definition and check extension executions. Check definitions and extensions with ‘:standalone` set to `true` will be scheduled by the Sensu client for execution.



302
303
304
305
306
307
308
309
310
311
# File 'lib/sensu/client/process.rb', line 302

def setup_standalone
  @logger.debug("scheduling standalone checks")
  standard_checks = @settings.checks.select do |check|
    check[:standalone]
  end
  extension_checks = @extensions.checks.select do |check|
    check[:standalone] && check[:interval].is_a?(Integer)
  end
  schedule_checks(standard_checks + extension_checks)
end

#setup_subscriptionsObject

Set up Sensu client subscriptions. Subscriptions determine the kinds of check requests the client will receive. A unique transport funnel is created for the Sensu client, using a combination of it’s name, the Sensu version, and the current timestamp (epoch). The unique funnel is bound to each transport pipe, named after the client subscription. The Sensu client will receive JSON serialized check requests from its funnel, that get parsed and processed.



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/sensu/client/process.rb', line 243

def setup_subscriptions
  @logger.debug("subscribing to client subscriptions")
  @settings[:client][:subscriptions].each do |subscription|
    @logger.debug("subscribing to a subscription", :subscription => subscription)
    funnel = [@settings[:client][:name], VERSION, Time.now.to_i].join("-")
    @transport.subscribe(:fanout, subscription, funnel) do |message_info, message|
      begin
        check = MultiJson.load(message)
        @logger.info("received check request", :check => check)
        process_check_request(check)
      rescue MultiJson::ParseError => error
        @logger.error("failed to parse the check request payload", {
          :message => message,
          :error => error.to_s
        })
      end
    end
  end
end

#startObject

Start the Sensu client process, setting up the client transport connection, the sockets, and calling the ‘bootstrap()` method.



368
369
370
371
372
# File 'lib/sensu/client/process.rb', line 368

def start
  setup_transport
  setup_sockets
  bootstrap
end

#stopObject

Stop the Sensu client process, pausing it, completing check executions in progress, closing the transport connection, and exiting the process (exit 0). After pausing the process, the process/daemon ‘@state` is set to `:stopping`.



413
414
415
416
417
418
419
420
421
# File 'lib/sensu/client/process.rb', line 413

def stop
  @logger.warn("stopping")
  pause
  @state = :stopping
  complete_checks_in_progress do
    @transport.close
    super
  end
end

#substitute_check_command_tokens(check) ⇒ Array

Substitue check command tokens (eg. :::db.name|production:::) with the associated client definition attribute value. Command tokens can provide a fallback default value, following a pipe.

Parameters:

  • check (Hash)

Returns:

  • (Array)

    containing the check command string with tokens substituted and an array of unmatched command tokens.



121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/sensu/client/process.rb', line 121

def substitute_check_command_tokens(check)
  unmatched_tokens = []
  substituted = check[:command].gsub(/:::([^:].*?):::/) do
    token, default = $1.to_s.split("|", -1)
    matched = find_client_attribute(@settings[:client], token.split("."), default)
    if matched.nil?
      unmatched_tokens << token
    end
    matched
  end
  [substituted, unmatched_tokens]
end