Class: Sensu::Client::Process
- Inherits: Object
  - Object
  - Sensu::Client::Process
- Includes: Daemon
- Defined in: lib/sensu/client/process.rb
Instance Attribute Summary
-
#safe_mode ⇒ Object
Returns the value of attribute safe_mode.
Class Method Summary
-
.run(options = {}) ⇒ Object
Create an instance of the Sensu client process, start the client within the EventMachine event loop, and set up client process signal traps (for stopping).
Instance Method Summary
-
#bootstrap ⇒ Object
Bootstrap the Sensu client, setting up client keepalives, subscriptions, and standalone check executions.
-
#calculate_execution_splay(check) ⇒ Object
Calculate a check execution splay, taking into account the current time and the execution interval to ensure it’s consistent between process restarts.
-
#complete_checks_in_progress(&callback) ⇒ Object
Call a callback (Ruby block) when there are no longer check executions in progress.
-
#execute_check_command(check) ⇒ Object
Execute a check command, capturing its output (STDOUT/ERR), exit status code, execution duration, timestamp, and publish the result.
-
#find_client_attribute(tree, path, default) ⇒ Object
Traverse the Sensu client definition (hash) for an attribute value, with a fallback default value if nil.
-
#initialize(options = {}) ⇒ Process
constructor
Override Daemon initialize() to support Sensu client check execution safe mode and checks in progress.
-
#keepalive_payload ⇒ Hash
Create a Sensu client keepalive payload, to be sent over the transport for processing.
-
#pause ⇒ Object
Pause the Sensu client process, unless it is being paused or has already been paused.
-
#process_check_request(check) ⇒ Object
Process a check request.
-
#publish_check_result(check) ⇒ Object
Publish a check result to the transport for processing.
-
#publish_keepalive ⇒ Object
Publish a Sensu client keepalive to the transport for processing.
-
#resume ⇒ Object
Resume the Sensu client process if it is currently or will soon be paused.
-
#run_check_extension(check) ⇒ Object
Run a check extension and publish the result.
-
#schedule_checks(checks) ⇒ Object
Schedule check executions using EventMachine periodic timers, offset by a calculated execution splay.
-
#setup_keepalives ⇒ Object
Schedule Sensu client keepalives.
-
#setup_sockets ⇒ Object
Set up the Sensu client socket, for external check result input.
-
#setup_standalone ⇒ Object
Set up standalone check executions, scheduling standard check definition and check extension executions.
-
#setup_subscriptions ⇒ Object
Set up Sensu client subscriptions.
-
#start ⇒ Object
Start the Sensu client process, setting up the client transport connection, the sockets, and calling the `bootstrap()` method.
-
#stop ⇒ Object
Stop the Sensu client process, pausing it, completing check executions in progress, closing the transport connection, and exiting the process (exit 0).
-
#substitute_check_command_tokens(check) ⇒ Array
Substitute check command tokens (e.g. :::db.name|production:::) with the associated client definition attribute value.
Methods included from Daemon
#load_extensions, #load_settings, #log_concerns, #setup_logger, #setup_process, #setup_redis, #setup_signal_traps, #setup_transport
Methods included from Utilities
#deep_merge, #random_uuid, #redact_sensitive, #retry_until_true, #testing?
Constructor Details
#initialize(options = {}) ⇒ Process
Override Daemon initialize() to support Sensu client check execution safe mode and checks in progress.
# File 'lib/sensu/client/process.rb', line 28

def initialize(options={})
  super
  @safe_mode = @settings[:client][:safe_mode] || false
  @checks_in_progress = []
end
Instance Attribute Details
#safe_mode ⇒ Object
Returns the value of attribute safe_mode.
# File 'lib/sensu/client/process.rb', line 9

def safe_mode
  @safe_mode
end
Class Method Details
.run(options = {}) ⇒ Object
Create an instance of the Sensu client process, start the client within the EventMachine event loop, and set up client process signal traps (for stopping).
# File 'lib/sensu/client/process.rb', line 16

def self.run(options={})
  client = self.new(options)
  EM::run do
    client.start
    client.setup_signal_traps
  end
end
Instance Method Details
#bootstrap ⇒ Object
Bootstrap the Sensu client, setting up client keepalives, subscriptions, and standalone check executions. This method sets the process/daemon `@state` to `:running`.

# File 'lib/sensu/client/process.rb', line 358

def bootstrap
  setup_keepalives
  setup_subscriptions
  setup_standalone
  @state = :running
end
#calculate_execution_splay(check) ⇒ Object
Calculate a check execution splay, taking into account the current time and the execution interval to ensure it’s consistent between process restarts.
# File 'lib/sensu/client/process.rb', line 268

def calculate_execution_splay(check)
  key = [@settings[:client][:name], check[:name]].join(":")
  splay_hash = Digest::MD5.digest(key).unpack("Q<").first
  current_time = (Time.now.to_f * 1000).to_i
  (splay_hash - current_time) % (check[:interval] * 1000) / 1000.0
end
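The consistency claim can be checked outside of a running client. The sketch below repeats the same arithmetic in a free-standing method; the name `execution_splay`, its explicit parameters, and the sample client and check names are assumptions made for illustration, not part of the Sensu API.

```ruby
require "digest"

# Hypothetical standalone version of the splay calculation. The client name,
# check name, interval (seconds), and current time are passed in explicitly
# so the result can be inspected without a Sensu client instance.
def execution_splay(client_name, check_name, interval, now = Time.now)
  key = [client_name, check_name].join(":")
  # First 8 bytes of the MD5 digest, read as a little-endian unsigned 64-bit integer.
  splay_hash = Digest::MD5.digest(key).unpack("Q<").first
  current_time = (now.to_f * 1000).to_i
  # Milliseconds until the next matching slot, converted to seconds.
  (splay_hash - current_time) % (interval * 1000) / 1000.0
end

now = Time.at(1_400_000_000)
first = execution_splay("i-424242", "check_disk", 60, now)
later = execution_splay("i-424242", "check_disk", 60, now + 7)

# A restart 7 seconds later waits a different amount of time, but both
# waits end at the same offset within the 60-second interval.
puts first
puts later
```

Because the hash depends only on the client and check names, the wait plus the current time is always congruent to the same value modulo the interval, which is what keeps executions aligned across restarts.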
#complete_checks_in_progress(&callback) ⇒ Object
Call a callback (Ruby block) when there are no longer check executions in progress. This method is used when stopping the Sensu client. The `retry_until_true` helper method is used to check the condition every 0.5 seconds until `true` is returned.

# File 'lib/sensu/client/process.rb', line 345

def complete_checks_in_progress(&callback)
  @logger.info("completing checks in progress", :checks_in_progress => @checks_in_progress)
  retry_until_true do
    if @checks_in_progress.empty?
      callback.call
      true
    end
  end
end
#execute_check_command(check) ⇒ Object
Execute a check command, capturing its output (STDOUT/ERR), exit status code, execution duration, timestamp, and publish the result. This method guards against multiple executions for the same check. Check command tokens are substituted with the associated client attribute values. If there are unmatched check command tokens, the check command will not be executed, instead a check result will be published reporting the unmatched tokens.
# File 'lib/sensu/client/process.rb', line 144

def execute_check_command(check)
  @logger.debug("attempting to execute check command", :check => check)
  unless @checks_in_progress.include?(check[:name])
    @checks_in_progress << check[:name]
    command, unmatched_tokens = substitute_check_command_tokens(check)
    if unmatched_tokens.empty?
      check[:executed] = Time.now.to_i
      started = Time.now.to_f
      Spawn.process(command, :timeout => check[:timeout]) do |output, status|
        check[:duration] = ("%.3f" % (Time.now.to_f - started)).to_f
        check[:output] = output
        check[:status] = status
        publish_check_result(check)
        @checks_in_progress.delete(check[:name])
      end
    else
      check[:output] = "Unmatched command tokens: " + unmatched_tokens.join(", ")
      check[:status] = 3
      check[:handle] = false
      publish_check_result(check)
      @checks_in_progress.delete(check[:name])
    end
  else
    @logger.warn("previous check command execution in progress", :check => check)
  end
end
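The guard against overlapping executions is a name-based in-progress list. A minimal model of it, with command execution left out, might look like the following; `InProgressGuard` and its method names are hypothetical and exist only for this example.

```ruby
# Hypothetical stand-in for the client's bookkeeping. Only the guard logic
# mirrors the method above; spawning the command is left out.
class InProgressGuard
  attr_reader :checks_in_progress

  def initialize
    @checks_in_progress = []
  end

  # Returns true if the check may start, false if an earlier execution of a
  # check with the same name has not finished yet.
  def start(check)
    return false if @checks_in_progress.include?(check[:name])
    @checks_in_progress << check[:name]
    true
  end

  # Called once the result has been published.
  def finish(check)
    @checks_in_progress.delete(check[:name])
  end
end

guard = InProgressGuard.new
check = { :name => "check_disk", :command => "check-disk.rb" }
guard.start(check)   # first execution is admitted
guard.start(check)   # overlapping execution is refused
guard.finish(check)
guard.start(check)   # admitted again after the first one finished
```

Keying the list on the check name means two checks with different names can run concurrently, while a slow check never stacks up behind itself.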
#find_client_attribute(tree, path, default) ⇒ Object
Traverse the Sensu client definition (hash) for an attribute value, with a fallback default value if nil.
# File 'lib/sensu/client/process.rb', line 105

def find_client_attribute(tree, path, default)
  attribute = tree[path.shift]
  if attribute.is_a?(Hash)
    find_client_attribute(attribute, path, default)
  else
    attribute.nil? ? default : attribute
  end
end
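To see the traversal and the fallback in action, the method can be copied into a script and run against a small hash. The client definition below is made up for illustration, and it uses symbol keys with a symbol path, which is an assumption of this example rather than a statement about how Sensu stores settings.

```ruby
# Same traversal as the method above, copied as a free-standing function so
# it can be run without a Sensu client instance.
def find_client_attribute(tree, path, default)
  attribute = tree[path.shift]
  if attribute.is_a?(Hash)
    find_client_attribute(attribute, path, default)
  else
    attribute.nil? ? default : attribute
  end
end

# Hypothetical client definition, for illustration only.
client = {
  :name => "i-424242",
  :db => { :name => "inventory", :port => 5432 }
}

# path.shift consumes the path array, so a fresh array is passed on every call.
p find_client_attribute(client, [:db, :name], nil)         # nested value
p find_client_attribute(client, [:db, :user], "postgres")  # falls back to the default
p find_client_attribute(client, [:missing, :key], nil)     # no value and no default
```

Note that the path array is mutated by `shift`, so callers that need the path afterwards should pass a copy.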
#keepalive_payload ⇒ Hash
Create a Sensu client keepalive payload, to be sent over the transport for processing. A client keepalive is composed of its settings definition, the Sensu version, and a timestamp. Sensitive information is redacted from the keepalive payload.
# File 'lib/sensu/client/process.rb', line 40

def keepalive_payload
  payload = @settings[:client].merge({
    :version => VERSION,
    :timestamp => Time.now.to_i
  })
  redact_sensitive(payload, @settings[:client][:redact])
end
#pause ⇒ Object
Pause the Sensu client process, unless it is being paused or has already been paused. The process/daemon `@state` is first set to `:pausing`, to indicate that it’s in progress. All run timers are cancelled, and the references are cleared. The Sensu client will unsubscribe from all transport subscriptions, then set the process/daemon `@state` to `:paused`.

# File 'lib/sensu/client/process.rb', line 381

def pause
  unless @state == :pausing || @state == :paused
    @state = :pausing
    @timers[:run].each do |timer|
      timer.cancel
    end
    @timers[:run].clear
    @transport.unsubscribe
    @state = :paused
  end
end
#process_check_request(check) ⇒ Object
Process a check request. If a check request has a check command, it will be executed. A standard check request will be merged with a local check definition, if present. Client safe mode is enforced in this method, requiring a local check definition in order to execute the check command. If a local check definition does not exist when operating with client safe mode, a check result will be published to report the missing check definition. A check request without a command indicates a check extension run. The check request may contain `:extension`, the name of the extension to run. If `:extension` is not present, the check name is used for the extension name. If a check extension does not exist for a name, a warning is logged to report the unknown check extension.

# File 'lib/sensu/client/process.rb', line 210

def process_check_request(check)
  @logger.debug("processing check", :check => check)
  if @settings.check_exists?(check[:name])
    check.merge!(@settings[:checks][check[:name]])
  end
  if check.has_key?(:command)
    if @safe_mode && !@settings.check_exists?(check[:name])
      check[:output] = "Check is not locally defined (safe mode)"
      check[:status] = 3
      check[:handle] = false
      check[:executed] = Time.now.to_i
      publish_check_result(check)
    else
      execute_check_command(check)
    end
  else
    extension_name = check[:extension] || check[:name]
    if @extensions.check_exists?(extension_name)
      run_check_extension(check)
    else
      @logger.warn("unknown check extension", :check => check)
    end
  end
end
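The branching in this method can be summarised as a small decision function. The sketch below returns a symbol naming the branch that would be taken; `check_request_action`, its parameters, and the returned symbols are hypothetical names chosen for this example, and plain hashes and arrays stand in for the settings and extensions objects.

```ruby
# Hypothetical helper that names the branch process_check_request would take.
# local_checks is a hash of check name => definition; extensions is a list of
# known check extension names.
def check_request_action(check, local_checks, extensions, safe_mode)
  locally_defined = local_checks.key?(check[:name])
  # A local definition, when present, is merged over the request.
  check = check.merge(local_checks[check[:name]]) if locally_defined
  if check.key?(:command)
    (safe_mode && !locally_defined) ? :publish_safe_mode_result : :execute_command
  else
    extension_name = check[:extension] || check[:name]
    extensions.include?(extension_name) ? :run_extension : :log_unknown_extension
  end
end

request = { :name => "check_disk", :command => "check-disk.rb" }
local = { "check_disk" => { :command => "check-disk.rb -w 80" } }

p check_request_action(request, {}, [], true)    # safe mode, no local definition
p check_request_action(request, {}, [], false)   # safe mode off
p check_request_action({ :name => "check_disk" }, local, [], true)
p check_request_action({ :name => "system_profile" }, {}, ["system_profile"], false)
```

The third call shows why the merge happens first: a request that arrives without a command can still become a command execution once the local definition supplies one.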
#publish_check_result(check) ⇒ Object
Publish a check result to the transport for processing. A check result is composed of a client (name) and a check definition, containing check `:output` and `:status`. JSON serialization is used when publishing the check result payload to the transport pipe. Transport errors are logged.

# File 'lib/sensu/client/process.rb', line 82

def publish_check_result(check)
  payload = {
    :client => @settings[:client][:name],
    :check => check
  }
  @logger.info("publishing check result", :payload => payload)
  @transport.publish(:direct, "results", MultiJson.dump(payload)) do |info|
    if info[:error]
      @logger.error("failed to publish check result", {
        :payload => payload,
        :error => info[:error].to_s
      })
    end
  end
end
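For a sense of what ends up on the transport, the payload shape can be reproduced with the standard library. This sketch uses Ruby's bundled `json` library in place of MultiJson, and the helper name `check_result_payload` and the sample values are assumptions made for illustration.

```ruby
require "json"

# Hypothetical helper that builds the same two-key structure as the method
# above: the client name plus the full check hash.
def check_result_payload(client_name, check)
  { :client => client_name, :check => check }
end

# Sample check, as it might look after a successful command execution.
check = {
  :name => "check_disk",
  :command => "check-disk.rb",
  :executed => 1_400_000_000,
  :duration => 0.042,
  :output => "CheckDisk OK\n",
  :status => 0
}

# The stdlib JSON generator stands in for MultiJson.dump here.
puts JSON.generate(check_result_payload("i-424242", check))
```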
#publish_keepalive ⇒ Object
Publish a Sensu client keepalive to the transport for processing. JSON serialization is used for transport messages.
# File 'lib/sensu/client/process.rb', line 50

def publish_keepalive
  payload = keepalive_payload
  @logger.debug("publishing keepalive", :payload => payload)
  @transport.publish(:direct, "keepalives", MultiJson.dump(payload)) do |info|
    if info[:error]
      @logger.error("failed to publish keepalive", {
        :payload => payload,
        :error => info[:error].to_s
      })
    end
  end
end
#resume ⇒ Object
Resume the Sensu client process if it is currently or will soon be paused. The `retry_until_true` helper method is used to determine if the process is paused and if the transport is connected. If the conditions are met, `bootstrap()` will be called and true is returned to stop `retry_until_true`.

# File 'lib/sensu/client/process.rb', line 398

def resume
  retry_until_true(1) do
    if @state == :paused
      if @transport.connected?
        bootstrap
        true
      end
    end
  end
end
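The pause and resume methods together form a small state machine over `@state`. The model below walks through the transitions without EventMachine or a transport; `ClientStateModel` and `try_resume` are hypothetical names, and the transport connection is reduced to a boolean argument.

```ruby
# Hypothetical model of the @state transitions described above. Timers and
# the transport are left out; only the state changes are kept.
class ClientStateModel
  attr_reader :state

  def initialize
    @state = :running
  end

  def pause
    unless @state == :pausing || @state == :paused
      @state = :pausing
      # In the real client, run timers are cancelled and the transport is
      # unsubscribed at this point.
      @state = :paused
    end
  end

  # One polling attempt of resume. Returns true once the client has been
  # bootstrapped again, nil otherwise (so a retry helper would keep polling).
  def try_resume(transport_connected)
    if @state == :paused && transport_connected
      @state = :running
      true
    end
  end
end

model = ClientStateModel.new
model.pause
p model.state               # paused
p model.try_resume(false)   # transport still down, keep polling
p model.try_resume(true)    # bootstrapped
p model.state
```

A `nil` return from an attempt is what lets the polling helper keep retrying until both conditions hold.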
#run_check_extension(check) ⇒ Object
Run a check extension and publish the result. The Sensu client loads check extensions, checks that run within the Sensu Ruby VM and the EventMachine event loop, using the Sensu Extension API. If a check definition includes `:extension`, its value is used for the extension name; otherwise the check name is used. The check definition is passed to the extension `safe_run()` method as a parameter, which the extension may use.

# File 'lib/sensu/client/process.rb', line 182

def run_check_extension(check)
  @logger.debug("attempting to run check extension", :check => check)
  check[:executed] = Time.now.to_i
  extension_name = check[:extension] || check[:name]
  extension = @extensions[:checks][extension_name]
  extension.safe_run(check) do |output, status|
    check[:output] = output
    check[:status] = status
    publish_check_result(check)
  end
end
#schedule_checks(checks) ⇒ Object
Schedule check executions using EventMachine periodic timers, offset by a calculated execution splay. The timers are stored in the timers hash under `:run`, so they can be cancelled later. Check definitions are duplicated before processing them, in case they are mutated. The check `:issued` timestamp is set here, to mimic check requests issued by a Sensu server.

# File 'lib/sensu/client/process.rb', line 283

def schedule_checks(checks)
  checks.each do |check|
    execute_check = Proc.new do
      check[:issued] = Time.now.to_i
      process_check_request(check.dup)
    end
    execution_splay = testing? ? 0 : calculate_execution_splay(check)
    interval = testing? ? 0.5 : check[:interval]
    @timers[:run] << EM::Timer.new(execution_splay) do
      execute_check.call
      @timers[:run] << EM::PeriodicTimer.new(interval, &execute_check)
    end
  end
end
#setup_keepalives ⇒ Object
Schedule Sensu client keepalives. Immediately publish a keepalive to register the client, then publish a keepalive every 20 seconds. Sensu client keepalives are used to determine client (& machine) health.
# File 'lib/sensu/client/process.rb', line 67

def setup_keepalives
  @logger.debug("scheduling keepalives")
  publish_keepalive
  @timers[:run] << EM::PeriodicTimer.new(20) do
    publish_keepalive
  end
end
#setup_sockets ⇒ Object
Set up the Sensu client socket, for external check result input. By default, the client socket is bound to localhost on TCP & UDP port 3030. The socket can be configured via the client definition, `:socket` with `:bind` and `:port`. The current instance of the Sensu logger, settings, and transport are passed to the socket handler, `Sensu::Client::Socket`.

# File 'lib/sensu/client/process.rb', line 319

def setup_sockets
  options = @settings[:client][:socket] || Hash.new
  options[:bind] ||= "127.0.0.1"
  options[:port] ||= 3030
  @logger.debug("binding client tcp and udp sockets", :options => options)
  EM::start_server(options[:bind], options[:port], Socket) do |socket|
    socket.logger = @logger
    socket.settings = @settings
    socket.transport = @transport
  end
  EM::open_datagram_socket(options[:bind], options[:port], Socket) do |socket|
    socket.logger = @logger
    socket.settings = @settings
    socket.transport = @transport
    socket.protocol = :udp
  end
end
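From the other side of the socket, an external program would connect to the bound address and write a check result. The sketch below is a guess at what such a sender could look like: this page does not document the payload format, so the JSON document with `name`, `output`, and `status` keys is an assumption, as are the helper names. The sending method is defined but not called, since it needs a running client to connect to.

```ruby
require "json"
require "socket"

# Builds the JSON document to send. The name/output/status shape is an
# assumption of this example, not something stated on this page.
def external_check_result(name, output, status)
  JSON.generate(:name => name, :output => output, :status => status)
end

# Hypothetical sender, using the default bind address and port described
# above. Requires a client listening on host:port.
def send_check_result(payload, host = "127.0.0.1", port = 3030)
  TCPSocket.open(host, port) do |socket|
    socket.write(payload)
  end
end

payload = external_check_result("app_01", "could not connect to database", 2)
puts payload
# send_check_result(payload)   # uncomment when a client is listening
```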
#setup_standalone ⇒ Object
Set up standalone check executions, scheduling standard check definition and check extension executions. Check definitions and extensions with `:standalone` set to `true` will be scheduled by the Sensu client for execution.

# File 'lib/sensu/client/process.rb', line 302

def setup_standalone
  @logger.debug("scheduling standalone checks")
  standard_checks = @settings.checks.select do |check|
    check[:standalone]
  end
  extension_checks = @extensions.checks.select do |check|
    check[:standalone] && check[:interval].is_a?(Integer)
  end
  schedule_checks(standard_checks + extension_checks)
end
#setup_subscriptions ⇒ Object
Set up Sensu client subscriptions. Subscriptions determine the kinds of check requests the client will receive. A unique transport funnel is created for the Sensu client, using a combination of its name, the Sensu version, and the current timestamp (epoch). The unique funnel is bound to each transport pipe, named after the client subscription. The Sensu client will receive JSON serialized check requests from its funnel, which are parsed and processed.

# File 'lib/sensu/client/process.rb', line 243

def setup_subscriptions
  @logger.debug("subscribing to client subscriptions")
  @settings[:client][:subscriptions].each do |subscription|
    @logger.debug("subscribing to a subscription", :subscription => subscription)
    funnel = [@settings[:client][:name], VERSION, Time.now.to_i].join("-")
    @transport.subscribe(:fanout, subscription, funnel) do |message_info, message|
      begin
        check = MultiJson.load(message)
        @logger.info("received check request", :check => check)
        process_check_request(check)
      rescue MultiJson::ParseError => error
        @logger.error("failed to parse the check request payload", {
          :message => message,
          :error => error.to_s
        })
      end
    end
  end
end
#start ⇒ Object
Start the Sensu client process, setting up the client transport connection, the sockets, and calling the `bootstrap()` method.

# File 'lib/sensu/client/process.rb', line 368

def start
  setup_transport
  setup_sockets
  bootstrap
end
#stop ⇒ Object
Stop the Sensu client process, pausing it, completing check executions in progress, closing the transport connection, and exiting the process (exit 0). After pausing the process, the process/daemon `@state` is set to `:stopping`.

# File 'lib/sensu/client/process.rb', line 413

def stop
  @logger.warn("stopping")
  pause
  @state = :stopping
  complete_checks_in_progress do
    @transport.close
    super
  end
end
#substitute_check_command_tokens(check) ⇒ Array
Substitute check command tokens (e.g. :::db.name|production:::) with the associated client definition attribute value. Command tokens can provide a fallback default value, following a pipe.

# File 'lib/sensu/client/process.rb', line 121

def substitute_check_command_tokens(check)
  unmatched_tokens = []
  substituted = check[:command].gsub(/:::([^:].*?):::/) do
    token, default = $1.to_s.split("|", -1)
    matched = find_client_attribute(@settings[:client], token.split("."), default)
    if matched.nil?
      unmatched_tokens << token
    end
    matched
  end
  [substituted, unmatched_tokens]
end
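A worked example makes the three token outcomes concrete: a matched attribute, a fallback default, and an unmatched token. The sketch below repeats the substitution logic in free-standing methods; the method names, the sample command, and the client hash are made up for illustration, and the hash uses string keys because token paths are split into strings.

```ruby
# Free-standing copy of the attribute traversal, so the example runs on its own.
def find_attribute(tree, path, default)
  attribute = tree[path.shift]
  if attribute.is_a?(Hash)
    find_attribute(attribute, path, default)
  else
    attribute.nil? ? default : attribute
  end
end

# Hypothetical standalone version of the substitution: takes the command
# string and a client hash instead of reading them from a client instance.
def substitute_tokens(command, client)
  unmatched_tokens = []
  substituted = command.gsub(/:::([^:].*?):::/) do
    token, default = $1.to_s.split("|", -1)
    matched = find_attribute(client, token.split("."), default)
    unmatched_tokens << token if matched.nil?
    matched
  end
  [substituted, unmatched_tokens]
end

client = { "name" => "i-424242", "db" => { "name" => "inventory" } }
command = "check-db.rb -d :::db.name::: -p :::db.port|5432::: -u :::db.user:::"

substituted, unmatched = substitute_tokens(command, client)
p substituted   # db.name is matched, db.port uses its default, db.user is left empty
p unmatched     # ["db.user"]
```

An unmatched token is replaced with an empty string in the returned command, which is why the caller inspects the second return value before executing anything.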