Class: Arborist::MonitorRunner

Inherits:
Object
  • Object
show all
Extended by:
Loggability
Includes:
CZTop::Reactor::SignalHandling
Defined in:
lib/arborist/monitor_runner.rb

Overview

An event-driven runner for Arborist::Monitors.

Constant Summary collapse

QUEUE_SIGS =

Signals the runner handles

[
	:INT, :TERM, :HUP, :USR1,
	# :TODO: :QUIT, :WINCH, :USR2, :TTIN, :TTOU
] & Signal.list.keys.map( &:to_sym )
THREAD_CLEANUP_INTERVAL =

Number of seconds between thread cleanup

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ MonitorRunner

Create a new Arborist::MonitorRunner



34
35
36
37
38
39
40
41
# File 'lib/arborist/monitor_runner.rb', line 34

# Set up a runner that starts with no monitors loaded and no handler, a new
# CZTop::Reactor and Arborist::Client, and empty tables for tracking runner
# threads and pending requests.
def initialize
	# Bookkeeping collections all start out empty.
	@monitors        = []
	@runner_threads  = {}
	@request_queue   = {}

	# No handler is assigned at construction time.
	@handler         = nil

	# Collaborators; the reactor is created before the client.
	@reactor         = CZTop::Reactor.new
	@client          = Arborist::Client.new
end

Instance Attribute Details

#client ⇒ Object (readonly)

The Arborist::Client that will provide the message packing and unpacking



67
68
69
# File 'lib/arborist/monitor_runner.rb', line 67

# Reader for the Arborist::Client created in #initialize.
def client
  return @client
end

#handler ⇒ Object

The ZMQ::Handler subclass that handles all async IO



54
55
56
# File 'lib/arborist/monitor_runner.rb', line 54

# Reader for the runner's handler; #initialize leaves it set to nil.
def handler
  return @handler
end

#monitors ⇒ Object (readonly)

The Array of loaded Arborist::Monitors the runner should run.



50
51
52
# File 'lib/arborist/monitor_runner.rb', line 50

# Reader for the Array of monitors; it starts empty and is appended to by
# #load_monitors.
def monitors
  return @monitors
end

#reactor ⇒ Object

The reactor (a CZTop::Reactor) the runner uses to drive everything



58
59
60
# File 'lib/arborist/monitor_runner.rb', line 58

# Reader for the CZTop::Reactor created in #initialize.
def reactor
  return @reactor
end

#request_queue ⇒ Object (readonly)

The Hash of pending requests, keyed by the callback that should be called with the results.



63
64
65
# File 'lib/arborist/monitor_runner.rb', line 63

# Reader for the Hash of pending requests (callback => request).
def request_queue
  return @request_queue
end

#runner_threads ⇒ Object (readonly)

A hash of monitor object -> thread used to contain and track running monitor threads.



71
72
73
# File 'lib/arborist/monitor_runner.rb', line 71

# Reader for the Hash of monitor => Thread used to track running monitors.
def runner_threads
  return @runner_threads
end

Instance Method Details

#add_interval_timer_for(monitor) ⇒ Object

Create a repeating (periodic) reactor timer that will run the specified monitor on its interval, skipping any tick for which the monitor already has an entry in #runner_threads.



242
243
244
245
246
247
248
249
250
251
# File 'lib/arborist/monitor_runner.rb', line 242

# Register a periodic timer with the reactor that fires every
# +monitor.interval+ and runs the +monitor+ via #run_monitor, unless the
# monitor already has an entry in #runner_threads.
#
# Returns whatever the reactor's #add_periodic_timer returns.
def add_interval_timer_for( monitor )
	seconds = monitor.interval
	self.log.info "Creating timer for %p" % [ monitor ]

	timer = self.reactor.add_periodic_timer( seconds ) do
		# Skip this tick while an earlier run is still being tracked.
		next if self.runner_threads.key?( monitor )
		self.run_monitor( monitor )
	end

	return timer
end

#add_splay_timer_for(monitor) ⇒ Object

Create a one-shot reactor timer that will register the interval timer for the specified monitor after a random number of seconds no greater than its splay.



256
257
258
259
260
261
262
263
# File 'lib/arborist/monitor_runner.rb', line 256

# Delay registration of the +monitor+'s interval timer by a random amount
# drawn from rand( monitor.splay ), using a one-shot reactor timer.
#
# Returns whatever the reactor's #add_oneshot_timer returns.
def add_splay_timer_for( monitor )
	offset = rand( monitor.splay )
	self.log.debug "Splaying registration of %p for %ds" % [ monitor, offset ]

	self.reactor.add_oneshot_timer( offset ) { self.add_interval_timer_for(monitor) }
end

#add_thread_cleanup_timer ⇒ Object

Set up a timer to clean up monitor threads.



267
268
269
270
271
272
# File 'lib/arborist/monitor_runner.rb', line 267

# Register a periodic reactor timer that calls #cleanup_monitor_threads every
# THREAD_CLEANUP_INTERVAL seconds.
#
# Returns whatever the reactor's #add_periodic_timer returns.
def add_thread_cleanup_timer
	tracked = self.runner_threads
	self.log.debug "Starting thread cleanup timer for %p." % [ tracked ]

	self.reactor.add_periodic_timer( THREAD_CLEANUP_INTERVAL ) { self.cleanup_monitor_threads }
end

#add_timer_for(monitor) ⇒ Object

Register a timer for the specified monitor.



232
233
234
235
236
237
238
# File 'lib/arborist/monitor_runner.rb', line 232

# Register a timer for the +monitor+: a splayed one when its splay is nonzero,
# otherwise its interval timer straight away.
def add_timer_for( monitor )
	return self.add_splay_timer_for( monitor ) if monitor.splay.nonzero?

	self.add_interval_timer_for( monitor )
end

#cleanup_monitor_threads ⇒ Object

Clean up any monitor runner threads that are dead.



278
279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/arborist/monitor_runner.rb', line 278

# Clean up any monitor runner threads that are dead.
#
# Each dead thread's entry is removed from #runner_threads and the thread is
# joined; a StandardError raised by the join is logged at error level rather
# than propagated.
#
# Returns the Array of threads that were cleaned up.
def cleanup_monitor_threads
	# Hash#reject returns a new Hash, so entries can safely be deleted from
	# #runner_threads while walking it. Iterating monitor/thread pairs also
	# avoids a reverse Hash#key lookup for every dead thread.
	finished = self.runner_threads.reject {|_monitor, thr| thr.alive? }

	finished.each do |monitor, thr|
		self.runner_threads.delete( monitor )

		begin
			thr.join
		rescue => err
			self.log.error "%p while running %s: %s" %
				[ err.class, thr[:monitor_desc], err.message ]
		end
	end

	return finished.values
end

#handle_io_event(event) ⇒ Object

Reactor callback – handle the client’s socket becoming writable.



111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/arborist/monitor_runner.rb', line 111

# Reactor callback -- handle an IO event on the client's tree API socket.
#
# On a writable event, the first queued callback/request pair (if any) is
# removed from #request_queue, the request is sent via the client, and the
# callback is called with the response. Once the queue is empty, #unregister
# is called.
#
# Raises a RuntimeError for any event that isn't writable.
def handle_io_event( event )
	if event.writable?
		if (( pair = self.request_queue.shift ))
			callback, request = pair
			res = self.client.send_tree_api_request( request )

			# A request queued without a block is stored with a nil callback;
			# it should still be sent, but there is nothing to notify.
			callback.call( res ) if callback
		end

		self.unregister if self.request_queue.empty?
	else
		raise "Unexpected %p on the tree API socket" % [ event ]
	end

end

#handle_signal(sig) ⇒ Object

Handle signals.



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# File 'lib/arborist/monitor_runner.rb', line 300

# Dispatch the signal +sig+ to its handler method: INT and TERM go to
# #on_termination_signal, HUP to #on_hangup_signal, and USR1 to
# #on_user1_signal. Anything else is logged as a warning.
def handle_signal( sig )
	self.log.debug "Handling signal %s" % [ sig ]

	if [ :INT, :TERM ].include?( sig )
		self.on_termination_signal( sig )
	elsif :HUP == sig
		self.on_hangup_signal( sig )
	elsif :USR1 == sig
		self.on_user1_signal( sig )
	else
		self.log.warn "Unhandled signal %s" % [ sig ]
	end
end

#load_monitors(enumerator) ⇒ Object

Load monitors from the specified enumerator.



75
76
77
# File 'lib/arborist/monitor_runner.rb', line 75

# Append every monitor produced by +enumerator+ to the runner's #monitors.
# Returns the #monitors Array.
def load_monitors( enumerator )
	loaded = enumerator.to_a
	return self.monitors.concat( loaded )
end

#on_hangup_signal(signo) ⇒ Object

Handle a hangup by restarting the runner.



329
330
331
332
# File 'lib/arborist/monitor_runner.rb', line 329

# Handle a hangup: log a warning naming the signal, then call #restart.
def on_hangup_signal( signo )
	self.log.warn( format("Hangup (%p)", signo) )
	return self.restart
end

#on_termination_signal(signo) ⇒ Object Also known as: on_interrupt_signal

Handle a termination signal (INT or TERM) by logging a warning and stopping the runner. Also aliased to #on_interrupt_signal.



321
322
323
324
# File 'lib/arborist/monitor_runner.rb', line 321

# Handle a termination signal: log a warning naming the signal, then call
# #stop.
def on_termination_signal( signo )
	self.log.warn( format("Terminated (%p)", signo) )
	return self.stop
end

#queue_request(request, &callback) ⇒ Object

Add the specified request to the queue of requests to be sent over the client's tree API socket, keyed by the callback that should be called with the response.



204
205
206
207
# File 'lib/arborist/monitor_runner.rb', line 204

# Add the +request+ to #request_queue, keyed by the +callback+ block, then make
# sure the runner is registered for writing via #register.
#
# The block is optional. The queue is a Hash keyed by callback, so requests
# queued without a block would otherwise all share the +nil+ key and overwrite
# one another; each such request is given its own no-op callback instead.
#
# Returns whatever #register returns.
def queue_request( request, &callback )
	# A fresh Proc per call is a distinct Hash key.
	callback ||= proc {}

	self.request_queue[ callback ] = request
	self.register
end

#register ⇒ Object

Enable write events for the client's tree API socket in the reactor if they aren't already enabled.



217
218
219
220
# File 'lib/arborist/monitor_runner.rb', line 217

# Enable write events on the client's tree API socket in the reactor, unless
# #registered? says they are enabled already (in which case nil is returned).
def register
	return if self.registered?

	self.reactor.enable_events( self.client.tree_api, :write )
end

#registered? ⇒ Boolean

Returns true if the runner’s client socket is currently registered for writing.

Returns:

  • (Boolean)


211
212
213
# File 'lib/arborist/monitor_runner.rb', line 211

# Ask the reactor whether write events are enabled for the client's tree API
# socket, and return its answer.
def registered?
	socket = self.client.tree_api
	self.reactor.event_enabled?( socket, :write )
end

#restart ⇒ Object

Restart the runner

Raises:

  • (NotImplementedError)


96
97
98
99
100
# File 'lib/arborist/monitor_runner.rb', line 96

# Restart the runner. Not implemented yet: always raises NotImplementedError.
def restart
	# :TODO: A real restart needs to kill any running monitor children, cancel
	# the monitor timers, and reload monitors from the monitor enumerator.
	raise( NotImplementedError )
end

#run ⇒ Object

Run the specified monitors



81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/arborist/monitor_runner.rb', line 81

# Start the runner: add a timer for each loaded monitor, add the thread
# cleanup timer, then -- inside the signal handler set up for QUEUE_SIGS --
# register #handle_io_event for write events on the client's tree API socket
# and start the reactor polling with interrupts ignored.
def run
	self.monitors.each {|monitor| self.add_timer_for(monitor) }
	self.add_thread_cleanup_timer

	self.with_signal_handler( self.reactor, *QUEUE_SIGS ) do
		io_callback = self.method( :handle_io_event )

		self.reactor.register( self.client.tree_api, :write, &io_callback )
		self.reactor.start_polling( ignore_interrupts: true )
	end
end

#run_monitor(monitor) ⇒ Object

Update nodes with the results of a monitor’s run.



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/arborist/monitor_runner.rb', line 128

# Queue a search for the nodes the +monitor+ applies to. When the search
# results arrive and are non-empty, the monitor is run against them in a new
# Thread (tracked in #runner_threads under the monitor), and the results are
# queued as an update keyed with the monitor's key.
def run_monitor( monitor )
	include_criteria = monitor.positive_criteria
	exclude_criteria = monitor.negative_criteria
	skip_down        = monitor.exclude_down?
	properties       = monitor.node_properties

	self.search( include_criteria, skip_down, properties, exclude_criteria ) do |nodes|
		self.log.info "Running %p monitor for %d node(s)" % [
			monitor.description,
			nodes.length
		]

		# No matching nodes means there's nothing to run.
		next if nodes.empty?

		thread = Thread.new do
			# Recorded so #cleanup_monitor_threads can name the monitor if the
			# thread dies with an error.
			Thread.current[:monitor_desc] = monitor.description
			outcome = self.run_monitor_safely( monitor, nodes )

			self.log.debug "  updating with results: %p" % [ outcome ]
			self.update( outcome, monitor.key ) do
				self.log.debug "Updated %d via the '%s' monitor" %
					[ outcome.length, monitor.description ]
			end
		end
		self.runner_threads[ monitor ] = thread

		self.log.debug "THREAD: Started %p for %p" % [ thread, monitor ]
		self.log.debug "THREAD: Runner threads have: %p" % [ self.runner_threads.to_a ]
	end
end

#run_monitor_safely(monitor, nodes) ⇒ Object

Exec monitor against the provided nodes hash, treating runtime exceptions as an error condition. Returns an update hash, keyed by node identifier.



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/arborist/monitor_runner.rb', line 162

# Run the +monitor+ against the +nodes+ hash and return what it returns.
#
# If the run raises a StandardError, the error and its backtrace are logged at
# error level, and a Hash that maps every node identifier to an
# { error: message } Hash is returned instead.
def run_monitor_safely( monitor, nodes )
	return monitor.run( nodes )
rescue => err
	errmsg = "Exception while running %p monitor: %s: %s" %
		[ monitor.description, err.class.name, err.message ]
	trace = err.backtrace.join( "\n  " )
	self.log.error "%s\n%s" % [ errmsg, trace ]

	# Every node gets its own error Hash.
	failures = {}
	nodes.keys.each {|id| failures[id] = { error: errmsg } }
	return failures
end

#search(criteria, exclude_down, properties, negative = {}, &block) ⇒ Object

Create a search request using the runner’s client, then queue the request up with the specified block as the callback.



183
184
185
186
187
188
189
190
# File 'lib/arborist/monitor_runner.rb', line 183

# Build a search request with the runner's client from the +criteria+ and the
# +exclude_down+, +properties+, and +negative+ (exclusion) options, then queue
# it with +block+ as the callback.
#
# Returns whatever #queue_request returns.
def search( criteria, exclude_down, properties, negative={}, &block )
	options = {
		exclude_down: exclude_down,
		properties: properties,
		exclude: negative
	}
	request = self.client.make_search_request( criteria, **options )

	return self.queue_request( request, &block )
end

#stop ⇒ Object

Stop the runner.



104
105
106
107
# File 'lib/arborist/monitor_runner.rb', line 104

# Stop the runner: log that it is stopping, then tell the reactor to stop
# polling.
def stop
	self.log.info( "Stopping the runner." )
	return self.reactor.stop_polling
end

#unregister ⇒ Object

Disable write events for the client's tree API socket in the reactor when there's nothing ready to write.



225
226
227
228
# File 'lib/arborist/monitor_runner.rb', line 225

# Disable write events on the client's tree API socket in the reactor, but
# only when #registered? says they are currently enabled (otherwise nil is
# returned).
def unregister
	return unless self.registered?

	self.reactor.disable_events( self.client.tree_api, :write )
end

#update(nodemap, monitor_key, &block) ⇒ Object

Create an update request using the runner’s client, then queue the request up with the specified block as the callback.



195
196
197
198
199
# File 'lib/arborist/monitor_runner.rb', line 195

# Build an update request with the runner's client from the +nodemap+ and
# +monitor_key+, then queue it with +block+ as the callback. Does nothing (and
# returns nil) when the +nodemap+ is empty.
def update( nodemap, monitor_key, &block )
	unless nodemap.empty?
		request = self.client.make_update_request( nodemap, monitor_key: monitor_key )
		self.queue_request( request, &block )
	end
end