Class: Arborist::MonitorRunner
- Inherits:
- Object
  - Object
  - Arborist::MonitorRunner
- Extended by:
- Loggability
- Includes:
- CZTop::Reactor::SignalHandling
- Defined in:
- lib/arborist/monitor_runner.rb
Overview
An event-driven runner for Arborist::Monitors.
Constant Summary
- QUEUE_SIGS =
Signals the runner handles
    [
      :INT, :TERM, :HUP, :USR1,
      # :TODO: :QUIT, :WINCH, :USR2, :TTIN, :TTOU
    ] & Signal.list.keys.map( &:to_sym )
- THREAD_CLEANUP_INTERVAL =
Number of seconds between thread cleanup
    5
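The intersection in QUEUE_SIGS keeps only the signal names that the current platform defines. The following is a minimal sketch of that idiom, not the library's code; the candidate list and the :NOSUCHSIG name are made up for illustration.

```ruby
# Candidate signal names. :NOSUCHSIG is a made-up name that no platform
# defines; it is here only to show that unsupported names are dropped.
candidates = [ :INT, :TERM, :HUP, :USR1, :NOSUCHSIG ]

# Signal.list maps signal names (Strings without the "SIG" prefix) to numbers.
supported = Signal.list.keys.map( &:to_sym )

# Array#& keeps the candidates that are also supported, in the order they
# appear in the left-hand array.
handled = candidates & supported

puts handled.inspect
```

The exact result depends on the operating system, which is the point of filtering at load time.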
Instance Attribute Summary
-
#client ⇒ Object
readonly
The Arborist::Client that will provide the message packing and unpacking.
-
#handler ⇒ Object
The ZMQ::Handler subclass that handles all async IO.
-
#monitors ⇒ Object
readonly
The Array of loaded Arborist::Monitors the runner should run.
-
#reactor ⇒ Object
The reactor (a CZTop::Reactor) the runner uses to drive everything.
-
#request_queue ⇒ Object
readonly
The queue of pending requests (a Hash), keyed by the callback that should be called with the results.
-
#runner_threads ⇒ Object
readonly
A hash of monitor object -> thread used to contain and track running monitor threads.
Instance Method Summary
-
#add_interval_timer_for(monitor) ⇒ Object
Create a repeating ZMQ::Timer that will run the specified monitor on its interval.
-
#add_splay_timer_for(monitor) ⇒ Object
Create a one-shot ZMQ::Timer that will register the interval timer for the specified monitor after a random number of seconds no greater than its splay.
-
#add_thread_cleanup_timer ⇒ Object
Set up a timer to clean up monitor threads.
-
#add_timer_for(monitor) ⇒ Object
Register a timer for the specified monitor.
-
#cleanup_monitor_threads ⇒ Object
Clean up any monitor runner threads that are dead.
-
#handle_io_event(event) ⇒ Object
Reactor callback – handle the client’s socket becoming writable.
-
#handle_signal(sig) ⇒ Object
Handle signals.
-
#initialize ⇒ MonitorRunner
constructor
Create a new Arborist::MonitorRunner.
-
#load_monitors(enumerator) ⇒ Object
Load monitors from the specified enumerator.
-
#on_hangup_signal(signo) ⇒ Object
Handle a hangup by restarting the runner.
-
#on_termination_signal(signo) ⇒ Object
(also: #on_interrupt_signal)
Handle a TERM signal.
-
#queue_request(request, &callback) ⇒ Object
Add the specified request to the queue, keyed by its callback, to be sent over the client’s tree API socket once it is writable.
-
#register ⇒ Object
Register the handler’s pollitem as being ready to write if it isn’t already.
-
#registered? ⇒ Boolean
Returns true if the runner’s client socket is currently registered for writing.
-
#restart ⇒ Object
Restart the runner.
-
#run ⇒ Object
Run the specified monitors.
-
#run_monitor(monitor) ⇒ Object
Update nodes with the results of a monitor’s run.
-
#run_monitor_safely(monitor, nodes) ⇒ Object
Exec monitor against the provided nodes hash, treating runtime exceptions as an error condition.
-
#search(criteria, exclude_down, properties, negative = {}, &block) ⇒ Object
Create a search request using the runner’s client, then queue the request up with the specified block as the callback.
-
#stop ⇒ Object
Stop the runner.
-
#unregister ⇒ Object
Unregister the handler’s pollitem from the reactor when there’s nothing ready to write.
-
#update(nodemap, monitor_key, &block) ⇒ Object
Create an update request using the runner’s client, then queue the request up with the specified block as the callback.
Constructor Details
#initialize ⇒ MonitorRunner
Create a new Arborist::MonitorRunner
    # File 'lib/arborist/monitor_runner.rb', line 34
    def initialize
      @monitors       = []
      @handler        = nil
      @reactor        = CZTop::Reactor.new
      @client         = Arborist::Client.new
      @runner_threads = {}
      @request_queue  = {}
    end
Instance Attribute Details
#client ⇒ Object (readonly)
The Arborist::Client that will provide the message packing and unpacking
    # File 'lib/arborist/monitor_runner.rb', line 67
    def client
      @client
    end
#handler ⇒ Object
The ZMQ::Handler subclass that handles all async IO
    # File 'lib/arborist/monitor_runner.rb', line 54
    def handler
      @handler
    end
#monitors ⇒ Object (readonly)
The Array of loaded Arborist::Monitors the runner should run.
    # File 'lib/arborist/monitor_runner.rb', line 50
    def monitors
      @monitors
    end
#reactor ⇒ Object
The reactor (a CZTop::Reactor) the runner uses to drive everything.
    # File 'lib/arborist/monitor_runner.rb', line 58
    def reactor
      @reactor
    end
#request_queue ⇒ Object (readonly)
The queue of pending requests (a Hash), keyed by the callback that should be called with the results.
    # File 'lib/arborist/monitor_runner.rb', line 63
    def request_queue
      @request_queue
    end
#runner_threads ⇒ Object (readonly)
A hash of monitor object -> thread used to contain and track running monitor threads.
    # File 'lib/arborist/monitor_runner.rb', line 71
    def runner_threads
      @runner_threads
    end
Instance Method Details
#add_interval_timer_for(monitor) ⇒ Object
Create a repeating ZMQ::Timer that will run the specified monitor on its interval.
    # File 'lib/arborist/monitor_runner.rb', line 242
    def add_interval_timer_for( monitor )
      interval = monitor.interval

      self.log.info "Creating timer for %p" % [ monitor ]

      return self.reactor.add_periodic_timer( interval ) do
        unless self.runner_threads.key?( monitor )
          self.run_monitor( monitor )
        end
      end
    end
#add_splay_timer_for(monitor) ⇒ Object
Create a one-shot ZMQ::Timer that will register the interval timer for the specified monitor
after a random number of seconds no greater than its splay.
    # File 'lib/arborist/monitor_runner.rb', line 256
    def add_splay_timer_for( monitor )
      delay = rand( monitor.splay )
      self.log.debug "Splaying registration of %p for %ds" % [ monitor, delay ]

      self.reactor.add_oneshot_timer( delay ) do
        self.add_interval_timer_for( monitor )
      end
    end
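To illustrate how the splay delay is chosen, here is a small sketch under stated assumptions: FakeMonitor and splay_delay are hypothetical names, and the splay is assumed to be an Integer number of seconds. With an Integer argument of 1 or more, Kernel#rand returns an Integer from 0 up to, but not including, that argument.

```ruby
# Hypothetical stand-in for a monitor; only #splay is modelled here.
FakeMonitor = Struct.new( :splay )

# Pick a startup delay the same way add_splay_timer_for does.
def splay_delay( monitor )
  rand( monitor.splay )
end

monitor = FakeMonitor.new( 10 )

# Sample many delays to show the range they fall in.
delays = Array.new( 1000 ) { splay_delay(monitor) }

puts "min=%d max=%d" % [ delays.min, delays.max ]
```

Spreading the first run of each monitor over the splay window keeps monitors that share an interval from all firing at the same moment.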
#add_thread_cleanup_timer ⇒ Object
Set up a timer to clean up monitor threads.
    # File 'lib/arborist/monitor_runner.rb', line 267
    def add_thread_cleanup_timer
      self.log.debug "Starting thread cleanup timer for %p." % [ self.runner_threads ]
      self.reactor.add_periodic_timer( THREAD_CLEANUP_INTERVAL ) do
        self.cleanup_monitor_threads
      end
    end
#add_timer_for(monitor) ⇒ Object
Register a timer for the specified monitor.
    # File 'lib/arborist/monitor_runner.rb', line 232
    def add_timer_for( monitor )
      if monitor.splay.nonzero?
        self.add_splay_timer_for( monitor )
      else
        self.add_interval_timer_for( monitor )
      end
    end
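The branch in add_timer_for relies on Numeric#nonzero?, which returns the receiver when it is not zero and nil otherwise. A minimal sketch of that decision follows; timer_kind and its return symbols are hypothetical names used only for illustration.

```ruby
# Hypothetical helper mirroring the branch in add_timer_for: a non-zero
# splay selects the splay timer, a zero splay goes straight to the
# interval timer.
def timer_kind( splay )
  splay.nonzero? ? :splay_then_interval : :interval_only
end

puts timer_kind( 0 )     # zero Integer splay
puts timer_kind( 0.0 )   # zero Float splay
puts timer_kind( 30 )    # non-zero splay
```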
#cleanup_monitor_threads ⇒ Object
Clean up any monitor runner threads that are dead.
    # File 'lib/arborist/monitor_runner.rb', line 278
    def cleanup_monitor_threads
      self.runner_threads.values.reject( &:alive? ).each do |thr|
        monitor = self.runner_threads.key( thr )
        self.runner_threads.delete( monitor )

        begin
          thr.join
        rescue => err
          self.log.error "%p while running %s: %s" %
            [ err.class, thr[:monitor_desc], err.message ]
        end
      end
    end
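The cleanup pattern above depends on Thread#join re-raising, in the calling thread, any exception that ended the joined thread. The sketch below shows that behaviour in isolation; the threads hash, its :failing and :sleeping keys, and the errors array are assumptions standing in for the monitor-to-thread map and the logger.

```ruby
# Hypothetical registry of label => Thread, standing in for the runner's
# monitor => Thread hash.
threads = {
  failing: Thread.new do
    Thread.current.report_on_exception = false  # keep stderr quiet
    raise ArgumentError, "boom"
  end,
  sleeping: Thread.new { sleep }
}

# Wait for the failing thread to finish dying.
sleep 0.01 while threads[:failing].alive?

errors = []

# Reap dead threads: drop them from the registry, then join them so that
# any exception they died with surfaces here and can be recorded.
threads.values.reject( &:alive? ).each do |thr|
  label = threads.key( thr )
  threads.delete( label )

  begin
    thr.join
  rescue => err
    errors << "%p while running %s: %s" % [ err.class, label, err.message ]
  end
end

# Stop the remaining thread so the script exits cleanly.
threads[:sleeping].kill

puts errors
```

Live threads are left in the registry, so a long-running monitor keeps its entry until it finishes and a later cleanup pass removes it.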
#handle_io_event(event) ⇒ Object
Reactor callback – handle the client’s socket becoming writable.
    # File 'lib/arborist/monitor_runner.rb', line 111
    def handle_io_event( event )
      if event.writable?
        if (( pair = self.request_queue.shift ))
          callback, request = *pair
          res = self.client.send_tree_api_request( request )
          callback.call( res )
        end

        self.unregister if self.request_queue.empty?
      else
        raise "Unexpected %p on the tree API socket" % [ event ]
      end
    end
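The request queue is a Hash used as a first-in, first-out queue: requests are stored under their callback, and Hash#shift removes the oldest entry as a two-element array. Below is a rough sketch of that pattern on its own; TinyQueue, drain_one, and the string responses are hypothetical and stand in for the runner, the reactor callback, and the tree API reply.

```ruby
# Hypothetical miniature of the runner's callback-keyed request queue.
class TinyQueue
  attr_reader :pending

  def initialize
    @pending = {}
  end

  # Store the request under the block that should receive its response.
  def queue_request( request, &callback )
    @pending[ callback ] = request
  end

  # Take the oldest entry (Hashes preserve insertion order) and hand a
  # pretend response to its callback. Returns nil when nothing is queued.
  def drain_one
    pair = @pending.shift or return nil
    callback, request = *pair
    callback.call( "response to #{request}" )
  end
end

queue = TinyQueue.new
seen  = []

queue.queue_request( "search" ) { |res| seen << res }
queue.queue_request( "update" ) { |res| seen << res }

queue.drain_one
queue.drain_one

puts seen
```

Because the callback is the key, each queued request needs its own block; two requests queued without a block would share the nil key and the second would replace the first.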
#handle_signal(sig) ⇒ Object
Handle signals.
    # File 'lib/arborist/monitor_runner.rb', line 300
    def handle_signal( sig )
      self.log.debug "Handling signal %s" % [ sig ]
      case sig
      when :INT, :TERM
        self.on_termination_signal( sig )

      when :HUP
        self.on_hangup_signal( sig )

      when :USR1
        self.on_user1_signal( sig )

      else
        self.log.warn "Unhandled signal %s" % [ sig ]
      end
    end
#load_monitors(enumerator) ⇒ Object
Load monitors from the specified enumerator.
    # File 'lib/arborist/monitor_runner.rb', line 75
    def load_monitors( enumerator )
      self.monitors.concat( enumerator.to_a )
    end
#on_hangup_signal(signo) ⇒ Object
Handle a hangup by restarting the runner.
    # File 'lib/arborist/monitor_runner.rb', line 329
    def on_hangup_signal( signo )
      self.log.warn "Hangup (%p)" % [ signo ]
      self.restart
    end
#on_termination_signal(signo) ⇒ Object Also known as: on_interrupt_signal
Handle a TERM signal. Shuts the handler down after handling any current request/s. Also aliased to #on_interrupt_signal.
    # File 'lib/arborist/monitor_runner.rb', line 321
    def on_termination_signal( signo )
      self.log.warn "Terminated (%p)" % [ signo ]
      self.stop
    end
#queue_request(request, &callback) ⇒ Object
Add the specified request to the queue, keyed by its callback, to be sent over the client’s tree API socket once it is writable.
    # File 'lib/arborist/monitor_runner.rb', line 204
    def queue_request( request, &callback )
      self.request_queue[ callback ] = request
      self.register
    end
#register ⇒ Object
Register the handler’s pollitem as being ready to write if it isn’t already.
    # File 'lib/arborist/monitor_runner.rb', line 217
    def register
      # self.log.debug "Registering for writing."
      self.reactor.enable_events( self.client.tree_api, :write ) unless self.registered?
    end
#registered? ⇒ Boolean
Returns true if the runner’s client socket is currently registered for writing.
    # File 'lib/arborist/monitor_runner.rb', line 211
    def registered?
      return self.reactor.event_enabled?( self.client.tree_api, :write )
    end
#restart ⇒ Object
Restart the runner
    # File 'lib/arborist/monitor_runner.rb', line 96
    def restart
      # :TODO: Kill any running monitor children, cancel monitor timers, and reload
      # monitors from the monitor enumerator
      raise NotImplementedError
    end
#run ⇒ Object
Run the specified monitors
    # File 'lib/arborist/monitor_runner.rb', line 81
    def run
      self.monitors.each do |mon|
        self.add_timer_for( mon )
      end

      self.add_thread_cleanup_timer

      self.with_signal_handler( self.reactor, *QUEUE_SIGS ) do
        self.reactor.register( self.client.tree_api, :write, &self.method(:handle_io_event) )
        self.reactor.start_polling( ignore_interrupts: true )
      end
    end
#run_monitor(monitor) ⇒ Object
Update nodes with the results of a monitor’s run.
    # File 'lib/arborist/monitor_runner.rb', line 128
    def run_monitor( monitor )
      positive     = monitor.positive_criteria
      negative     = monitor.negative_criteria
      exclude_down = monitor.exclude_down?
      props        = monitor.node_properties

      self.search( positive, exclude_down, props, negative ) do |nodes|
        self.log.info "Running %p monitor for %d node(s)" % [
          monitor.description,
          nodes.length
        ]

        unless nodes.empty?
          self.runner_threads[ monitor ] = Thread.new do
            Thread.current[:monitor_desc] = monitor.description
            results = self.run_monitor_safely( monitor, nodes )

            self.log.debug " updating with results: %p" % [ results ]
            self.update( results, monitor.key ) do
              self.log.debug "Updated %d via the '%s' monitor" %
                [ results.length, monitor.description ]
            end
          end
          self.log.debug "THREAD: Started %p for %p" % [ self.runner_threads[monitor], monitor ]
          self.log.debug "THREAD: Runner threads have: %p" % [ self.runner_threads.to_a ]
        end
      end
    end
#run_monitor_safely(monitor, nodes) ⇒ Object
Exec monitor against the provided nodes hash, treating runtime exceptions as an error condition. Returns an update hash, keyed by node identifier.
    # File 'lib/arborist/monitor_runner.rb', line 162
    def run_monitor_safely( monitor, nodes )
      results = begin
        monitor.run( nodes )
      rescue => err
        errmsg = "Exception while running %p monitor: %s: %s" % [
          monitor.description,
          err.class.name,
          err.message
        ]
        self.log.error "%s\n%s" % [ errmsg, err.backtrace.join("\n ") ]
        nodes.keys.each_with_object({}) do |id, node_results|
          node_results[id] = { error: errmsg }
        end
      end

      return results
    end
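To show the shape of the fallback result, here is a sketch of the same rescue-and-map pattern outside the runner. The run_safely helper, the node identifiers, and the :ping key are hypothetical; only the error-hash construction follows the listing above.

```ruby
# Hypothetical helper: run the given block against the nodes, and on any
# StandardError return an update that marks every node with the error.
def run_safely( nodes )
  yield( nodes )
rescue => err
  errmsg = "Exception while running monitor: %s: %s" % [ err.class.name, err.message ]
  nodes.keys.each_with_object( {} ) do |id, node_results|
    node_results[ id ] = { error: errmsg }
  end
end

nodes = { "web01" => {}, "db01" => {} }

# A well-behaved monitor returns one result per node.
ok = run_safely( nodes ) { |n| n.transform_values { |_| { ping: "ok" } } }

# A monitor that raises yields the same error for every node it was given.
failed = run_safely( nodes ) { |_| raise IOError, "timed out" }

puts ok.inspect
puts failed.inspect
```

Returning an error entry per node means a crashing monitor still produces an update for every node it was asked about, rather than silently skipping them.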
#search(criteria, exclude_down, properties, negative = {}, &block) ⇒ Object
Create a search request using the runner’s client, then queue the request up with the specified block as the callback.
    # File 'lib/arborist/monitor_runner.rb', line 183
    def search( criteria, exclude_down, properties, negative={}, &block )
      search = self.client.make_search_request( criteria,
        exclude_down: exclude_down,
        properties: properties,
        exclude: negative
      )
      self.queue_request( search, &block )
    end
#stop ⇒ Object
Stop the runner.
    # File 'lib/arborist/monitor_runner.rb', line 104
    def stop
      self.log.info "Stopping the runner."
      self.reactor.stop_polling
    end
#unregister ⇒ Object
Unregister the handler’s pollitem from the reactor when there’s nothing ready to write.
    # File 'lib/arborist/monitor_runner.rb', line 225
    def unregister
      # self.log.debug "Unregistering for writing."
      self.reactor.disable_events( self.client.tree_api, :write ) if self.registered?
    end
#update(nodemap, monitor_key, &block) ⇒ Object
Create an update request using the runner’s client, then queue the request up with the specified block as the callback.
    # File 'lib/arborist/monitor_runner.rb', line 195
    def update( nodemap, monitor_key, &block )
      return if nodemap.empty?
      update = self.client.make_update_request( nodemap, monitor_key: monitor_key )
      self.queue_request( update, &block )
    end