Class: Arborist::Manager
- Inherits:
- Object
- Object
- Arborist::Manager
- Extended by:
- MethodUtilities, Configurability, Loggability
- Includes:
- HashUtilities, CZTop::Reactor::SignalHandling
- Defined in:
- lib/arborist/manager.rb
Overview
The main Arborist process – responsible for coordinating all other activity.
Constant Summary
- QUEUE_SIGS =
Signals the manager responds to
[
  :INT, :TERM, :HUP, :USR1,
  # :TODO: :QUIT, :WINCH, :USR2, :TTIN, :TTOU
] & Signal.list.keys.map( &:to_sym )
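A minimal sketch of what the intersection in QUEUE_SIGS does: any signal the host platform does not define is dropped from the list. The `wanted`, `available`, and `supported` names are illustrative only and are not part of Arborist.

```ruby
# Signal names the process would like to handle (same names as QUEUE_SIGS).
wanted = [ :INT, :TERM, :HUP, :USR1 ]

# Signal.list maps signal names (Strings) to signal numbers for this platform.
available = Signal.list.keys.map( &:to_sym )

# Array#& keeps only the names present in both arrays, in the order of `wanted`.
supported = wanted & available

puts "Signals that can be handled here: #{supported.inspect}"
```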
- VALID_TREEAPI_ACTIONS =
Array of actions supported by the Tree API
%w[ ack deps fetch graft modify prune search status subscribe unack unsubscribe update ]
Instance Attribute Summary
-
#checkpoint_timer ⇒ Object
readonly
The Timers::Timer that periodically checkpoints the manager’s state (if it’s configured to do so).
-
#event_queue ⇒ Object
readonly
The queue of pending Event API events.
-
#event_socket ⇒ Object
The ZeroMQ PUB socket that publishes events for the Event API.
-
#heartbeat_timer ⇒ Object
readonly
The Timers::Timer that periodically publishes a heartbeat event.
-
#linger ⇒ Object
readonly
The maximum amount of time to wait for pending events to be delivered during shutdown, in milliseconds.
-
#nodes ⇒ Object
The Hash of all loaded Nodes, keyed by their identifier.
-
#reactor ⇒ Object
readonly
The CZTop::Reactor that runs the event loop.
-
#root ⇒ Object
The root node of the tree.
-
#run_id ⇒ Object
readonly
A unique string used to identify different runs of the Manager.
-
#start_time ⇒ Object
The time at which the manager began running.
-
#subscriptions ⇒ Object
The Hash of all Subscriptions, keyed by their subscription ID.
-
#tree_socket ⇒ Object
The ZeroMQ REP socket that handles Tree API requests.
Instance Method Summary
-
#add_node(node) ⇒ Object
Add the specified node to the Manager.
-
#all_nodes(&block) ⇒ Object
Yield each node in a depth-first traversal of the manager’s tree to the specified block, or return an Enumerator if no block is given.
-
#ancestors_for(node) ⇒ Object
Return the Array of all nodes above the specified node.
-
#build_tree ⇒ Object
Build the tree out of all the loaded nodes.
-
#cancel_checkpoint_timer ⇒ Object
Cancel the timer that saves tree snapshots.
-
#cancel_heartbeat_timer ⇒ Object
Cancel the timer that publishes heartbeat events.
-
#cancel_timers ⇒ Object
Cancel the Manager’s timers.
-
#create_subscription(identifier, event_pattern, criteria, negative_criteria = {}) ⇒ Object
Create a subscription that publishes to the Manager’s event publisher for the node with the specified identifier and event_pattern, using the given criteria when considering an event.
-
#depth_limited_enumerator_for(start_node, depth, &filter) ⇒ Object
Return a depth limited enumerator for the specified start_node.
-
#descendants_for(node) ⇒ Object
Return an Array of all nodes below the specified node.
-
#dispatch_request(raw_request) ⇒ Object
Handle the specified raw_request and return a response.
-
#enumerator_for(start_node, &filter) ⇒ Object
Return an enumerator for the specified start_node.
-
#find_matching_node_states(filter, return_values, exclude_down = false, negative_filter = {}) ⇒ Object
Traverse the node tree and return the specified return_values from any nodes which match the given filter, skipping downed nodes and all their children if exclude_down is set.
-
#handle_ack_request(header, body) ⇒ Object
Acknowledge a node.
-
#handle_deps_request(header, body) ⇒ Object
Return a response to the ‘deps’ action.
-
#handle_fetch_request(header, body) ⇒ Object
Return a response to the ‘fetch’ action.
-
#handle_graft_request(header, body) ⇒ Object
Add a node.
-
#handle_modify_request(header, body) ⇒ Object
Modify a node’s operational attributes.
-
#handle_prune_request(header, body) ⇒ Object
Remove a node and its children.
-
#handle_search_request(header, body) ⇒ Object
Return a response to the ‘search’ action.
-
#handle_signal(sig) ⇒ Object
Handle signals.
-
#handle_status_request(header, body) ⇒ Object
Return a response to the ‘status’ action.
-
#handle_subscribe_request(header, body) ⇒ Object
Return a response to the ‘subscribe’ action.
-
#handle_unack_request(header, body) ⇒ Object
Un-acknowledge a node.
-
#handle_unsubscribe_request(header, body) ⇒ Object
Return a response to the ‘unsubscribe’ action.
-
#handle_update_request(header, body) ⇒ Object
Update nodes using the data from the update request’s body.
-
#initialize ⇒ Manager
constructor
Create a new Arborist::Manager.
-
#inspect ⇒ Object
Return a human-readable representation of the Manager suitable for debugging.
-
#link_node(node) ⇒ Object
Link the node to other nodes in the tree.
-
#link_node_to_parent(node) ⇒ Object
Link the specified node to its parent.
-
#load_tree(enumerator) ⇒ Object
Add nodes yielded from the specified enumerator into the manager’s tree.
-
#lookup_tree_request_action(header) ⇒ Object
Given a request header, return a #call-able object that can handle the response.
-
#merge_dependencies_from(from, deps_set = Set.new) ⇒ Object
Recurse into the children and secondary dependencies of the from node and merge the identifiers of the traversed nodes into the deps_set.
-
#nodecount ⇒ Object
Return the number of nodes in the manager’s tree.
-
#nodelist ⇒ Object
Return an Array of the identifiers of all nodes in the manager’s tree.
-
#on_event_socket_event(event) ⇒ Object
IO event handler for the event socket.
-
#on_hangup_signal(signo) ⇒ Object
Handle a HUP signal.
-
#on_termination_signal(signo) ⇒ Object
(also: #on_interrupt_signal)
Handle a TERM signal.
-
#on_tree_socket_event(event) ⇒ Object
ZMQ::Handler API – Read and handle an incoming request.
-
#on_user1_signal(signo) ⇒ Object
Handle a USR1 signal.
-
#propagate_events(node, *events) ⇒ Object
Propagate one or more events to the specified node and its ancestors in the tree, publishing them to matching subscriptions belonging to the nodes along the way.
-
#publish(identifier, event) ⇒ Object
Publish the specified event.
-
#publish_heartbeat_event ⇒ Object
Publish a system event that observers can watch for to detect restarts.
-
#publish_system_event(eventname, **data) ⇒ Object
Publish an event with the specified eventname and data.
-
#reachable_nodes(&block) ⇒ Object
Yield each node that is not down to the specified block, or return an Enumerator if no block is given.
-
#register_checkpoint_timer ⇒ Object
Register a periodic timer that saves a snapshot of the node tree’s state to the state file at the configured interval, if one is configured.
-
#register_event_socket ⇒ Object
Register the publisher with the reactor if it’s not already.
-
#register_heartbeat_timer ⇒ Object
Register a periodic timer that will publish a heartbeat event at a configurable interval.
-
#register_timers ⇒ Object
Register the Manager’s timers.
-
#remove_node(node) ⇒ Object
Remove a node from the Manager.
-
#remove_subscription(subscription_identifier) ⇒ Object
Remove the subscription with the specified subscription_identifier from the node it’s attached to and from the manager, and return it.
-
#restart ⇒ Object
Restart the manager.
-
#restore_node_states ⇒ Object
Attempt to restore the state of loaded nodes from the configured state file.
-
#resume_checkpoint_timer ⇒ Object
Resume the timer that saves tree snapshots.
-
#resume_heartbeat_timer ⇒ Object
Resume the timer that publishes heartbeat events.
-
#root_node ⇒ Object
Return the current root node.
-
#run ⇒ Object
Set up sockets and start the event loop.
-
#running? ⇒ Boolean
Returns true if the Manager is running.
-
#save_node_states ⇒ Object
Write out the state of all the manager’s nodes to the state_file if one is configured.
-
#setup_event_socket ⇒ Object
Set up the ZMQ PUB socket for published events.
-
#setup_sockets ⇒ Object
Create the sockets used by the manager and bind them to the appropriate endpoints.
-
#setup_tree_socket ⇒ Object
Set up the ZeroMQ REP socket for the Tree API.
-
#shutdown_event_socket ⇒ Object
Stop accepting events to be published.
-
#shutdown_sockets ⇒ Object
Shut down the sockets used by the manager.
-
#shutdown_tree_socket ⇒ Object
Tear down the ZeroMQ REP socket.
-
#start_accepting_requests ⇒ Object
Start a loop, accepting a request and handling it.
-
#stop ⇒ Object
Stop the manager.
-
#subscribe(identifier, subscription) ⇒ Object
Add the specified subscription to the node corresponding with the given identifier.
-
#tree_built ⇒ Object
Flag for marking when the tree is built successfully the first time.
-
#unregister_event_socket ⇒ Object
Unregister the event publisher socket from the reactor if it’s registered.
-
#update_node(identifier, new_properties, monitor_key = '_') ⇒ Object
Update the node with the specified identifier with the given new_properties and propagate any events generated by the update to the node and its ancestors.
-
#uptime ⇒ Object
Return the duration the manager has been running in seconds.
Methods included from MethodUtilities
attr_predicate, attr_predicate_accessor, dsl_accessor, singleton_attr_accessor, singleton_attr_reader, singleton_attr_writer, singleton_method_alias, singleton_predicate_accessor, singleton_predicate_reader
Methods included from HashUtilities
compact_hash, hash_matches, merge_recursively, stringify_keys, symbolify_keys
Constructor Details
#initialize ⇒ Manager
Create a new Arborist::Manager.
# File 'lib/arborist/manager.rb', line 98
def initialize
  @run_id = SecureRandom.hex( 16 )
  @root = Arborist::Node.create( :root )
  @nodes = { '_' => @root }
  @subscriptions = {}
  @tree_built = false

  @start_time = nil
  @checkpoint_timer = nil
  @linger = self.class.linger
  self.log.info "Linger set to %p" % [ @linger ]

  @reactor = CZTop::Reactor.new
  @tree_socket = nil
  @event_socket = nil
  @event_queue = []

  @heartbeat_timer = nil
  @checkpoint_timer = nil
end
Instance Attribute Details
#checkpoint_timer ⇒ Object (readonly)
The Timers::Timer that periodically checkpoints the manager’s state (if it’s configured to do so)
# File 'lib/arborist/manager.rb', line 174
def checkpoint_timer
  @checkpoint_timer
end
#event_queue ⇒ Object (readonly)
The queue of pending Event API events
# File 'lib/arborist/manager.rb', line 160
def event_queue
  @event_queue
end
#event_socket ⇒ Object
The ZeroMQ PUB socket that publishes events for the Event API
# File 'lib/arborist/manager.rb', line 156
def event_socket
  @event_socket
end
#heartbeat_timer ⇒ Object (readonly)
The Timers::Timer that periodically publishes a heartbeat event
# File 'lib/arborist/manager.rb', line 178
def heartbeat_timer
  @heartbeat_timer
end
#linger ⇒ Object (readonly)
The maximum amount of time to wait for pending events to be delivered during shutdown, in milliseconds.
# File 'lib/arborist/manager.rb', line 169
def linger
  @linger
end
#nodes ⇒ Object
The Hash of all loaded Nodes, keyed by their identifier
# File 'lib/arborist/manager.rb', line 136
def nodes
  @nodes
end
#reactor ⇒ Object (readonly)
The CZTop::Reactor that runs the event loop
# File 'lib/arborist/manager.rb', line 148
def reactor
  @reactor
end
#root ⇒ Object
The root node of the tree.
# File 'lib/arborist/manager.rb', line 132
def root
  @root
end
#run_id ⇒ Object (readonly)
A unique string used to identify different runs of the Manager
# File 'lib/arborist/manager.rb', line 128
def run_id
  @run_id
end
#start_time ⇒ Object
The time at which the manager began running.
# File 'lib/arborist/manager.rb', line 144
def start_time
  @start_time
end
#subscriptions ⇒ Object
The Hash of all Subscriptions, keyed by their subscription ID
# File 'lib/arborist/manager.rb', line 140
def subscriptions
  @subscriptions
end
#tree_socket ⇒ Object
The ZeroMQ REP socket that handles Tree API requests
# File 'lib/arborist/manager.rb', line 152
def tree_socket
  @tree_socket
end
Instance Method Details
#add_node(node) ⇒ Object
Add the specified node to the Manager.
# File 'lib/arborist/manager.rb', line 482
def add_node( node )
  identifier = node.identifier

  raise Arborist::NodeError, "Node %p already present." % [ identifier ] if self.nodes[ identifier ]
  self.nodes[ identifier ] = node

  if self.tree_built?
    self.link_node( node )
    self.publish_system_event( 'node_added', node: identifier )
  end
end
#all_nodes(&block) ⇒ Object
Yield each node in a depth-first traversal of the manager’s tree to the specified block, or return an Enumerator if no block is given.
# File 'lib/arborist/manager.rb', line 915
def all_nodes( &block )
  iter = self.enumerator_for( self.root )
  return iter.each( &block ) if block
  return iter
end
#ancestors_for(node) ⇒ Object
Return the Array of all nodes above the specified node.
# File 'lib/arborist/manager.rb', line 971
def ancestors_for( node )
  parent_id = node.parent or return []
  parent = self.nodes[ parent_id ]
  return [ parent ] + self.ancestors_for( parent )
end
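The upward walk in #ancestors_for can be tried outside the Manager with a small stand-in. This is only a sketch: `SketchNode` and the explicit `nodes` argument are hypothetical and replace Arborist::Node and the Manager's own #nodes Hash.

```ruby
# Hypothetical stand-in for a node: just an identifier and its parent's identifier.
SketchNode = Struct.new( :identifier, :parent )

# Follow parent links upward; the nearest ancestor comes first and the
# recursion stops at the node that has no parent (the root).
def ancestors_for( nodes, node )
  parent_id = node.parent or return []
  parent = nodes[ parent_id ]
  return [ parent ] + ancestors_for( nodes, parent )
end

nodes = {
  '_'      => SketchNode.new( '_', nil ),
  'router' => SketchNode.new( 'router', '_' ),
  'web01'  => SketchNode.new( 'web01', 'router' ),
}

ancestor_ids = ancestors_for( nodes, nodes['web01'] ).map( &:identifier )
puts ancestor_ids.inspect
```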
#build_tree ⇒ Object
Build the tree out of all the loaded nodes.
# File 'lib/arborist/manager.rb', line 449
def build_tree
  self.log.info "Building tree from %d loaded nodes." % [ self.nodes.length ]

  # Build primary tree structure
  self.nodes.each_value do |node|
    next if node.operational?
    self.link_node_to_parent( node )
  end
  self.tree_built = true

  # Set up secondary dependencies
  self.nodes.each_value do |node|
    node.register_secondary_dependencies( self )
  end

  self.restore_node_states
end
#cancel_checkpoint_timer ⇒ Object
Cancel the timer that saves tree snapshots.
# File 'lib/arborist/manager.rb', line 373
def cancel_checkpoint_timer
  self.reactor.remove_timer( self.checkpoint_timer )
end
#cancel_heartbeat_timer ⇒ Object
Cancel the timer that publishes heartbeat events.
# File 'lib/arborist/manager.rb', line 341
def cancel_heartbeat_timer
  self.reactor.remove_timer( self.heartbeat_timer )
end
#cancel_timers ⇒ Object
Cancel the Manager’s timers.
# File 'lib/arborist/manager.rb', line 230
def cancel_timers
  self.cancel_heartbeat_timer
  self.cancel_checkpoint_timer
end
#create_subscription(identifier, event_pattern, criteria, negative_criteria = {}) ⇒ Object
Create a subscription that publishes to the Manager’s event publisher for the node with the specified identifier and event_pattern, using the given criteria when considering an event.
# File 'lib/arborist/manager.rb', line 1084
def create_subscription( identifier, event_pattern, criteria, negative_criteria={} )
  sub = Arborist::Subscription.new( event_pattern, criteria, negative_criteria ) do |*args|
    self.publish( *args )
  end
  self.subscribe( identifier, sub )

  return sub
end
#depth_limited_enumerator_for(start_node, depth, &filter) ⇒ Object
Return a depth limited enumerator for the specified start_node.
# File 'lib/arborist/manager.rb', line 946
def depth_limited_enumerator_for( start_node, depth, &filter )
  return Enumerator.new do |yielder|
    traverse = ->( node, current_depth ) do
      self.log.debug "Enumerating nodes from %s at depth: %p" %
        [ node.identifier, current_depth ]

      if !filter || filter.call( node )
        yielder.yield( node )
        node.each do |child|
          traverse[ child, current_depth - 1 ]
        end if current_depth > 0
      end
    end
    traverse.call( start_node, depth )
  end
end
#descendants_for(node) ⇒ Object
Return an Array of all nodes below the specified node.
# File 'lib/arborist/manager.rb', line 965
def descendants_for( node )
  return self.enumerator_for( node ).to_a
end
#dispatch_request(raw_request) ⇒ Object
Handle the specified raw_request and return a response.
# File 'lib/arborist/manager.rb', line 615
def dispatch_request( raw_request )
  raise "Manager is shutting down" unless self.running?

  header, body = Arborist::TreeAPI.decode( raw_request )

  handler = self.lookup_tree_request_action( header )
  return handler.call( header, body )

rescue => err
  self.log.error "%p: %s" % [ err.class, err.message ]
  err.backtrace.each {|frame| self.log.debug " #{frame}" }

  errtype = case err
    when Arborist::MessageError,
         Arborist::ConfigError,
         Arborist::NodeError
      'client'
    else
      'server'
    end

  return Arborist::TreeAPI.error_response( errtype, err.message )
end
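The rescue clause in #dispatch_request sorts exceptions into 'client' and 'server' errors by class. A minimal sketch of that classification follows; `SketchMessageError`, `SketchNodeError`, and `error_type_for` are hypothetical names standing in for the Arborist error classes and are not part of the library.

```ruby
# Hypothetical error classes standing in for the errors a client can cause.
class SketchMessageError < StandardError; end
class SketchNodeError < StandardError; end

# Classify an exception the same way the rescue clause does: known
# request-level errors are the client's fault, anything else is the server's.
def error_type_for( err )
  case err
  when SketchMessageError, SketchNodeError
    'client'
  else
    'server'
  end
end

client_type = error_type_for( SketchMessageError.new("missing required header 'action'") )
server_type = error_type_for( RuntimeError.new("Manager is shutting down") )
puts "#{client_type} / #{server_type}"
```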
#enumerator_for(start_node, &filter) ⇒ Object
Return an enumerator for the specified start_node.
# File 'lib/arborist/manager.rb', line 932
def enumerator_for( start_node, &filter )
  return Enumerator.new do |yielder|
    traverse = ->( node ) do
      if !filter || filter.call( node )
        yielder.yield( node )
        node.each( &traverse )
      end
    end
    traverse.call( start_node )
  end
end
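One consequence of the filter check in #enumerator_for is that a node rejected by the filter also hides its whole subtree, because the traversal never descends into it. The sketch below reproduces the same traversal on a hypothetical `TreeNode` struct (not an Arborist class) to show that pruning.

```ruby
# Hypothetical stand-in for a node: an identifier, a status, and child nodes.
TreeNode = Struct.new( :identifier, :status, :children ) do
  # Yield each direct child, which is all the traversal needs from a node.
  def each( &block )
    children.each( &block )
  end
end

# Depth-first enumerator; a node that fails the filter is skipped along
# with everything beneath it.
def enumerator_for( start_node, &filter )
  return Enumerator.new do |yielder|
    traverse = ->( node ) do
      if !filter || filter.call( node )
        yielder.yield( node )
        node.each( &traverse )
      end
    end
    traverse.call( start_node )
  end
end

db01   = TreeNode.new( 'db01', 'up', [] )
web01  = TreeNode.new( 'web01', 'up', [] )
switch = TreeNode.new( 'switch', 'down', [ web01 ] )
root   = TreeNode.new( '_', 'up', [ db01, switch ] )

all_ids = enumerator_for( root ).map( &:identifier )
up_ids  = enumerator_for( root ) {|node| node.status == 'up' }.map( &:identifier )
```

Here `web01` is up, but it does not appear in `up_ids` because its parent `switch` is filtered out.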
#find_matching_node_states(filter, return_values, exclude_down = false, negative_filter = {}) ⇒ Object
Traverse the node tree and return the specified return_values from any nodes which match the given filter, skipping downed nodes and all their children if exclude_down is set. If return_values is set to nil, then all values from the node will be returned.
# File 'lib/arborist/manager.rb', line 543
def find_matching_node_states( filter, return_values, exclude_down=false, negative_filter={} )
  nodes_iter = if exclude_down
      self.reachable_nodes
    else
      self.all_nodes
    end

  states = nodes_iter.
    select {|node| node.matches?(filter) }.
    reject {|node| !negative_filter.empty? && node.matches?(negative_filter) }.
    each_with_object( {} ) do |node, hash|
      hash[ node.identifier ] = node.fetch_values( return_values )
    end

  return states
end
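The select/reject/each_with_object chain in #find_matching_node_states can be illustrated with plain Hashes. This is a rough sketch under simplifying assumptions: the `matches` lambda does exact key/value comparison, which is much simpler than Arborist's real node matching, and the node data is invented.

```ruby
# Hypothetical nodes represented as plain Hashes.
nodes = [
  { 'identifier' => 'web01', 'type' => 'host',    'status' => 'up' },
  { 'identifier' => 'web02', 'type' => 'host',    'status' => 'down' },
  { 'identifier' => 'ssh',   'type' => 'service', 'status' => 'up' },
]

# Simplified matcher: every criteria pair must equal the node's value.
matches = ->( node, criteria ) { criteria.all? {|key, val| node[key] == val } }

filter          = { 'type' => 'host' }
negative_filter = { 'status' => 'down' }

# Keep nodes matching the positive filter, drop those matching the negative
# one, then build an identifier => values Hash from what is left.
states = nodes.
  select {|node| matches.call( node, filter ) }.
  reject {|node| !negative_filter.empty? && matches.call( node, negative_filter ) }.
  each_with_object( {} ) do |node, hash|
    hash[ node['identifier'] ] = node.slice( 'status' )
  end

puts states.inspect
```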
#handle_ack_request(header, body) ⇒ Object
Acknowledge a node
# File 'lib/arborist/manager.rb', line 871
def handle_ack_request( header, body )
  self.log.info "ACK: %p" % [ header ]

  identifier = header[ 'identifier' ] or
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for ACK.' )
  node = self.nodes[ identifier ] or
    return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )

  self.log.debug "Acking the %s node: %p" % [ identifier, body ]

  body = symbolify_keys( body )
  events = node.acknowledge( **body )
  self.propagate_events( node, events )

  return Arborist::TreeAPI.successful_response( nil )
end
#handle_deps_request(header, body) ⇒ Object
Return a response to the ‘deps’ action.
# File 'lib/arborist/manager.rb', line 729
def handle_deps_request( header, body )
  self.log.info "DEPS: %p" % [ header ]
  from = header['from'] || '_'

  deps = self.merge_dependencies_from( from )
  deps.delete( from )

  return Arborist::TreeAPI.successful_response({ deps: deps.to_a })

rescue Arborist::ClientError => err
  return Arborist::TreeAPI.error_response( 'client', err.message )
end
#handle_fetch_request(header, body) ⇒ Object
Return a response to the ‘fetch’ action.
# File 'lib/arborist/manager.rb', line 702
def handle_fetch_request( header, body )
  self.log.info "FETCH: %p" % [ header ]
  from = header['from'] || '_'
  depth = header['depth']
  tree = header['tree']

  start_node = self.nodes[ from ] or
    return Arborist::TreeAPI.error_response( 'client', "No such node %s." % [from] )
  self.log.debug " Listing nodes under %p" % [ start_node ]

  if tree
    iter = [ start_node.to_h(depth: (depth || -1)) ]
  elsif depth
    self.log.debug " depth limited to %d" % [ depth ]
    iter = self.depth_limited_enumerator_for( start_node, depth )
  else
    self.log.debug " no depth limit"
    iter = self.enumerator_for( start_node )
  end
  data = iter.map( &:to_h )
  self.log.debug " got data for %d nodes" % [ data.length ]

  return Arborist::TreeAPI.successful_response( data )
end
#handle_graft_request(header, body) ⇒ Object
Add a node
# File 'lib/arborist/manager.rb', line 812
def handle_graft_request( header, body )
  self.log.info "GRAFT: %p" % [ header ]

  identifier = header[ 'identifier' ] or
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for GRAFT.' )

  if self.nodes[ identifier ]
    return Arborist::TreeAPI.error_response( 'client', "Node %p already exists." % [identifier] )
  end

  type = header[ 'type' ] or
    return Arborist::TreeAPI.error_response( 'client', 'No type specified for GRAFT.' )
  parent = header[ 'parent' ] || '_'
  parent_node = self.nodes[ parent ] or
    return Arborist::TreeAPI.error_response( 'client', 'No parent node found for %s.' % [parent] )

  self.log.debug "Grafting a new %s node under %p" % [ type, parent_node ]

  # If the parent has a factory method for the node type, use it, otherwise
  # use the Pluggability factory
  node = if parent_node.respond_to?( type )
      parent_node.method( type ).call( identifier, body )
    else
      body.merge!( parent: parent )
      Arborist::Node.create( type, identifier, body )
    end

  self.add_node( node )

  return Arborist::TreeAPI.successful_response( node ? {identifier: node.identifier} : nil )
end
#handle_modify_request(header, body) ⇒ Object
Modify a node’s operational attributes
# File 'lib/arborist/manager.rb', line 846
def handle_modify_request( header, body )
  self.log.info "MODIFY: %p" % [ header ]

  identifier = header[ 'identifier' ] or
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for MODIFY.' )
  return Arborist::TreeAPI.error_response( 'client', "Unable to MODIFY root node." ) if identifier == '_'
  node = self.nodes[ identifier ] or
    return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )

  self.log.debug "Modifying operational attributes of the %s node: %p" % [ identifier, body ]

  if new_parent_identifier = body.delete( 'parent' )
    old_parent = self.nodes[ node.parent ]
    new_parent = self.nodes[ new_parent_identifier ] or
      return Arborist::TreeAPI.error_response( 'client', "No such parent node: %p" % [new_parent_identifier] )
    node.reparent( old_parent, new_parent )
  end

  node.modify( body )

  return Arborist::TreeAPI.successful_response( nil )
end
#handle_prune_request(header, body) ⇒ Object
Remove a node and its children.
# File 'lib/arborist/manager.rb', line 800
def handle_prune_request( header, body )
  self.log.info "PRUNE: %p" % [ header ]

  identifier = header[ 'identifier' ] or
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for PRUNE.' )
  node = self.remove_node( identifier )

  return Arborist::TreeAPI.successful_response( node ? node.to_h : nil )
end
#handle_search_request(header, body) ⇒ Object
Return a response to the ‘search’ action.
# File 'lib/arborist/manager.rb', line 763
def handle_search_request( header, body )
  self.log.info "SEARCH: %p" % [ header ]

  exclude_down = header['exclude_down']
  values = if header.key?( 'return' )
      header['return'] || []
    else
      nil
    end

  body = [ body ] unless body.is_a?( Array )
  positive = body.shift
  negative = body.shift || {}
  states = self.find_matching_node_states( positive, values, exclude_down, negative )

  return Arborist::TreeAPI.successful_response( states )
end
#handle_signal(sig) ⇒ Object
Handle signals.
# File 'lib/arborist/manager.rb', line 391
def handle_signal( sig )
  self.log.debug "Handling signal %s" % [ sig ]
  case sig
  when :INT, :TERM
    self.on_termination_signal( sig )

  when :HUP
    self.on_hangup_signal( sig )

  when :USR1
    self.on_user1_signal( sig )

  else
    self.log.warn "Unhandled signal %s" % [ sig ]
  end
end
#handle_status_request(header, body) ⇒ Object
Return a response to the ‘status’ action.
# File 'lib/arborist/manager.rb', line 656
def handle_status_request( header, body )
  self.log.info "STATUS: %p" % [ header ]
  return Arborist::TreeAPI.successful_response(
    server_version: Arborist::VERSION,
    state: self.running? ? 'running' : 'not running',
    uptime: self.uptime,
    nodecount: self.nodecount
  )
end
#handle_subscribe_request(header, body) ⇒ Object
Return a response to the ‘subscribe’ action.
# File 'lib/arborist/manager.rb', line 668
def handle_subscribe_request( header, body )
  self.log.info "SUBSCRIBE: %p" % [ header ]
  event_type = header[ 'event_type' ]
  node_identifier = header[ 'identifier' ]

  body = [ body ] unless body.is_a?( Array )
  positive = body.shift
  negative = body.shift || {}

  subscription = self.create_subscription( node_identifier, event_type, positive, negative )
  self.log.info "Subscription to %s events at or under %s: %p" %
    [ event_type || 'all', node_identifier || 'the root node', subscription ]

  return Arborist::TreeAPI.successful_response( id: subscription.id )
end
#handle_unack_request(header, body) ⇒ Object
Un-acknowledge a node
# File 'lib/arborist/manager.rb', line 890
def handle_unack_request( header, body )
  self.log.info "UNACK: %p" % [ header ]

  identifier = header[ 'identifier' ] or
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for UNACK.' )
  node = self.nodes[ identifier ] or
    return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )

  self.log.debug "Unacking the %s node: %p" % [ identifier, body ]

  events = node.unacknowledge
  self.propagate_events( node, events )

  return Arborist::TreeAPI.successful_response( nil )
end
#handle_unsubscribe_request(header, body) ⇒ Object
Return a response to the ‘unsubscribe’ action.
# File 'lib/arborist/manager.rb', line 686
def handle_unsubscribe_request( header, body )
  self.log.info "UNSUBSCRIBE: %p" % [ header ]
  subscription_id = header[ 'subscription_id' ] or
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for UNSUBSCRIBE.' )
  subscription = self.remove_subscription( subscription_id ) or
    return Arborist::TreeAPI.successful_response( nil )

  self.log.info "Destroyed subscription: %p" % [ subscription ]
  return Arborist::TreeAPI.successful_response(
    event_type: subscription.event_type,
    criteria: subscription.criteria
  )
end
#handle_update_request(header, body) ⇒ Object
Update nodes using the data from the update request’s body.
# File 'lib/arborist/manager.rb', line 783
def handle_update_request( header, body )
  self.log.info "UPDATE: %p" % [ header ]

  unless body.respond_to?( :each )
    return Arborist::TreeAPI.error_response( 'client', 'Malformed update: body does not respond to #each' )
  end

  monitor_key = header['monitor_key']
  body.each do |identifier, properties|
    self.update_node( identifier, properties, monitor_key )
  end

  return Arborist::TreeAPI.successful_response( nil )
end
#inspect ⇒ Object
Return a human-readable representation of the Manager suitable for debugging.
# File 'lib/arborist/manager.rb', line 264
def inspect
  return "#<%p:%#x {runid: %s} %d nodes>" % [
    self.class,
    self.object_id * 2,
    self.run_id,
    self.nodes.length,
  ]
end
#link_node(node) ⇒ Object
Link the node to other nodes in the tree.
# File 'lib/arborist/manager.rb', line 496
def link_node( node )
  raise "Tree is not built yet" unless self.tree_built?

  self.link_node_to_parent( node )
  node.register_secondary_dependencies( self )
end
#link_node_to_parent(node) ⇒ Object
Link the specified node to its parent. Raises an error if the specified node’s parent is not yet loaded.
# File 'lib/arborist/manager.rb', line 470
def link_node_to_parent( node )
  self.log.debug "Linking node %p to its parent" % [ node ]
  parent_id = node.parent || '_'
  parent_node = self.nodes[ parent_id ] or
    raise "no parent '%s' node loaded for %p" % [ parent_id, node ]

  self.log.debug "adding %p as a child of %p" % [ node, parent_node ]
  parent_node.add_child( node )
end
#load_tree(enumerator) ⇒ Object
Add nodes yielded from the specified enumerator into the manager’s tree.
# File 'lib/arborist/manager.rb', line 440
def load_tree( enumerator )
  enumerator.each do |node|
    self.add_node( node )
  end
  self.build_tree
end
#lookup_tree_request_action(header) ⇒ Object
Given a request header, return a #call-able object that can handle the response.
# File 'lib/arborist/manager.rb', line 641
def lookup_tree_request_action( header )
  raise Arborist::MessageError, "unsupported version %d" % [ header['version'] ] unless
    header['version'] == 1

  action = header['action'] or
    raise Arborist::MessageError, "missing required header 'action'"
  raise Arborist::MessageError, "No such action '%s'" % [ action ] unless
    VALID_TREEAPI_ACTIONS.include?( action )

  handler_name = "handle_%s_request" % [ action ]
  return self.method( handler_name )
end
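The lookup combines a whitelist with a naming convention, so only actions listed in VALID_TREEAPI_ACTIONS can ever be turned into a `handle_<action>_request` method call. The following is a reduced sketch of that pattern; `SketchDispatcher`, its two handlers, and the use of ArgumentError in place of Arborist::MessageError are all assumptions made so the example runs without the library.

```ruby
# Hypothetical miniature of the lookup: a whitelist plus a naming convention.
class SketchDispatcher
  VALID_ACTIONS = %w[ status fetch ].freeze

  # Return a Method object for the handler named after the header's action.
  def lookup_action( header )
    unless header['version'] == 1
      raise ArgumentError, "unsupported version %p" % [ header['version'] ]
    end

    action = header['action'] or
      raise ArgumentError, "missing required header 'action'"
    unless VALID_ACTIONS.include?( action )
      raise ArgumentError, "No such action '%s'" % [ action ]
    end

    return self.method( "handle_%s_request" % [ action ] )
  end

  def handle_status_request( header, body )
    return { 'success' => true, 'state' => 'running' }
  end

  def handle_fetch_request( header, body )
    return { 'success' => true, 'nodes' => [] }
  end
end

dispatcher = SketchDispatcher.new
handler = dispatcher.lookup_action( { 'version' => 1, 'action' => 'status' } )
response = handler.call( { 'version' => 1, 'action' => 'status' }, nil )
```

Checking the whitelist before building the method name keeps a request from reaching arbitrary methods that happen to match the naming pattern.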
#merge_dependencies_from(from, deps_set = Set.new) ⇒ Object
Recurse into the children and secondary dependencies of the from node and merge the identifiers of the traversed nodes into the deps_set.
# File 'lib/arborist/manager.rb', line 745
def merge_dependencies_from( from, deps_set=Set.new )
  return deps_set unless deps_set.add?( from )

  start_node = self.nodes[ from ] or
    raise Arborist::ClientError, "No such node %s." % [ from ]

  self.enumerator_for( start_node ).each do |subnode|
    deps_set.add( subnode.identifier )

    subnode.node_subscribers.each do |subdep|
      self.merge_dependencies_from( subdep, deps_set )
    end
  end

  return deps_set
end
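The first line of #merge_dependencies_from relies on Set#add? returning nil when the identifier is already in the set, which stops the recursion from looping on circular dependencies. A simplified sketch of that guard follows; the `DEPENDENTS` map and `merge_deps` are hypothetical, and the subtree enumeration done by the real method is left out.

```ruby
require 'set'

# Hypothetical dependency map: identifier => identifiers that depend on it.
DEPENDENTS = {
  'db01'  => [ 'web01' ],
  'web01' => [ 'lb01' ],
  'lb01'  => [ 'db01' ],   # deliberate cycle back to the start
}.freeze

# Collect everything reachable from `from`. Set#add? returns nil if the
# element was already present, so a revisited identifier ends that branch.
def merge_deps( from, deps_set=Set.new )
  return deps_set unless deps_set.add?( from )

  DEPENDENTS.fetch( from, [] ).each do |dep|
    merge_deps( dep, deps_set )
  end

  return deps_set
end

deps = merge_deps( 'db01' )
puts deps.to_a.inspect
```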
#nodecount ⇒ Object
Return the number of nodes in the manager’s tree.
# File 'lib/arborist/manager.rb', line 569
def nodecount
  return self.nodes.length
end
#nodelist ⇒ Object
Return an Array of the identifiers of all nodes in the manager’s tree.
# File 'lib/arborist/manager.rb', line 575
def nodelist
  return self.nodes.keys
end
#on_event_socket_event(event) ⇒ Object
IO event handler for the event socket.
# File 'lib/arborist/manager.rb', line 1034
def on_event_socket_event( event )
  if event.writable?
    if (( msg = self.event_queue.shift ))
      # self.log.debug "Publishing event %p" % [ msg ]
      event.socket << msg
    end
  else
    raise "Unhandled event %p on the event socket" % [ event ]
  end

  self.unregister_event_socket if self.event_queue.empty?
end
#on_hangup_signal(signo) ⇒ Object
Handle a HUP signal. The default is to restart the manager.
# File 'lib/arborist/manager.rb', line 420
def on_hangup_signal( signo )
  self.log.warn "Hangup (%p)" % [ signo ]
  self.restart
end
#on_termination_signal(signo) ⇒ Object Also known as: on_interrupt_signal
Handle a TERM signal. Shuts the manager down after handling any current requests. Also aliased to #on_interrupt_signal.
# File 'lib/arborist/manager.rb', line 412
def on_termination_signal( signo )
  self.log.warn "Terminated (%p)" % [ signo ]
  self.stop
end
#on_tree_socket_event(event) ⇒ Object
ZMQ::Handler API – Read and handle an incoming request.
# File 'lib/arborist/manager.rb', line 603
def on_tree_socket_event( event )
  if event.readable?
    request = event.socket.receive
    msg = self.dispatch_request( request )
    event.socket << msg
  else
    raise "Unsupported event %p on tree API socket!" % [ event ]
  end
end
#on_user1_signal(signo) ⇒ Object
Handle a USR1 signal. Writes a checkpoint message to the log and saves the node states.
# File 'lib/arborist/manager.rb', line 427
def on_user1_signal( signo )
  self.log.info "Checkpoint: User signal."
  self.save_node_states
end
#propagate_events(node, *events) ⇒ Object
Propagate one or more events to the specified node and its ancestors in the tree, publishing them to matching subscriptions belonging to the nodes along the way.
# File 'lib/arborist/manager.rb', line 1104

def propagate_events( node, *events )
  node.publish_events( *events )

  if node.parent
    self.log.debug "Propagating %d events from %s -> %s" % [
      events.length,
      node.identifier,
      node.parent
    ]
    parent = self.nodes[ node.parent ] or raise "couldn't find parent %p of node %p!" %
      [ node.parent, node.identifier ]
    self.propagate_events( parent, *events )
  end
end
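The upward walk can be sketched without any of Arborist's machinery. In the example below, SketchNode, SKETCH_NODES and propagate are hypothetical stand-ins: each node only records the events it was handed, which makes it easy to see that an event published on a leaf reaches every ancestor up to the root.

```ruby
# Hypothetical stand-in for Arborist::Node: an identifier, a parent identifier
# (nil for the root), and a list of the events the node has been handed.
SketchNode = Struct.new( :identifier, :parent, :seen ) do
  def publish_events( *events )
    seen.concat( events )
  end
end

SKETCH_NODES = {
  '_'      => SketchNode.new( '_', nil, [] ),
  'host'   => SketchNode.new( 'host', '_', [] ),
  'webapp' => SketchNode.new( 'webapp', 'host', [] )
}

# Deliver the events to the node, then repeat for each ancestor in turn.
def propagate( node, *events )
  node.publish_events( *events )
  return unless node.parent

  parent = SKETCH_NODES[ node.parent ] or
    raise "couldn't find parent %p of node %p!" % [ node.parent, node.identifier ]
  propagate( parent, *events )
end

propagate( SKETCH_NODES['webapp'], :down )
```

After the call, 'webapp', 'host' and '_' have each seen the :down event. Events never travel downward: propagating from 'host' would leave 'webapp' untouched.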
#publish(identifier, event) ⇒ Object
Publish the specified event.
# File 'lib/arborist/manager.rb', line 1011

def publish( identifier, event )
  self.event_queue << Arborist::EventAPI.encode( identifier, event.to_h )
  self.register_event_socket if self.running?
end
#publish_heartbeat_event ⇒ Object
Publish a system event that observers can watch for to detect restarts.
# File 'lib/arborist/manager.rb', line 1049

def publish_heartbeat_event
  return unless self.start_time
  self.publish_system_event( 'heartbeat',
    run_id: self.run_id,
    start_time: self.start_time.iso8601,
    version: Arborist::VERSION
  )
end
#publish_system_event(eventname, **data) ⇒ Object
Publish an event with the specified eventname and data.
# File 'lib/arborist/manager.rb', line 1060

def publish_system_event( eventname, **data )
  eventname = eventname.to_s
  eventname = 'sys.' + eventname unless eventname.start_with?( 'sys.' )
  self.log.debug "Publishing %s event: %p." % [ eventname, data ]
  self.publish( eventname, data )
end
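The name handling in this method means callers may pass either a bare name or an already-prefixed one, as a String or a Symbol. A minimal sketch of just that normalization step, using a hypothetical helper named system_event_name:

```ruby
# Hypothetical helper that mirrors only the event-name normalization: accept
# a String or Symbol and make sure the result carries the 'sys.' prefix once.
def system_event_name( eventname )
  eventname = eventname.to_s
  eventname = 'sys.' + eventname unless eventname.start_with?( 'sys.' )
  return eventname
end

from_symbol   = system_event_name( :heartbeat )
from_prefixed = system_event_name( 'sys.node_removed' )
```

Both calls yield a name in the sys. namespace, and the already-prefixed name is returned unchanged rather than being prefixed a second time.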
#reachable_nodes(&block) ⇒ Object
Yield each node that is not down to the specified block, or return an Enumerator if no block is given.
# File 'lib/arborist/manager.rb', line 924

def reachable_nodes( &block )
  iter = self.enumerator_for( self.root ) {|node| node.reachable? }
  return iter.each( &block ) if block
  return iter
end
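The "yield to the block, or return an Enumerator" convention used here is common in Ruby. The sketch below shows the same shape over a flat Array instead of the manager's tree; SketchHost, SKETCH_HOSTS and reachable_hosts are hypothetical names, and the filtering is done eagerly for brevity rather than through the manager's enumerator_for.

```ruby
# Hypothetical stand-in for a node that knows whether it is reachable.
SketchHost = Struct.new( :identifier, :up ) do
  def reachable?
    up
  end
end

SKETCH_HOSTS = [
  SketchHost.new( 'web', true ),
  SketchHost.new( 'db', false ),
  SketchHost.new( 'cache', true )
]

# Yield each reachable host to the block, or return an Enumerator over them
# if no block is given.
def reachable_hosts( &block )
  iter = SKETCH_HOSTS.select( &:reachable? ).each
  return iter.each( &block ) if block
  return iter
end

names = reachable_hosts.map( &:identifier )

yielded = []
reachable_hosts {|host| yielded << host.identifier }
```

Returning an Enumerator in the block-less case lets callers chain methods such as map or first without the method needing to know how its results will be consumed.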
#register_checkpoint_timer ⇒ Object
Register a periodic timer that will save a snapshot of the node tree’s state to the state file on a configured interval if one is configured.
# File 'lib/arborist/manager.rb', line 354

def register_checkpoint_timer
  unless self.class.state_file
    self.log.info "No state file configured; skipping checkpoint timer setup."
    return nil
  end
  interval = self.class.checkpoint_frequency
  unless interval && interval.nonzero?
    self.log.info "Checkpoint frequency is %p; skipping checkpoint timer setup." % [ interval ]
    return nil
  end

  self.log.info "Setting up node state checkpoint every %0.3fs" % [ interval ]
  @checkpoint_timer = self.reactor.add_periodic_timer( interval ) do
    self.save_node_states
  end
end
#register_event_socket ⇒ Object
Register the publisher with the reactor if it’s not already.
# File 'lib/arborist/manager.rb', line 1018

def register_event_socket
  self.log.debug "Registering event socket for write events."
  self.reactor.enable_events( self.event_socket, :write ) unless
    self.reactor.event_enabled?( self.event_socket, :write )
end
#register_heartbeat_timer ⇒ Object
Register a periodic timer that will publish a heartbeat event at a configurable interval.
# File 'lib/arborist/manager.rb', line 330

def register_heartbeat_timer
  interval = self.class.heartbeat_frequency

  self.log.info "Setting up to heartbeat every %ds" % [ interval ]
  @heartbeat_timer = self.reactor.add_periodic_timer( interval ) do
    self.publish_heartbeat_event
  end
end
#register_timers ⇒ Object
Register the Manager’s timers.
# File 'lib/arborist/manager.rb', line 223

def register_timers
  self.register_checkpoint_timer
  self.register_heartbeat_timer
end
#remove_node(node) ⇒ Object
Remove a node from the Manager. The node can either be the Arborist::Node to remove, or the identifier of a node.
# File 'lib/arborist/manager.rb', line 506

def remove_node( node )
  node = self.nodes[ node ] unless node.is_a?( Arborist::Node )
  return unless node

  raise "Can't remove an operational node" if node.operational?

  self.log.info "Removing node %p" % [ node ]
  self.publish_system_event( 'node_removed', node: node.identifier )
  node.children.each do |identifier, child_node|
    self.remove_node( child_node )
  end

  if parent_node = self.nodes[ node.parent || '_' ]
    parent_node.remove_child( node )
  end

  return self.nodes.delete( node.identifier )
end
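Removal is recursive: descendants are removed first, then the node is detached from its parent and dropped from the nodes Hash. The sketch below reproduces that order with a hypothetical RemovableNode Struct. It deliberately leaves out the operational-node check and the sys.node_removed event that the real method includes.

```ruby
# Hypothetical stand-in for Arborist::Node: identifier, parent identifier,
# and a Hash of child nodes keyed by identifier.
RemovableNode = Struct.new( :identifier, :parent, :children )

# Build a tiny tree: _ -> host -> { web, db }
def build_sketch_tree
  nodes = {}
  [ ['_', nil], ['host', '_'], ['web', 'host'], ['db', 'host'] ].each do |id, parent|
    nodes[ id ] = RemovableNode.new( id, parent, {} )
    nodes[ parent ].children[ id ] = nodes[ id ] if parent
  end
  return nodes
end

# Remove +node+ (a RemovableNode or an identifier) and all of its descendants
# from +nodes+, returning the removed node or nil if there was no such node.
def remove_from( nodes, node )
  node = nodes[ node ] unless node.is_a?( RemovableNode )
  return unless node

  # Iterate over a copy, since each recursive call detaches one child.
  node.children.values.each {|child| remove_from(nodes, child) }

  if (( parent = nodes[node.parent || '_'] ))
    parent.children.delete( node.identifier )
  end

  return nodes.delete( node.identifier )
end

tree = build_sketch_tree
removed = remove_from( tree, 'host' )
```

After the call only the root remains in tree, and removed is the 'host' node. Asking to remove an unknown identifier returns nil instead of raising.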
#remove_subscription(subscription_identifier) ⇒ Object
Remove the subscription with the specified subscription_identifier from the node it’s attached to and from the manager, and return it.
# File 'lib/arborist/manager.rb', line 1096

def remove_subscription( subscription_identifier )
  node = self.subscriptions.delete( subscription_identifier ) or return nil
  return node.remove_subscription( subscription_identifier )
end
#restart ⇒ Object
Restart the manager. Not yet implemented; raises NotImplementedError.

# File 'lib/arborist/manager.rb', line 251

def restart
  raise NotImplementedError
end
#restore_node_states ⇒ Object
Attempt to restore the state of loaded nodes from the configured state file. Returns true if it succeeded, or false if a state file wasn’t configured, doesn’t exist, isn’t readable, or couldn’t be unmarshalled.

# File 'lib/arborist/manager.rb', line 306

def restore_node_states
  path = self.class.state_file or return false
  return false unless path.readable?

  self.log.info "Restoring node state from %s" % [ path ]
  nodes = Marshal.load( path.open('r:binary') )

  nodes.each do |identifier, saved_node|
    self.log.debug "Loaded node: %p" % [ identifier ]
    if (( current_node = self.nodes[ identifier ] ))
      self.log.debug "Restoring state of the %p node." % [ identifier ]
      current_node.restore( saved_node )
    else
      self.log.info "Not restoring state for the %s node: not present in the loaded tree." %
        [ identifier ]
    end
  end

  return true
end
#resume_checkpoint_timer ⇒ Object
Resume the timer that saves tree snapshots.
# File 'lib/arborist/manager.rb', line 379

def resume_checkpoint_timer
  self.reactor.resume_timer( self.checkpoint_timer )
end
#resume_heartbeat_timer ⇒ Object
Resume the timer that publishes heartbeat events.
# File 'lib/arborist/manager.rb', line 347

def resume_heartbeat_timer
  self.reactor.resume_timer( self.heartbeat_timer )
end
#root_node ⇒ Object
Return the current root node.
# File 'lib/arborist/manager.rb', line 908

def root_node
  return self.nodes[ '_' ]
end
#run ⇒ Object
Set up sockets and start the event loop.

# File 'lib/arborist/manager.rb', line 186

def run
  self.log.info "Getting ready to start the manager."
  self.setup_sockets
  self.register_timers
  self.with_signal_handler( reactor, *QUEUE_SIGS ) do
    self.start_accepting_requests
  end
ensure
  self.shutdown_sockets
  self.save_node_states
end
#running? ⇒ Boolean
Returns true if the Manager is running.
# File 'lib/arborist/manager.rb', line 215

def running?
  return self.reactor &&
    self.event_socket &&
    self.reactor.registered?( self.event_socket )
end
#save_node_states ⇒ Object
Write out the state of all the manager’s nodes to the state_file if one is configured.
# File 'lib/arborist/manager.rb', line 280

def save_node_states
  start_time = Time.now
  path = self.class.state_file or return

  self.log.info "Saving current node state to %s" % [ path ]
  tmpfile = Tempfile.create(
    [path.basename.to_s.sub(path.extname, ''), path.extname],
    path.dirname.to_s,
    encoding: 'binary'
  )
  Marshal.dump( self.nodes, tmpfile )
  tmpfile.close

  File.rename( tmpfile.path, path.to_s )
  self.log.debug "Saved state file in %0.1f seconds." % [ Time.now - start_time ]

rescue SystemCallError => err
  self.log.error "%p while saving node state: %s" % [ err.class, err.message ]

ensure
  File.unlink( tmpfile.path ) if tmpfile && File.exist?( tmpfile.path )
end
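The save path writes to a temporary file in the same directory as the state file and then renames it into place, so a reader (or a crash mid-write) never sees a partially written state file. The sketch below shows that write-then-rename pattern together with the matching load, using only the standard library. save_state, load_state and the sample data are hypothetical; Arborist marshals its node objects rather than plain Hashes.

```ruby
require 'tempfile'
require 'tmpdir'
require 'pathname'

# Dump +data+ into a temp file next to +path+ (a Pathname), then rename it
# over +path+. Returns true on success, false if a system call failed.
def save_state( data, path )
  tmpfile = Tempfile.create(
    [ path.basename.to_s.sub(path.extname, ''), path.extname ],
    path.dirname.to_s,
    encoding: 'binary'
  )
  Marshal.dump( data, tmpfile )
  tmpfile.close

  File.rename( tmpfile.path, path.to_s )
  return true
rescue SystemCallError => err
  warn "%p while saving state: %s" % [ err.class, err.message ]
  return false
ensure
  # Only removes anything if the rename never happened.
  File.unlink( tmpfile.path ) if tmpfile && File.exist?( tmpfile.path )
end

# Load previously saved data, or return nil if the file isn't readable.
def load_state( path )
  return nil unless path.readable?
  return path.open( 'r:binary' ) {|io| Marshal.load(io) }
end

state_dir  = Dir.mktmpdir
state_path = Pathname( state_dir ) + 'sketch.state'

saved    = save_state( { 'web' => { 'status' => 'up' } }, state_path )
restored = load_state( state_path )
```

Creating the temp file in the destination directory matters: a rename is only atomic when source and target are on the same filesystem. As with any use of Marshal.load, the file should only ever come from a trusted source.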
#setup_event_socket ⇒ Object
Set up the ZMQ PUB socket for published events.
# File 'lib/arborist/manager.rb', line 983

def setup_event_socket
  @event_socket = CZTop::Socket::PUB.new
  self.log.info "  binding the event socket (%#0x) to %p" %
    [ @event_socket.object_id * 2, Arborist.event_api_url ]
  @event_socket.options.linger = ( self.linger * 1000 ).ceil
  @event_socket.bind( Arborist.event_api_url )
end
#setup_sockets ⇒ Object
Create the sockets used by the manager and bind them to the appropriate endpoints.
# File 'lib/arborist/manager.rb', line 201

def setup_sockets
  self.setup_tree_socket
  self.setup_event_socket
end
#setup_tree_socket ⇒ Object
Set up the ZeroMQ REP socket for the Tree API.
# File 'lib/arborist/manager.rb', line 586

def setup_tree_socket
  @tree_socket = CZTop::Socket::REP.new
  self.log.info "  binding the tree API socket (%#0x) to %p" %
    [ @tree_socket.object_id * 2, Arborist.tree_api_url ]
  @tree_socket.options.linger = 0
  @tree_socket.bind( Arborist.tree_api_url )
end
#shutdown_event_socket ⇒ Object
Stop accepting events to be published.

# File 'lib/arborist/manager.rb', line 993

def shutdown_event_socket
  start = Time.now
  timeout = start + (self.linger.to_f / 2.0)

  self.log.warn "Waiting to empty the event queue..."
  until self.event_queue.empty?
    sleep 0.1
    break if Time.now > timeout
  end
  self.log.warn "  ... waited %0.1f seconds" % [ Time.now - start ]

  @event_socket.options.linger = 0
  @event_socket.unbind( @event_socket.last_endpoint )
  @event_socket = nil
end
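The wait loop in this method is a bounded poll: it checks the queue, sleeps briefly, and gives up once a deadline passes. A minimal sketch of that loop on its own, with a hypothetical wait_for_drain helper and a plain Array standing in for the event queue:

```ruby
# Poll +queue+ until it is empty or +timeout+ seconds have elapsed, whichever
# comes first. Returns the number of seconds spent waiting.
def wait_for_drain( queue, timeout, interval: 0.01 )
  start    = Time.now
  deadline = start + timeout

  until queue.empty?
    sleep interval
    break if Time.now > deadline
  end

  return Time.now - start
end

# Nothing ever consumes this queue, so the wait runs until the deadline.
waited = wait_for_drain( [:stuck_event], 0.05 )
```

The deadline guarantees shutdown cannot hang on a queue that never drains; any events still queued at that point are simply not delivered.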
#shutdown_sockets ⇒ Object
Shut down the sockets used by the manager.
# File 'lib/arborist/manager.rb', line 208

def shutdown_sockets
  self.shutdown_tree_socket
  self.shutdown_event_socket
end
#shutdown_tree_socket ⇒ Object
Tear down the ZeroMQ REP socket.
# File 'lib/arborist/manager.rb', line 596

def shutdown_tree_socket
  @tree_socket.unbind( @tree_socket.last_endpoint )
  @tree_socket = nil
end
#start_accepting_requests ⇒ Object
Start a loop, accepting a request and handling it.
# File 'lib/arborist/manager.rb', line 237

def start_accepting_requests
  self.log.debug "Starting the main loop"

  self.start_time = Time.now

  self.reactor.register( self.tree_socket, :read, &self.method(:on_tree_socket_event) )
  self.reactor.register( self.event_socket, :write, &self.method(:on_event_socket_event) )

  self.log.debug "Manager running."
  return self.reactor.start_polling( ignore_interrupts: true )
end
#stop ⇒ Object
Stop the manager.
# File 'lib/arborist/manager.rb', line 257

def stop
  self.log.info "Stopping the manager."
  self.reactor.stop_polling
end
#subscribe(identifier, subscription) ⇒ Object
Add the specified subscription to the node corresponding with the given identifier.
# File 'lib/arborist/manager.rb', line 1069

def subscribe( identifier, subscription )
  identifier ||= '_'
  node = self.nodes[ identifier ] or raise ArgumentError, "no such node %p" % [ identifier ]

  self.log.debug "Registering subscription %p" % [ subscription ]
  node.add_subscription( subscription )
  self.log.debug "  adding '%s' to the subscriptions hash." % [ subscription.id ]
  self.subscriptions[ subscription.id ] = node
  self.log.debug "  subscriptions hash: %#0x" % [ self.subscriptions.object_id ]
end
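Subscriptions are tracked in two places: on the node they are attached to, and in a manager-level Hash that maps each subscription ID back to its node so that #remove_subscription can find it without searching the tree. The sketch below models that bookkeeping with hypothetical SketchSub, SubscribableNode and SketchRegistry classes; none of them are Arborist classes.

```ruby
# Hypothetical subscription: only its ID matters for the bookkeeping.
SketchSub = Struct.new( :id )

# Hypothetical node that stores its own subscriptions by ID.
class SubscribableNode
  attr_reader :subscriptions

  def initialize
    @subscriptions = {}
  end

  def add_subscription( sub )
    @subscriptions[ sub.id ] = sub
  end

  def remove_subscription( id )
    @subscriptions.delete( id )
  end
end

# Hypothetical stand-in for the manager's side of the bookkeeping.
class SketchRegistry
  attr_reader :subscriptions

  def initialize( nodes )
    @nodes = nodes
    @subscriptions = {}  # subscription ID => the node it is attached to
  end

  # Attach +sub+ to the node named +identifier+ (the root if nil).
  def subscribe( identifier, sub )
    identifier ||= '_'
    node = @nodes[ identifier ] or raise ArgumentError, "no such node %p" % [ identifier ]
    node.add_subscription( sub )
    @subscriptions[ sub.id ] = node
  end

  # Detach the subscription with the given +id+ and return it, or nil.
  def remove_subscription( id )
    node = @subscriptions.delete( id ) or return nil
    return node.remove_subscription( id )
  end
end

root     = SubscribableNode.new
registry = SketchRegistry.new( '_' => root )
sub      = SketchSub.new( 'sub-1' )

registry.subscribe( nil, sub )
attached = root.subscriptions.key?( 'sub-1' )
removed_sub = registry.remove_subscription( 'sub-1' )
```

A nil identifier falls back to the root node, and removing an ID that is no longer registered returns nil rather than raising.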
#tree_built ⇒ Object
Flag for marking when the tree is built successfully the first time.

# File 'lib/arborist/manager.rb', line 164

attr_predicate_accessor :tree_built
#unregister_event_socket ⇒ Object
Unregister the event publisher socket from the reactor if it’s registered.
# File 'lib/arborist/manager.rb', line 1026

def unregister_event_socket
  self.log.debug "Unregistering event socket for write events."
  self.reactor.disable_events( self.event_socket, :write ) if
    self.reactor.event_enabled?( self.event_socket, :write )
end
#update_node(identifier, new_properties, monitor_key = '_') ⇒ Object
Update the node with the specified identifier with the given new_properties and propagate any events generated by the update to the node and its ancestors.
# File 'lib/arborist/manager.rb', line 528

def update_node( identifier, new_properties, monitor_key='_' )
  unless (( node = self.nodes[identifier] ))
    self.log.warn "Update for non-existent node %p ignored." % [ identifier ]
    return []
  end
  events = node.update( new_properties, monitor_key )
  self.propagate_events( node, events )
end
#uptime ⇒ Object
Return the duration the manager has been running in seconds.
# File 'lib/arborist/manager.rb', line 562

def uptime
  return 0 unless self.start_time
  return Time.now - self.start_time
end