Class: Arborist::ObserverRunner
- Inherits:
- Object
- Arborist::ObserverRunner
- Extended by:
- Loggability
- Includes:
- CZTop::Reactor::SignalHandling
- Defined in:
- lib/arborist/observer_runner.rb
Overview
An event-driven runner for Arborist::Observers.
Constant Summary
- QUEUE_SIGS =
Signals the observer runner responds to
[
  :INT, :TERM, :HUP,
  # :TODO: :QUIT, :WINCH, :USR1, :USR2, :TTIN, :TTOU
] & Signal.list.keys.map( &:to_sym )
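The intersection with Signal.list keeps only the signals that the current platform defines. A minimal sketch of that idiom using only Ruby's standard library; the supported_signals helper and the :NOSUCHSIG name are illustrative and not part of Arborist:

```ruby
# Return the subset of +wanted+ signal names that this platform defines.
# Signal.list maps signal names (Strings) to their numbers, so the keys
# are converted to Symbols before intersecting.
def supported_signals( wanted )
  wanted & Signal.list.keys.map( &:to_sym )
end

# :NOSUCHSIG is a made-up name, included only to show the filtering.
puts supported_signals( [:INT, :TERM, :HUP, :NOSUCHSIG] ).inspect
```

Array#& preserves the order of the left-hand array, so the result lists the signals in the order they were requested.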
Instance Attribute Summary
-
#client ⇒ Object
readonly
The Arborist::Client that will be used for creating and tearing down subscriptions.
-
#observers ⇒ Object
readonly
The Array of loaded Arborist::Observers the runner should run.
-
#reactor ⇒ Object
The reactor (a CZTop::Reactor) the runner uses to drive everything.
-
#subscriptions ⇒ Object
readonly
The map of subscription IDs to the Observer which it was created for.
-
#timers ⇒ Object
readonly
The Array of timers registered with the reactor.
Instance Method Summary
-
#add_observer(observer) ⇒ Object
Add the specified observer and subscribe to the events it wishes to receive.
-
#add_timers_for(observer) ⇒ Object
Register timers for the specified observer.
-
#handle_signal(sig) ⇒ Object
Handle signals.
-
#handle_system_event(event_type, event) ⇒ Object
Handle a 'sys.' event from the Manager being observed.
-
#initialize ⇒ ObserverRunner
constructor
Create a new Arborist::ObserverRunner.
-
#load_observers(enumerator) ⇒ Object
Load observers from the specified enumerator.
-
#on_hangup_signal(signo) ⇒ Object
Handle a HUP signal.
-
#on_subscription_event(event) ⇒ Object
Handle IO events from the reactor.
-
#on_termination_signal(signo) ⇒ Object
(also: #on_interrupt_signal)
Handle a TERM signal.
-
#register_observer_timers ⇒ Object
Register timers for each Observer.
-
#register_observers ⇒ Object
Add subscriptions for all of the observers loaded into the runner.
-
#remove_observer(observer) ⇒ Object
Remove the specified observer after unsubscribing from its events.
-
#remove_timers ⇒ Object
Remove any registered timers.
-
#reset ⇒ Object
Unsubscribe from and clear all current subscriptions.
-
#restart ⇒ Object
Restart the runner, resetting all of its observers’ subscriptions.
-
#run ⇒ Object
Run the loaded observers.
-
#running? ⇒ Boolean
Returns true if the ObserverRunner is running.
-
#stop ⇒ Object
Stop the runner.
-
#subscribe_to_system_events ⇒ Object
Subscribe the runner to system events published by the Manager.
-
#unregister_observers ⇒ Object
Remove the subscriptions belonging to the loaded observers.
Constructor Details
#initialize ⇒ ObserverRunner
Create a new Arborist::ObserverRunner.

# File 'lib/arborist/observer_runner.rb', line 31

def initialize
  @observers = []
  @timers = []
  @subscriptions = {}
  @reactor = CZTop::Reactor.new
  @client = Arborist::Client.new
  @manager_last_runid = nil
end
Instance Attribute Details
#client ⇒ Object (readonly)
The Arborist::Client that will be used for creating and tearing down subscriptions.

# File 'lib/arborist/observer_runner.rb', line 55

def client
  @client
end
#observers ⇒ Object (readonly)
The Array of loaded Arborist::Observers the runner should run.

# File 'lib/arborist/observer_runner.rb', line 46

def observers
  @observers
end
#reactor ⇒ Object
The reactor (a CZTop::Reactor) the runner uses to drive everything.

# File 'lib/arborist/observer_runner.rb', line 52

def reactor
  @reactor
end
#subscriptions ⇒ Object (readonly)
The map of subscription IDs to the Observer which it was created for.

# File 'lib/arborist/observer_runner.rb', line 58

def subscriptions
  @subscriptions
end
#timers ⇒ Object (readonly)
The Array of timers registered with the reactor.

# File 'lib/arborist/observer_runner.rb', line 49

def timers
  @timers
end
Instance Method Details
#add_observer(observer) ⇒ Object
Add the specified observer and subscribe to the events it wishes to receive.

# File 'lib/arborist/observer_runner.rb', line 170

def add_observer( observer )
  self.log.info "Adding observer: %s" % [ observer.description ]
  observer.subscriptions.each do |sub|
    self.log.debug "HI: %p" % [ sub ]
    subid = self.client.subscribe( **sub )
    self.subscriptions[ subid ] = observer
    self.client.event_api.subscribe( subid )
    self.log.debug "  subscribed to %p with subscription %s" % [ sub, subid ]
  end
end
#add_timers_for(observer) ⇒ Object
Register timers for the specified observer.

# File 'lib/arborist/observer_runner.rb', line 141

def add_timers_for( observer )
  observer.timers.each do |interval, callback|
    self.log.info "Creating timer for %s observer to run %p every %ds" %
      [ observer.description, callback, interval ]
    timer = self.reactor.add_periodic_timer( interval, &callback )
    self.timers << timer
  end
end
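The listing above treats observer.timers as a collection of interval/callback pairs and hands each pair to the reactor. A minimal sketch of that loop, assuming a stand-in reactor that only records what it is given; FakeReactor and register_timers are hypothetical names for illustration, not part of Arborist or CZTop:

```ruby
# Hypothetical stand-in for the reactor: records timers instead of
# scheduling them. This is not the real CZTop::Reactor.
class FakeReactor
  attr_reader :scheduled

  def initialize
    @scheduled = []
  end

  # Mirrors the call shape used by the runner: an interval plus a block.
  def add_periodic_timer( interval, &callback )
    timer = { interval: interval, callback: callback }
    @scheduled << timer
    timer
  end
end

# Register every [interval, callback] pair and return the created timers,
# which is the list a caller would need to keep for later removal.
def register_timers( reactor, timer_specs )
  timer_specs.map do |interval, callback|
    reactor.add_periodic_timer( interval, &callback )
  end
end

reactor = FakeReactor.new
specs = [ [30, -> { puts "every 30s" }], [300, -> { puts "every 5m" }] ]
timers = register_timers( reactor, specs )
puts timers.map { |t| t[:interval] }.inspect
```

Keeping the returned timer objects is what makes a later cleanup step possible, since removal needs the same objects the reactor handed back.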
#handle_signal(sig) ⇒ Object
Handle signals.

# File 'lib/arborist/observer_runner.rb', line 247

def handle_signal( sig )
  self.log.debug "Handling signal %s" % [ sig ]
  case sig
  when :INT, :TERM
    self.on_termination_signal( sig )

  when :HUP
    self.on_hangup_signal( sig )

  else
    self.log.warn "Unhandled signal %s" % [ sig ]
  end
end
#handle_system_event(event_type, event) ⇒ Object
Handle a 'sys.' event from the Manager being observed.

# File 'lib/arborist/observer_runner.rb', line 219

def handle_system_event( event_type, event )
  self.log.debug "Got a %s event from the Manager: %p" % [ event_type, event ]

  case event_type
  when 'sys.heartbeat'
    this_runid = event['run_id']

    if @manager_last_runid && this_runid != @manager_last_runid
      self.log.warn "Manager run ID changed: re-subscribing"
      self.reset
      self.register_observers
    end

    @manager_last_runid = this_runid
  when 'sys.node_added', 'sys.node_removed'
    # no-op
  else
    # no-op
  end
end
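The heartbeat branch re-subscribes only when a previously seen run ID is replaced by a different one; the very first heartbeat just records the ID. A minimal sketch of that check in isolation, assuming heartbeat payloads are Hashes with a 'run_id' key as in the listing; RunIdTracker is a hypothetical helper, since the runner does this inline:

```ruby
# Tracks the Manager's run ID across heartbeats and reports when it changes.
# A hypothetical helper for illustration only.
class RunIdTracker
  def initialize
    @last_runid = nil
  end

  # Returns true only when a previously seen run ID is replaced by a
  # different one, which suggests the Manager has restarted.
  def changed?( heartbeat )
    this_runid = heartbeat['run_id']
    changed = !@last_runid.nil? && this_runid != @last_runid
    @last_runid = this_runid
    changed
  end
end

tracker = RunIdTracker.new
[ 'abc', 'abc', 'def' ].each do |runid|
  puts "#{runid}: resubscribe? #{tracker.changed?( 'run_id' => runid )}"
end
```

Recording the ID after the comparison, rather than before, is what lets the same method both detect the change and move on to the new ID.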
#load_observers(enumerator) ⇒ Object
Load observers from the specified enumerator.

# File 'lib/arborist/observer_runner.rb', line 62

def load_observers( enumerator )
  self.observers.concat( enumerator.to_a )
end
#on_hangup_signal(signo) ⇒ Object
Handle a HUP signal. The default is to restart the runner.

# File 'lib/arborist/observer_runner.rb', line 273

def on_hangup_signal( signo )
  self.log.warn "Hangup (%p)" % [ signo ]
  self.restart
end
#on_subscription_event(event) ⇒ Object
Handle IO events from the reactor.

# File 'lib/arborist/observer_runner.rb', line 198

def on_subscription_event( event )
  if event.readable?
    msg = event.socket.receive
    subid, event = Arborist::EventAPI.decode( msg )

    if (( observer = self.subscriptions[subid] ))
      self.log.debug "Got %p event for %p" % [ subid, observer ]
      observer.handle_event( subid, event )
    elsif subid.start_with?( 'sys.' )
      self.log.debug "System event! %p" % [ event ]
      self.handle_system_event( subid, event )
    else
      self.log.warn "Ignoring event %p for which we have no observer." % [ subid ]
    end
  else
    raise "Unhandled event %p on the event socket" % [ event ]
  end
end
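The dispatch order in the listing matters: a known subscription ID wins, then anything prefixed with 'sys.' is treated as a system event, and everything else is ignored. A minimal sketch of just that routing decision, with no sockets involved; route_event is a hypothetical helper, not an Arborist method:

```ruby
# Decide what to do with an incoming event identifier.
# +subscriptions+ maps subscription IDs to observers, as in the runner.
def route_event( subscriptions, subid )
  if subscriptions.key?( subid )
    [ :observer, subscriptions[subid] ]
  elsif subid.start_with?( 'sys.' )
    [ :system, subid ]
  else
    [ :ignored, subid ]
  end
end

subs = { 'sub-1' => 'disk observer' }
puts route_event( subs, 'sub-1' ).inspect
puts route_event( subs, 'sys.heartbeat' ).inspect
puts route_event( subs, 'sub-unknown' ).inspect
```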
#on_termination_signal(signo) ⇒ Object Also known as: on_interrupt_signal
Handle a TERM signal by stopping the runner. Also aliased to #on_interrupt_signal.

# File 'lib/arborist/observer_runner.rb', line 265

def on_termination_signal( signo )
  self.log.warn "Terminated (%p)" % [ signo ]
  self.stop
end
#register_observer_timers ⇒ Object
Register timers for each Observer.

# File 'lib/arborist/observer_runner.rb', line 119

def register_observer_timers
  self.observers.each do |observer|
    self.add_timers_for( observer )
  end
end
#register_observers ⇒ Object
Add subscriptions for all of the observers loaded into the runner.

# File 'lib/arborist/observer_runner.rb', line 111

def register_observers
  self.observers.each do |observer|
    self.add_observer( observer )
  end
end
#remove_observer(observer) ⇒ Object
Remove the specified observer after unsubscribing from its events.

# File 'lib/arborist/observer_runner.rb', line 183

def remove_observer( observer )
  self.log.info "Removing observer: %s" % [ observer.description ]

  self.subscriptions.keys.each do |subid|
    next unless self.subscriptions[ subid ] == observer

    self.client.unsubscribe( subid )
    self.subscriptions.delete( subid )
    self.client.event_api.unsubscribe( subid )
    self.log.debug "  unsubscribed from %p" % [ subid ]
  end
end
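Adding and removing observers both come down to maintaining one Hash from subscription ID to observer. A minimal sketch of that bookkeeping, assuming a stand-in client that just issues IDs; FakeClient, FakeObserver, and SubscriptionBook are hypothetical names, and the real Arborist::Client talks to a Manager instead:

```ruby
require 'securerandom'

# Hypothetical stand-in for the client: hands out subscription IDs and
# remembers which ones are active. Not the real Arborist::Client.
class FakeClient
  attr_reader :active

  def initialize
    @active = []
  end

  def subscribe( **criteria )
    subid = SecureRandom.uuid
    @active << subid
    subid
  end

  def unsubscribe( subid )
    @active.delete( subid )
  end
end

# Hypothetical observer: a description plus a list of subscription criteria.
FakeObserver = Struct.new( :description, :subscriptions )

# Keeps the same kind of subid => observer map the runner keeps.
class SubscriptionBook
  attr_reader :subscriptions

  def initialize( client )
    @client = client
    @subscriptions = {}
  end

  # One subscription per criteria Hash, each mapped back to the observer.
  def add( observer )
    observer.subscriptions.each do |sub|
      subid = @client.subscribe( **sub )
      @subscriptions[ subid ] = observer
    end
  end

  # Iterate over a copy of the keys so entries can be deleted safely.
  def remove( observer )
    @subscriptions.keys.each do |subid|
      next unless @subscriptions[ subid ] == observer
      @client.unsubscribe( subid )
      @subscriptions.delete( subid )
    end
  end
end
```

Because the map is keyed by subscription ID, an incoming event can be matched to its observer with a single lookup, while removal has to scan the map for every ID owned by that observer.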
#remove_timers ⇒ Object
Remove any registered timers.

# File 'lib/arborist/observer_runner.rb', line 152

def remove_timers
  self.timers.each do |timer|
    self.reactor.remove_timer( timer )
  end
end
#reset ⇒ Object
Unsubscribe from and clear all current subscriptions.

# File 'lib/arborist/observer_runner.rb', line 160

def reset
  self.log.warn "Resetting observer subscriptions."
  self.subscriptions.keys.each do |subid|
    self.client.event_api.unsubscribe( subid )
  end
  self.subscriptions.clear
end
#restart ⇒ Object
Restart the runner, resetting all of its observers’ subscriptions.

# File 'lib/arborist/observer_runner.rb', line 92

def restart
  self.log.info "Restarting!"
  self.reactor.timers.pause
  self.unregister_observers

  self.register_observers
  self.reactor.timers.resume
end
#run ⇒ Object
Run the loaded observers.

# File 'lib/arborist/observer_runner.rb', line 68

def run
  self.log.info "Starting!"
  self.register_observers
  self.register_observer_timers
  self.subscribe_to_system_events

  self.reactor.register( self.client.event_api, :read, &self.method(:on_subscription_event) )

  self.with_signal_handler( self.reactor, *QUEUE_SIGS ) do
    self.reactor.start_polling( ignore_interrupts: true )
  end
end
#running? ⇒ Boolean
Returns true if the ObserverRunner is running.

# File 'lib/arborist/observer_runner.rb', line 103

def running?
  return self.reactor &&
    self.client &&
    self.reactor.registered?( self.client.event_api )
end
#stop ⇒ Object
Stop the runner.

# File 'lib/arborist/observer_runner.rb', line 83

def stop
  self.log.info "Stopping!"
  self.remove_timers
  self.unregister_observers
  self.reactor.stop_polling
end
#subscribe_to_system_events ⇒ Object
Subscribe the runner to system events published by the Manager.

# File 'lib/arborist/observer_runner.rb', line 135

def subscribe_to_system_events
  self.client.event_api.subscribe( 'sys.' )
end
#unregister_observers ⇒ Object
Remove the subscriptions belonging to the loaded observers.

# File 'lib/arborist/observer_runner.rb', line 127

def unregister_observers
  self.observers.each do |observer|
    self.remove_observer( observer )
  end
end