Class: Arborist::ObserverRunner

Inherits:
Object
  • Object
show all
Extended by:
Loggability
Includes:
CZTop::Reactor::SignalHandling
Defined in:
lib/arborist/observer_runner.rb

Overview

An event-driven runner for Arborist::Observers.

Constant Summary collapse

QUEUE_SIGS =

Signals the observer runner responds to

[
	# Candidate signal names; the intersection with Signal.list below drops any
	# name the running platform does not define.
	:INT, :TERM, :HUP,
	# :TODO: :QUIT, :WINCH, :USR1, :USR2, :TTIN, :TTOU
] & Signal.list.keys.map( &:to_sym )

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ObserverRunner

Create a new Arborist::ObserverRunner



31
32
33
34
35
36
37
38
# File 'lib/arborist/observer_runner.rb', line 31

# Set up a runner with no observers, timers, or subscriptions, a new
# CZTop::Reactor, a new Arborist::Client, and no remembered Manager run ID.
def initialize
	# Bookkeeping collections, all empty to begin with
	@observers = []
	@timers = []
	@subscriptions = {}

	# Collaborators
	@reactor = CZTop::Reactor.new
	@client = Arborist::Client.new

	# Recorded by #handle_system_event when a 'sys.heartbeat' event arrives
	@manager_last_runid = nil
end

Instance Attribute Details

#client ⇒ Object (readonly)

The Arborist::Client that will be used for creating and tearing down subscriptions



55
56
57
# File 'lib/arborist/observer_runner.rb', line 55

# Reader for the @client instance variable.
def client
  return @client
end

#observers ⇒ Object (readonly)

The Array of loaded Arborist::Observers the runner should run.



46
47
48
# File 'lib/arborist/observer_runner.rb', line 46

# Reader for the @observers instance variable.
def observers
  return @observers
end

#reactor ⇒ Object

The reactor (a CZTop::Reactor) the runner uses to drive everything



52
53
54
# File 'lib/arborist/observer_runner.rb', line 52

# Reader for the @reactor instance variable.
def reactor
  return @reactor
end

#subscriptions ⇒ Object (readonly)

The map of subscription IDs to the Observer which it was created for.



58
59
60
# File 'lib/arborist/observer_runner.rb', line 58

# Reader for the @subscriptions instance variable.
def subscriptions
  return @subscriptions
end

#timers ⇒ Object (readonly)

The Array of registered ZMQ::Timers



49
50
51
# File 'lib/arborist/observer_runner.rb', line 49

# Reader for the @timers instance variable.
def timers
  return @timers
end

Instance Method Details

#add_observer(observer) ⇒ Object

Add the specified observer and subscribe to the events it wishes to receive.



170
171
172
173
174
175
176
177
178
179
# File 'lib/arborist/observer_runner.rb', line 170

# Subscribe +observer+ to every event it asks for.
#
# Each entry of observer.subscriptions is passed to the client as keyword
# arguments; the subscription ID that comes back is mapped to the observer and
# then subscribed to on the client's event API.
def add_observer( observer )
	self.log.info "Adding observer: #{observer.description}"

	observer.subscriptions.each do |criteria|
		self.log.debug "HI: #{criteria.inspect}"

		subscription_id = self.client.subscribe( **criteria )
		# Record the owner before listening for the subscription's events
		self.subscriptions[ subscription_id ] = observer
		self.client.event_api.subscribe( subscription_id )

		self.log.debug "  subscribed to #{criteria.inspect} with subscription #{subscription_id}"
	end
end

#add_timers_for(observer) ⇒ Object

Register a timer for the specified observer.



141
142
143
144
145
146
147
148
# File 'lib/arborist/observer_runner.rb', line 141

# Create one periodic reactor timer per (interval, callback) pair yielded by
# observer.timers, and remember each returned timer in #timers.
def add_timers_for( observer )
	observer.timers.each do |interval, callback|
		announcement = "Creating timer for %s observer to run %p every %ds" %
			[ observer.description, callback, interval ]
		self.log.info( announcement )

		handle = self.reactor.add_periodic_timer( interval, &callback )
		self.timers.push( handle )
	end
end

#handle_signal(sig) ⇒ Object

Handle signals.



247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'lib/arborist/observer_runner.rb', line 247

# Dispatch +sig+ to the matching handler: :INT and :TERM go to
# #on_termination_signal, :HUP goes to #on_hangup_signal, and anything else is
# logged as unhandled.
def handle_signal( sig )
	self.log.debug "Handling signal #{sig}"

	if [ :INT, :TERM ].include?( sig )
		self.on_termination_signal( sig )
	elsif sig == :HUP
		self.on_hangup_signal( sig )
	else
		self.log.warn "Unhandled signal #{sig}"
	end
end

#handle_system_event(event_type, event) ⇒ Object

Handle a `sys.` event from the Manager being observed.



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/arborist/observer_runner.rb', line 219

# React to a system event published by the Manager.
#
# Only 'sys.heartbeat' events cause any action: the event's 'run_id' is compared
# with the one remembered from the previous heartbeat, and if a previous ID
# exists and differs, subscriptions are reset and the observers re-registered.
# The latest run ID is then remembered. Every other event type is only logged.
def handle_system_event( event_type, event )
	self.log.debug "Got a #{event_type} event from the Manager: #{event.inspect}"

	# 'sys.node_added', 'sys.node_removed', and anything else: nothing to do
	return unless event_type == 'sys.heartbeat'

	current_runid = event['run_id']
	runid_changed = @manager_last_runid && current_runid != @manager_last_runid

	if runid_changed
		self.log.warn "Manager run ID changed: re-subscribing"
		self.reset
		self.register_observers
	end

	@manager_last_runid = current_runid
end

#load_observers(enumerator) ⇒ Object

Load observers from the specified enumerator.



62
63
64
# File 'lib/arborist/observer_runner.rb', line 62

# Append everything +enumerator+ yields (via #to_a) to the runner's observers.
def load_observers( enumerator )
	loaded = enumerator.to_a
	self.observers.concat( loaded )
end

#on_hangup_signal(signo) ⇒ Object

Handle a HUP signal. The default is to restart the handler.



273
274
275
276
# File 'lib/arborist/observer_runner.rb', line 273

# Log the hangup, then restart the runner via #restart.
def on_hangup_signal( signo )
	self.log.warn( "Hangup (#{signo.inspect})" )
	self.restart
end

#on_subscription_event(event) ⇒ Object

Handle IO events from the reactor.



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/arborist/observer_runner.rb', line 198

# Reactor callback for the event socket.
#
# A readable event has its message received from the socket and decoded with
# Arborist::EventAPI.decode into an identifier and a payload. The payload goes to
# the observer registered for that identifier if there is one; otherwise, an
# identifier starting with 'sys.' is routed to #handle_system_event, and
# anything else is logged and dropped.
#
# Raises a RuntimeError if the reactor event is not readable.
def on_subscription_event( event )
	unless event.readable?
		raise "Unhandled event #{event.inspect} on the event socket"
	end

	frame = event.socket.receive
	subid, payload = Arborist::EventAPI.decode( frame )
	observer = self.subscriptions[ subid ]

	if observer
		self.log.debug "Got #{subid.inspect} event for #{observer.inspect}"
		observer.handle_event( subid, payload )
	elsif subid.start_with?( 'sys.' )
		self.log.debug "System event! #{payload.inspect}"
		self.handle_system_event( subid, payload )
	else
		self.log.warn "Ignoring event #{subid.inspect} for which we have no observer."
	end
end

#on_termination_signal(signo) ⇒ Object Also known as: on_interrupt_signal

Handle a TERM signal. Shuts the handler down after handling any current request/s. Also aliased to #on_interrupt_signal.



265
266
267
268
# File 'lib/arborist/observer_runner.rb', line 265

# Log the termination, then shut the runner down via #stop.
def on_termination_signal( signo )
	self.log.warn( "Terminated (#{signo.inspect})" )
	self.stop
end

#register_observer_timers ⇒ Object

Register timers for each Observer.



119
120
121
122
123
# File 'lib/arborist/observer_runner.rb', line 119

# Call #add_timers_for once for each loaded observer.
def register_observer_timers
	self.observers.each {|loaded| self.add_timers_for(loaded) }
end

#register_observers ⇒ Object

Add subscriptions for all of the observers loaded into the runner.



111
112
113
114
115
# File 'lib/arborist/observer_runner.rb', line 111

# Call #add_observer once for each loaded observer.
def register_observers
	self.observers.each {|loaded| self.add_observer(loaded) }
end

#remove_observer(observer) ⇒ Object

Remove the specified observer after unsubscribing from its events.



183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/arborist/observer_runner.rb', line 183

# Tear down every subscription that belongs to +observer+.
#
# For each subscription ID mapped to the observer: unsubscribe through the
# client, drop the ID from the subscription map, and unsubscribe the client's
# event API from it. Subscriptions owned by other observers are left alone.
def remove_observer( observer )
	self.log.info "Removing observer: #{observer.description}"

	# Iterate over a snapshot of the keys, since matching entries are deleted
	self.subscriptions.keys.each do |subscription_id|
		if self.subscriptions[ subscription_id ] == observer
			self.client.unsubscribe( subscription_id )
			self.subscriptions.delete( subscription_id )
			self.client.event_api.unsubscribe( subscription_id )
			self.log.debug "  unsubscribed from #{subscription_id.inspect}"
		end
	end
end

#remove_timers ⇒ Object

Remove any registered timers.



152
153
154
155
156
# File 'lib/arborist/observer_runner.rb', line 152

# Remove every registered timer from the reactor and forget it.
#
# The timer list is emptied once the timers have been removed, so a later call
# does not hand stale handles back to the reactor and the list does not keep
# growing when timers are registered again. Returns the (now empty) timer list.
def remove_timers
	self.timers.each do |timer|
		self.reactor.remove_timer( timer )
	end

	# Drop the removed handles, mirroring how #reset clears subscriptions
	self.timers.clear
end

#reset ⇒ Object

Unsubscribe from and clear all current subscriptions.



160
161
162
163
164
165
166
# File 'lib/arborist/observer_runner.rb', line 160

# Unsubscribe the client's event API from every known subscription ID, then
# empty the subscription map. The client's own #unsubscribe is not called here.
def reset
	self.log.warn( "Resetting observer subscriptions." )

	self.subscriptions.each_key do |subscription_id|
		self.client.event_api.unsubscribe( subscription_id )
	end

	self.subscriptions.clear
end

#restart ⇒ Object

Restart the runner, resetting all of its observers’ subscriptions.



92
93
94
95
96
97
98
99
# File 'lib/arborist/observer_runner.rb', line 92

# Re-create all observer subscriptions: the reactor's timers are paused, the
# observers are unregistered and registered again, and the timers are resumed.
def restart
	self.log.info( "Restarting!" )

	# Keep timers paused while subscriptions are being torn down and rebuilt
	self.reactor.timers.pause
	self.unregister_observers
	self.register_observers
	self.reactor.timers.resume
end

#run ⇒ Object

Run the specified observers



68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/arborist/observer_runner.rb', line 68

# Start the runner.
#
# Registers the observers and their timers, subscribes to system events,
# registers #on_subscription_event as the read handler for the client's event
# API on the reactor, and then polls the reactor inside a signal handler set up
# for QUEUE_SIGS.
def run
	self.log.info( "Starting!" )

	self.register_observers
	self.register_observer_timers
	self.subscribe_to_system_events

	event_handler = self.method( :on_subscription_event )
	self.reactor.register( self.client.event_api, :read, &event_handler )

	self.with_signal_handler( self.reactor, *QUEUE_SIGS ) do
		self.reactor.start_polling( ignore_interrupts: true )
	end
end

#running? ⇒ Boolean

Returns true if the ObserverRunner is running.

Returns:

  • (Boolean)


103
104
105
106
107
# File 'lib/arborist/observer_runner.rb', line 103

# True-ish only when the runner has a reactor and a client, and the client's
# event API is registered with the reactor. Otherwise returns whichever falsy
# value stopped the check.
def running?
	event_loop = self.reactor
	api_client = event_loop && self.client

	api_client && event_loop.registered?( api_client.event_api )
end

#stop ⇒ Object

Stop the runner



83
84
85
86
87
88
# File 'lib/arborist/observer_runner.rb', line 83

# Shut the runner down: remove its timers, unregister its observers, and then
# tell the reactor to stop polling.
def stop
	self.log.info( "Stopping!" )

	self.remove_timers
	self.unregister_observers

	self.reactor.stop_polling
end

#subscribe_to_system_events ⇒ Object

Subscribe the runner to system events published by the Manager.



135
136
137
# File 'lib/arborist/observer_runner.rb', line 135

# Subscribe the client's event API to the 'sys.' prefix.
def subscribe_to_system_events
	event_api = self.client.event_api
	event_api.subscribe( 'sys.' )
end

#unregister_observers ⇒ Object

Remove the subscriptions belonging to the loaded observers.



127
128
129
130
131
# File 'lib/arborist/observer_runner.rb', line 127

# Call #remove_observer once for each loaded observer.
def unregister_observers
	self.observers.each {|loaded| self.remove_observer(loaded) }
end