Class: Arborist::Manager

Inherits:
Object
  • Object
show all
Extended by:
MethodUtilities, Configurability, Loggability
Includes:
HashUtilities, CZTop::Reactor::SignalHandling
Defined in:
lib/arborist/manager.rb

Overview

The main Arborist process – responsible for coordinating all other activity.

Constant Summary collapse

QUEUE_SIGS =

Signals the manager responds to

[
  :INT, :TERM, :HUP, :USR1,
  # :TODO: :QUIT, :WINCH, :USR2, :TTIN, :TTOU
] & Signal.list.keys.map( &:to_sym )
VALID_TREEAPI_ACTIONS =

Array of actions supported by the Tree API

%w[
  ack
  deps
  fetch
  graft
  modify
  prune
  search
  status
  subscribe
  unack
  unsubscribe
  update
]

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from MethodUtilities

attr_predicate, attr_predicate_accessor, dsl_accessor, singleton_attr_accessor, singleton_attr_reader, singleton_attr_writer, singleton_method_alias, singleton_predicate_accessor, singleton_predicate_reader

Methods included from HashUtilities

compact_hash, hash_matches, merge_recursively, stringify_keys, symbolify_keys

Constructor Details

#initializeManager

Create a new Arborist::Manager.



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/arborist/manager.rb', line 98

# Create a new Arborist::Manager.
#
# Sets up a fresh run ID, a tree containing only the root node (stored under
# the '_' key), an empty subscription table, and a new reactor. Sockets and
# timers are left unset (nil) here.
def initialize
  @run_id     = SecureRandom.hex( 16 )
  @start_time = nil

  # Tree state: only the root node exists until more nodes are added
  @root          = Arborist::Node.create( :root )
  @nodes         = { '_' => @root }
  @tree_built    = false
  @subscriptions = {}

  @linger = self.class.linger
  self.log.info "Linger set to %p" % [ @linger ]

  # Event loop and sockets
  @reactor      = CZTop::Reactor.new
  @tree_socket  = nil
  @event_socket = nil
  @event_queue  = []

  # Timers
  @heartbeat_timer  = nil
  @checkpoint_timer = nil
end

Instance Attribute Details

#checkpoint_timerObject (readonly)

The Timers::Timer that periodically checkpoints the manager’s state (if it’s configured to do so)



174
175
176
# File 'lib/arborist/manager.rb', line 174

# Reader for the timer that periodically checkpoints the manager's state
# (nil when no such timer has been set).
def checkpoint_timer
  return @checkpoint_timer
end

#event_queueObject (readonly)

The queue of pending Event API events



160
161
162
# File 'lib/arborist/manager.rb', line 160

# Reader for the queue of pending Event API events.
def event_queue
  return @event_queue
end

#event_socketObject

The ZeroMQ PUB socket that publishes events for the Event API



156
157
158
# File 'lib/arborist/manager.rb', line 156

# Reader for the socket that publishes events for the Event API.
def event_socket
  return @event_socket
end

#heartbeat_timerObject (readonly)

The Timers::Timer that periodically publishes a heartbeat event



178
179
180
# File 'lib/arborist/manager.rb', line 178

# Reader for the timer that periodically publishes a heartbeat event
# (nil when no such timer has been set).
def heartbeat_timer
  return @heartbeat_timer
end

#lingerObject (readonly)

The maximum amount of time to wait for pending events to be delivered during shutdown, in milliseconds.



169
170
171
# File 'lib/arborist/manager.rb', line 169

# Reader for the maximum amount of time to wait for pending events to be
# delivered during shutdown.
def linger
  return @linger
end

#nodesObject

The Hash of all loaded Nodes, keyed by their identifier



136
137
138
# File 'lib/arborist/manager.rb', line 136

# Reader for the Hash of all loaded nodes, keyed by identifier.
def nodes
  return @nodes
end

#reactorObject (readonly)

The CZTop::Reactor that runs the event loop



148
149
150
# File 'lib/arborist/manager.rb', line 148

# Reader for the reactor that runs the event loop.
def reactor
  return @reactor
end

#rootObject

The root node of the tree.



132
133
134
# File 'lib/arborist/manager.rb', line 132

# Reader for the root node of the tree.
def root
  return @root
end

#run_idObject (readonly)

A unique string used to identify different runs of the Manager



128
129
130
# File 'lib/arborist/manager.rb', line 128

# Reader for the unique string used to identify different runs of the Manager.
def run_id
  return @run_id
end

#start_timeObject

The time at which the manager began running.



144
145
146
# File 'lib/arborist/manager.rb', line 144

# Reader for the time at which the manager began running (nil until set).
def start_time
  return @start_time
end

#subscriptionsObject

The Hash of all Subscriptions, keyed by their subscription ID



140
141
142
# File 'lib/arborist/manager.rb', line 140

# Reader for the Hash of all subscriptions, keyed by subscription ID.
def subscriptions
  return @subscriptions
end

#tree_socketObject

The ZeroMQ REP socket that handles Tree API requests



152
153
154
# File 'lib/arborist/manager.rb', line 152

# Reader for the socket that handles Tree API requests.
def tree_socket
  return @tree_socket
end

Instance Method Details

#add_node(node) ⇒ Object

Add the specified node to the Manager.



482
483
484
485
486
487
488
489
490
491
492
# File 'lib/arborist/manager.rb', line 482

# Add the specified +node+ to the Manager, keyed by its identifier.
#
# Raises an Arborist::NodeError if a node with the same identifier is already
# present. If the tree has already been built, the node is also linked into
# it and a 'node_added' system event is published.
def add_node( node )
  identifier = node.identifier

  if self.nodes[ identifier ]
    raise Arborist::NodeError, "Node %p already present." % [ identifier ]
  end
  self.nodes[ identifier ] = node

  # Before the tree is built, nodes are only registered
  return unless self.tree_built?

  self.link_node( node )
  self.publish_system_event( 'node_added', node: identifier )
end

#all_nodes(&block) ⇒ Object

Yield each node in a depth-first traversal of the manager’s tree to the specified block, or return an Enumerator if no block is given.



915
916
917
918
919
# File 'lib/arborist/manager.rb', line 915

# Yield each node reachable from the root via #enumerator_for to the given
# +block+, or return the Enumerator itself if no block is given.
def all_nodes( &block )
  nodes_enum = self.enumerator_for( self.root )
  return nodes_enum unless block
  return nodes_enum.each( &block )
end

#ancestors_for(node) ⇒ Object

Return the Array of all nodes above the specified node.



971
972
973
974
975
# File 'lib/arborist/manager.rb', line 971

# Return the Array of all nodes above the specified +node+, nearest parent
# first. Returns an empty Array if the node has no parent.
def ancestors_for( node )
  ancestors = []
  current = node

  # Walk up the parent chain until a node without a parent is reached
  while (( parent_id = current.parent ))
    current = self.nodes[ parent_id ]
    ancestors << current
  end

  return ancestors
end

#build_treeObject

Build the tree out of all the loaded nodes.



449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
# File 'lib/arborist/manager.rb', line 449

# Build the tree out of all the loaded nodes.
#
# Links every non-operational node to its parent, marks the tree as built,
# has every node register its secondary dependencies with the manager, and
# finally restores saved node states. Returns the result of
# #restore_node_states.
def build_tree
  loaded = self.nodes
  self.log.info "Building tree from %d loaded nodes." % [ loaded.length ]

  # Primary structure: parent/child links
  loaded.each_value do |node|
    self.link_node_to_parent( node ) unless node.operational?
  end
  self.tree_built = true

  # Secondary dependencies can only be registered once the tree exists
  loaded.each_value {|node| node.register_secondary_dependencies(self) }

  return self.restore_node_states
end

#cancel_checkpoint_timerObject

Cancel the timer that saves tree snapshots.



373
374
375
# File 'lib/arborist/manager.rb', line 373

# Cancel the timer that saves tree snapshots.
#
# The checkpoint timer is optional: #register_checkpoint_timer returns early
# without creating one when no state file or checkpoint frequency is
# configured. In that case there is nothing to remove, so this is a no-op
# that returns nil instead of handing nil to the reactor.
def cancel_checkpoint_timer
  timer = self.checkpoint_timer or return nil
  return self.reactor.remove_timer( timer )
end

#cancel_heartbeat_timerObject

Cancel the timer that publishes heartbeat events.



341
342
343
# File 'lib/arborist/manager.rb', line 341

# Cancel the timer that publishes heartbeat events by removing it from the
# reactor.
def cancel_heartbeat_timer
  timer = self.heartbeat_timer
  return self.reactor.remove_timer( timer )
end

#cancel_timersObject

Cancel the Manager’s timers.



230
231
232
233
# File 'lib/arborist/manager.rb', line 230

# Cancel the Manager's timers: the heartbeat timer first, then the
# checkpoint timer.
def cancel_timers
  self.cancel_heartbeat_timer
  return self.cancel_checkpoint_timer
end

#create_subscription(identifier, event_pattern, criteria, negative_criteria = {}) ⇒ Object

Create a subscription that publishes to the Manager’s event publisher for the node with the specified identifier and event_pattern, using the given criteria when considering an event.



1084
1085
1086
1087
1088
1089
1090
1091
# File 'lib/arborist/manager.rb', line 1084

# Create a subscription for the node with the specified +identifier+ and
# +event_pattern+, using the given +criteria+ (and optional
# +negative_criteria+) when considering an event. The subscription's callback
# forwards its arguments to #publish. Returns the new subscription.
def create_subscription( identifier, event_pattern, criteria, negative_criteria={} )
  publisher = proc {|*args| self.publish(*args) }
  sub = Arborist::Subscription.new( event_pattern, criteria, negative_criteria, &publisher )

  self.subscribe( identifier, sub )

  return sub
end

#depth_limited_enumerator_for(start_node, depth, &filter) ⇒ Object

Return a depth limited enumerator for the specified start_node.



946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
# File 'lib/arborist/manager.rb', line 946

# Return an Enumerator that yields +start_node+ and then its descendants,
# descending at most +depth+ levels below +start_node+. If a +filter+ block
# is given, nodes for which it returns a falsy value are skipped along with
# everything beneath them.
def depth_limited_enumerator_for( start_node, depth, &filter )
  return Enumerator.new do |yielder|
    walk = lambda do |node, remaining|
      self.log.debug "Enumerating nodes from %s at depth: %p" %
        [ node.identifier, remaining ]

      # A filtered-out node prunes its whole subtree
      next if filter && !filter.call( node )

      yielder << node
      next unless remaining > 0

      node.each {|child| walk.call(child, remaining - 1) }
    end

    walk.call( start_node, depth )
  end
end

#descendants_for(node) ⇒ Object

Return an Array of all nodes below the specified node.



965
966
967
# File 'lib/arborist/manager.rb', line 965

# Return an Array of the nodes produced by #enumerator_for for the
# specified +node+ (that enumerator yields its start node first, followed by
# the nodes beneath it).
def descendants_for( node )
  subtree = self.enumerator_for( node )
  return subtree.to_a
end

#dispatch_request(raw_request) ⇒ Object

Handle the specified raw_request and return a response.



615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
# File 'lib/arborist/manager.rb', line 615

# Handle the specified +raw_request+ and return a response.
#
# Decodes the request, looks up the handler for its action and calls it. Any
# StandardError raised along the way (including the "shutting down" check) is
# logged and turned into an error response: 'client' for message, config and
# node errors, 'server' for anything else.
def dispatch_request( raw_request )
  raise "Manager is shutting down" unless self.running?

  header, body = Arborist::TreeAPI.decode( raw_request )
  return self.lookup_tree_request_action( header ).call( header, body )

rescue => err
  self.log.error "%p: %s" % [ err.class, err.message ]
  err.backtrace.each {|frame| self.log.debug "  #{frame}" }

  client_errors = [ Arborist::MessageError, Arborist::ConfigError, Arborist::NodeError ]
  errtype = client_errors.any? {|klass| klass === err } ? 'client' : 'server'

  return Arborist::TreeAPI.error_response( errtype, err.message )
end

#enumerator_for(start_node, &filter) ⇒ Object

Return an enumerator for the specified start_node.



932
933
934
935
936
937
938
939
940
941
942
# File 'lib/arborist/manager.rb', line 932

# Return an Enumerator that yields +start_node+ followed by everything its
# #each yields, recursively (depth-first). If a +filter+ block is given,
# nodes for which it returns a falsy value are skipped along with everything
# beneath them.
def enumerator_for( start_node, &filter )
  return Enumerator.new do |yielder|
    walk = lambda do |node|
      # A filtered-out node prunes its whole subtree
      next if filter && !filter.call( node )

      yielder << node
      node.each( &walk )
    end

    walk.call( start_node )
  end
end

#find_matching_node_states(filter, return_values, exclude_down = false, negative_filter = {}) ⇒ Object

Traverse the node tree and return the specified return_values from any nodes which match the given filter, skipping downed nodes and all their children if exclude_down is set. If return_values is set to nil, then all values from the node will be returned.



543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
# File 'lib/arborist/manager.rb', line 543

# Traverse the node tree and return a Hash of the specified +return_values+
# (as produced by each node's #fetch_values) keyed by node identifier, for
# every node that matches +filter+ and does not match a non-empty
# +negative_filter+. Iterates over #reachable_nodes if +exclude_down+ is
# set, and over #all_nodes otherwise.
def find_matching_node_states( filter, return_values, exclude_down=false, negative_filter={} )
  candidates = exclude_down ? self.reachable_nodes : self.all_nodes

  matching = candidates.select {|node| node.matches?(filter) }

  # The negative filter is only consulted when there is something to reject
  unless matching.empty? || negative_filter.empty?
    matching = matching.reject {|node| node.matches?(negative_filter) }
  end

  states = {}
  matching.each do |node|
    states[ node.identifier ] = node.fetch_values( return_values )
  end

  return states
end

#handle_ack_request(header, body) ⇒ Object

Acknowledge a node



871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
# File 'lib/arborist/manager.rb', line 871

# Acknowledge a node: Tree API handler for the `ack` action.
#
# Returns a 'client' error response if the header has no 'identifier' or no
# node with that identifier is loaded. Otherwise the body (with its keys
# symbolified) is passed as keyword arguments to the node's #acknowledge,
# the resulting events are propagated, and an empty successful response is
# returned.
def handle_ack_request( header, body )
  self.log.info "ACK: %p" % [ header ]

  identifier = header[ 'identifier' ]
  unless identifier
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for ACK.' )
  end

  node = self.nodes[ identifier ]
  unless node
    return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )
  end

  self.log.debug "Acking the %s node: %p" % [ identifier, body ]

  ack_args = symbolify_keys( body )
  events = node.acknowledge( **ack_args )
  self.propagate_events( node, events )

  return Arborist::TreeAPI.successful_response( nil )
end

#handle_deps_request(header, body) ⇒ Object

Return a response to the ‘deps` action.



729
730
731
732
733
734
735
736
737
738
739
740
# File 'lib/arborist/manager.rb', line 729

# Return a response to the `deps` action.
#
# Collects the dependency identifiers of the node named by the 'from' header
# (defaulting to '_') via #merge_dependencies_from, removes the starting
# identifier itself, and returns the rest under the :deps key. An
# Arborist::ClientError raised along the way becomes a 'client' error
# response.
def handle_deps_request( header, body )
  self.log.info "DEPS: %p" % [ header ]
  origin = header['from'] || '_'

  dependencies = self.merge_dependencies_from( origin )
  dependencies.delete( origin )
  payload = { deps: dependencies.to_a }

  return Arborist::TreeAPI.successful_response( payload )

rescue Arborist::ClientError => err
  return Arborist::TreeAPI.error_response( 'client', err.message )
end

#handle_fetch_request(header, body) ⇒ Object

Return a response to the `fetch` action.



702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
# File 'lib/arborist/manager.rb', line 702

# Return a response to the `fetch` action.
#
# Header keys read: 'from' (start node identifier, default '_'), 'depth' and
# 'tree'. With 'tree' set, the response is a single nested Hash of the start
# node (depth -1 when none is given); with only 'depth', nodes from the
# depth-limited enumerator; otherwise every node under the start node. An
# unknown start node yields a 'client' error response.
def handle_fetch_request( header, body )
  self.log.info "FETCH: %p" % [ header ]
  from  = header['from'] || '_'
  depth = header['depth']
  tree  = header['tree']

  start_node = self.nodes[ from ]
  unless start_node
    return Arborist::TreeAPI.error_response( 'client', "No such node %s." % [from] )
  end
  self.log.debug "  Listing nodes under %p" % [ start_node ]

  iter =
    if tree
      [ start_node.to_h(depth: (depth || -1)) ]
    elsif depth
      self.log.debug "    depth limited to %d" % [ depth ]
      self.depth_limited_enumerator_for( start_node, depth )
    else
      self.log.debug "    no depth limit"
      self.enumerator_for( start_node )
    end

  data = iter.map {|item| item.to_h }
  self.log.debug "  got data for %d nodes" % [ data.length ]

  return Arborist::TreeAPI.successful_response( data )
end

#handle_graft_request(header, body) ⇒ Object

Add a node



812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
# File 'lib/arborist/manager.rb', line 812

# Add a node: Tree API handler for the `graft` action.
#
# Header keys read: 'identifier' and 'type' (both required) and 'parent'
# (default '_'). Returns a 'client' error response if a required header is
# missing, the identifier is already taken, or the parent is not loaded.
# Otherwise creates the node, adds it to the manager, and responds with its
# identifier.
def handle_graft_request( header, body )
  self.log.info "GRAFT: %p" % [ header ]

  identifier = header[ 'identifier' ]
  unless identifier
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for GRAFT.' )
  end
  if self.nodes[ identifier ]
    return Arborist::TreeAPI.error_response( 'client', "Node %p already exists." % [identifier] )
  end

  type = header[ 'type' ]
  unless type
    return Arborist::TreeAPI.error_response( 'client', 'No type specified for GRAFT.' )
  end

  parent = header[ 'parent' ] || '_'
  parent_node = self.nodes[ parent ]
  unless parent_node
    return Arborist::TreeAPI.error_response( 'client', 'No parent node found for %s.' % [parent] )
  end

  self.log.debug "Grafting a new %s node under %p" % [ type, parent_node ]

  # If the parent has a factory method for the node type, use it, otherwise
  # use the Pluggability factory.
  # NOTE(review): `type` comes straight from the request header, so any public
  # method the parent node responds to can be invoked here -- confirm whether
  # it should be restricted to known node types.
  node =
    if parent_node.respond_to?( type )
      parent_node.method( type ).call( identifier, body )
    else
      body.merge!( parent: parent )
      Arborist::Node.create( type, identifier, body )
    end

  self.add_node( node )

  result = node ? { identifier: node.identifier } : nil
  return Arborist::TreeAPI.successful_response( result )
end

#handle_modify_request(header, body) ⇒ Object

Modify a node’s operational attributes



846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
# File 'lib/arborist/manager.rb', line 846

# Modify a node's operational attributes: Tree API handler for the `modify`
# action.
#
# Returns a 'client' error response if the header has no 'identifier', the
# identifier names the root node ('_'), the node is not loaded, or the body
# names a 'parent' that is not loaded. A 'parent' entry is removed from the
# body and applied via the node's #reparent; the rest of the body is passed
# to the node's #modify.
def handle_modify_request( header, body )
  self.log.info "MODIFY: %p" % [ header ]

  identifier = header[ 'identifier' ]
  unless identifier
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for MODIFY.' )
  end
  if identifier == '_'
    return Arborist::TreeAPI.error_response( 'client', "Unable to MODIFY root node." )
  end

  node = self.nodes[ identifier ]
  unless node
    return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )
  end

  self.log.debug "Modifying operational attributes of the %s node: %p" % [ identifier, body ]

  new_parent_identifier = body.delete( 'parent' )
  if new_parent_identifier
    old_parent = self.nodes[ node.parent ]
    new_parent = self.nodes[ new_parent_identifier ]
    unless new_parent
      return Arborist::TreeAPI.error_response( 'client', "No such parent node: %p" % [new_parent_identifier] )
    end
    node.reparent( old_parent, new_parent )
  end

  node.modify( body )

  return Arborist::TreeAPI.successful_response( nil )
end

#handle_prune_request(header, body) ⇒ Object

Remove a node and its children.



800
801
802
803
804
805
806
807
808
# File 'lib/arborist/manager.rb', line 800

# Remove a node and its children: Tree API handler for the `prune` action.
#
# Returns a 'client' error response if the header has no 'identifier'.
# Otherwise responds with the removed node as a Hash, or with nil if
# #remove_node did not return a node.
def handle_prune_request( header, body )
  self.log.info "PRUNE: %p" % [ header ]

  identifier = header[ 'identifier' ]
  unless identifier
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for PRUNE.' )
  end

  removed = self.remove_node( identifier )
  payload = removed ? removed.to_h : nil

  return Arborist::TreeAPI.successful_response( payload )
end

#handle_search_request(header, body) ⇒ Object

Return a response to the ‘search’ action.



763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
# File 'lib/arborist/manager.rb', line 763

# Return a response to the `search` action.
#
# Header keys read: 'exclude_down' and 'return'. When the 'return' key is
# present its value (or an empty Array if it is nil) selects the values to
# return; when absent, nil is passed on. The body is either a single
# criteria object or an Array whose first two elements are the positive and
# negative criteria; they are removed from the Array.
def handle_search_request( header, body )
  self.log.info "SEARCH: %p" % [ header ]

  exclude_down = header['exclude_down']
  values = header.key?( 'return' ) ? ( header['return'] || [] ) : nil

  criteria = body.is_a?( Array ) ? body : [ body ]
  positive, negative = criteria.shift( 2 )
  negative ||= {}

  states = self.find_matching_node_states( positive, values, exclude_down, negative )

  return Arborist::TreeAPI.successful_response( states )
end

#handle_signal(sig) ⇒ Object

Handle signals.



391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
# File 'lib/arborist/manager.rb', line 391

# Handle signals by dispatching +sig+ to the matching on_*_signal method:
# :INT and :TERM to #on_termination_signal, :HUP to #on_hangup_signal and
# :USR1 to #on_user1_signal. Anything else is logged as unhandled.
def handle_signal( sig )
  self.log.debug "Handling signal %s" % [ sig ]

  handler =
    case sig
    when :INT, :TERM then :on_termination_signal
    when :HUP        then :on_hangup_signal
    when :USR1       then :on_user1_signal
    end

  return self.log.warn( "Unhandled signal %s" % [ sig ] ) unless handler
  return self.send( handler, sig )
end

#handle_status_request(header, body) ⇒ Object

Return a response to the ‘status` action.



656
657
658
659
660
661
662
663
664
# File 'lib/arborist/manager.rb', line 656

# Return a response to the `status` action: the server version, whether the
# manager is running, its uptime, and the number of loaded nodes.
def handle_status_request( header, body )
  self.log.info "STATUS: %p" % [ header ]

  state = self.running? ? 'running' : 'not running'

  return Arborist::TreeAPI.successful_response(
    server_version: Arborist::VERSION,
    state: state,
    uptime: self.uptime,
    nodecount: self.nodecount
  )
end

#handle_subscribe_request(header, body) ⇒ Object

Return a response to the ‘subscribe` action.



668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
# File 'lib/arborist/manager.rb', line 668

# Return a response to the `subscribe` action.
#
# Header keys read: 'event_type' and 'identifier'. The body is either a
# single criteria object or an Array whose first two elements are the
# positive and negative criteria; they are removed from the Array. Responds
# with the ID of the newly-created subscription.
def handle_subscribe_request( header, body )
  self.log.info "SUBSCRIBE: %p" % [ header ]
  event_type      = header[ 'event_type' ]
  node_identifier = header[ 'identifier' ]

  criteria = body.is_a?( Array ) ? body : [ body ]
  positive, negative = criteria.shift( 2 )
  negative ||= {}

  subscription = self.create_subscription( node_identifier, event_type, positive, negative )
  self.log.info "Subscription to %s events at or under %s: %p" %
    [ event_type || 'all', node_identifier || 'the root node', subscription ]

  return Arborist::TreeAPI.successful_response( id: subscription.id )
end

#handle_unack_request(header, body) ⇒ Object

Un-acknowledge a node



890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
# File 'lib/arborist/manager.rb', line 890

# Un-acknowledge a node: Tree API handler for the `unack` action.
#
# Returns a 'client' error response if the header has no 'identifier' or no
# node with that identifier is loaded. Otherwise calls the node's
# #unacknowledge, propagates the resulting events, and returns an empty
# successful response.
def handle_unack_request( header, body )
  self.log.info "UNACK: %p" % [ header ]

  identifier = header[ 'identifier' ]
  unless identifier
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for UNACK.' )
  end

  node = self.nodes[ identifier ]
  unless node
    return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )
  end

  self.log.debug "Unacking the %s node: %p" % [ identifier, body ]

  self.propagate_events( node, node.unacknowledge )

  return Arborist::TreeAPI.successful_response( nil )
end

#handle_unsubscribe_request(header, body) ⇒ Object

Return a response to the ‘unsubscribe` action.



686
687
688
689
690
691
692
693
694
695
696
697
698
# File 'lib/arborist/manager.rb', line 686

# Return a response to the `unsubscribe` action.
#
# Returns a 'client' error response if the header has no 'subscription_id'.
# If no subscription was removed, responds successfully with nil; otherwise
# responds with the removed subscription's event type and criteria.
def handle_unsubscribe_request( header, body )
  self.log.info "UNSUBSCRIBE: %p" % [ header ]

  subscription_id = header[ 'subscription_id' ]
  unless subscription_id
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for UNSUBSCRIBE.' )
  end

  subscription = self.remove_subscription( subscription_id )
  return Arborist::TreeAPI.successful_response( nil ) unless subscription

  self.log.info "Destroyed subscription: %p" % [ subscription ]
  return Arborist::TreeAPI.successful_response(
    event_type: subscription.event_type,
    criteria: subscription.criteria
  )
end

#handle_update_request(header, body) ⇒ Object

Update nodes using the data from the update request’s body.



783
784
785
786
787
788
789
790
791
792
793
794
795
796
# File 'lib/arborist/manager.rb', line 783

# Update nodes using the data from the update request's +body+.
#
# The body is iterated as identifier/properties pairs, each of which is
# passed to #update_node along with the 'monitor_key' header. A body that
# doesn't respond to #each yields a 'client' error response.
def handle_update_request( header, body )
  self.log.info "UPDATE: %p" % [ header ]

  if !body.respond_to?( :each )
    return Arborist::TreeAPI.error_response( 'client', 'Malformed update: body does not respond to #each' )
  end

  monitor_key = header['monitor_key']
  body.each {|identifier, properties| self.update_node(identifier, properties, monitor_key) }

  return Arborist::TreeAPI.successful_response( nil )
end

#inspectObject

Return a human-readable representation of the Manager suitable for debugging.



264
265
266
267
268
269
270
271
# File 'lib/arborist/manager.rb', line 264

# Return a human-readable representation of the Manager suitable for
# debugging: its class, an object ID-based hex number, its run ID, and the
# number of loaded nodes.
def inspect
  return format(
    "#<%p:%#x {runid: %s} %d nodes>",
    self.class,
    self.object_id * 2,
    self.run_id,
    self.nodes.length
  )
end

Link the node to other nodes in the tree.



496
497
498
499
500
501
# File 'lib/arborist/manager.rb', line 496

# Link the +node+ to other nodes in the tree: attach it to its parent and
# have it register its secondary dependencies with the manager. Raises a
# RuntimeError if the tree hasn't been built yet.
def link_node( node )
  if !self.tree_built?
    raise "Tree is not built yet"
  end

  self.link_node_to_parent( node )
  return node.register_secondary_dependencies( self )
end

Link the specified node to its parent. Raises an error if the specified node’s parent is not yet loaded.



470
471
472
473
474
475
476
477
478
# File 'lib/arborist/manager.rb', line 470

# Link the specified +node+ to its parent (the '_' node if it doesn't name
# one) by adding it as a child. Raises a RuntimeError if the parent is not
# loaded.
def link_node_to_parent( node )
  self.log.debug "Linking node %p to its parent" % [ node ]

  parent_id = node.parent || '_'
  parent_node = self.nodes[ parent_id ]
  unless parent_node
    raise "no parent '%s' node loaded for %p" % [ parent_id, node ]
  end

  self.log.debug "adding %p as a child of %p" % [ node, parent_node ]
  return parent_node.add_child( node )
end

#load_tree(enumerator) ⇒ Object

Add nodes yielded from the specified enumerator into the manager’s tree.



440
441
442
443
444
445
# File 'lib/arborist/manager.rb', line 440

# Add each node yielded from the specified +enumerator+ to the manager, then
# build the tree. Returns the result of #build_tree.
def load_tree( enumerator )
  enumerator.each {|node| self.add_node(node) }
  return self.build_tree
end

#lookup_tree_request_action(header) ⇒ Object

Given a request header, return a #call-able object that can handle the response.



641
642
643
644
645
646
647
648
649
650
651
652
# File 'lib/arborist/manager.rb', line 641

# Given a request +header+, return a #call-able object (the matching
# handle_<action>_request Method) that can handle the request.
#
# Raises an Arborist::MessageError if the header's 'version' is not 1, if
# the 'action' is missing, or if the action is not one of
# VALID_TREEAPI_ACTIONS.
def lookup_tree_request_action( header )
  version = header['version']

  # Use %p rather than %d so a missing or non-numeric version is reported as a
  # MessageError (a client error) instead of blowing up in the format call.
  raise Arborist::MessageError, "unsupported version %p" % [ version ] unless
    version == 1

  action = header['action'] or
    raise Arborist::MessageError, "missing required header 'action'"
  raise Arborist::MessageError, "No such action '%s'" % [ action ] unless
    VALID_TREEAPI_ACTIONS.include?( action )

  handler_name = "handle_%s_request" % [ action ]
  return self.method( handler_name )
end

#merge_dependencies_from(from, deps_set = Set.new) ⇒ Object

Recurse into the children and secondary dependencies of the from node and merge the identifiers of the traversed nodes into the deps_set.



745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
# File 'lib/arborist/manager.rb', line 745

# Recurse into the children and secondary dependencies of the +from+ node
# and merge the identifiers of the traversed nodes into the +deps_set+,
# which is returned.
#
# Identifiers already in +deps_set+ are not traversed again, which keeps
# circular dependencies from recursing forever. Raises an
# Arborist::ClientError if +from+ doesn't name a loaded node.
def merge_dependencies_from( from, deps_set=Set.new )
  return deps_set unless deps_set.add?( from )

  # The comma matters: without it this is parsed as a call to a method named
  # `ClientError` and raises a NoMethodError instead.
  start_node = self.nodes[ from ] or
    raise Arborist::ClientError, "No such node %s." % [ from ]

  self.enumerator_for( start_node ).each do |subnode|
    deps_set.add( subnode.identifier )
    subnode.node_subscribers.each do |subdep|
      self.merge_dependencies_from( subdep, deps_set )
    end
  end

  return deps_set
end

#nodecountObject

Return the number of nodes in the manager’s tree.



569
570
571
# File 'lib/arborist/manager.rb', line 569

# Return the number of nodes loaded into the manager.
def nodecount
  return self.nodes.size
end

#nodelistObject

Return an Array of the identifiers of all nodes in the manager’s tree.



575
576
577
# File 'lib/arborist/manager.rb', line 575

# Return an Array of the identifiers of all nodes loaded into the manager.
def nodelist
  return self.nodes.map {|identifier, _node| identifier }
end

#on_event_socket_event(event) ⇒ Object

IO event handler for the event socket.



1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
# File 'lib/arborist/manager.rb', line 1034

# IO event handler for the event socket.
#
# On a writable event, sends the next queued message (if any) to the event's
# socket; any other kind of event raises a RuntimeError. Once the queue is
# empty the event socket is unregistered.
def on_event_socket_event( event )
  unless event.writable?
    raise "Unhandled event %p on the event socket" % [ event ]
  end

  msg = self.event_queue.shift
  # self.log.debug "Publishing event %p" % [ msg ]
  event.socket << msg if msg

  self.unregister_event_socket if self.event_queue.empty?
end

#on_hangup_signal(signo) ⇒ Object

Handle a HUP signal. The default is to restart the handler.



420
421
422
423
# File 'lib/arborist/manager.rb', line 420

# Handle a HUP signal: log a warning and restart the manager.
def on_hangup_signal( signo )
  self.log.warn( "Hangup (%p)" % [signo] )
  return self.restart
end

#on_termination_signal(signo) ⇒ Object Also known as: on_interrupt_signal

Handle a TERM signal. Shuts the handler down after handling any current request/s. Also aliased to #on_interrupt_signal.



412
413
414
415
# File 'lib/arborist/manager.rb', line 412

# Handle a termination signal: log a warning and stop the manager.
def on_termination_signal( signo )
  self.log.warn( "Terminated (%p)" % [signo] )
  return self.stop
end

#on_tree_socket_event(event) ⇒ Object

ZMQ::Handler API – Read and handle an incoming request.



603
604
605
606
607
608
609
610
611
# File 'lib/arborist/manager.rb', line 603

# Read and handle an incoming Tree API request.
#
# On a readable event, receives a request from the event's socket, dispatches
# it, and writes the response back to the same socket. Any other kind of
# event raises a RuntimeError.
def on_tree_socket_event( event )
  unless event.readable?
    raise "Unsupported event %p on tree API socket!" % [ event ]
  end

  socket = event.socket
  response = self.dispatch_request( socket.receive )
  socket << response
end

#on_user1_signal(signo) ⇒ Object

Handle a USR1 signal. Logs a checkpoint message and saves the current node states.



427
428
429
430
# File 'lib/arborist/manager.rb', line 427

# Handle a USR1 signal: log a checkpoint message and save the current node
# states.
def on_user1_signal( signo )
  self.log.info( "Checkpoint: User signal." )
  return self.save_node_states
end

#propagate_events(node, *events) ⇒ Object

Propagate one or more events to the specified node and its ancestors in the tree, publishing them to matching subscriptions belonging to the nodes along the way.



1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
# File 'lib/arborist/manager.rb', line 1104

# Propagate one or more +events+ to the specified +node+ and its ancestors
# in the tree by calling #publish_events on each of them in turn. Raises a
# RuntimeError if a node names a parent that isn't loaded.
def propagate_events( node, *events )
  node.publish_events( *events )

  parent_id = node.parent
  return nil unless parent_id

  self.log.debug "Propagating %d events from %s -> %s" % [
    events.length,
    node.identifier,
    parent_id
  ]

  parent = self.nodes[ parent_id ]
  unless parent
    raise "couldn't find parent %p of node %p!" % [ parent_id, node.identifier ]
  end

  return self.propagate_events( parent, *events )
end

#publish(identifier, event) ⇒ Object

Publish the specified event.



1011
1012
1013
1014
# File 'lib/arborist/manager.rb', line 1011

# Publish the specified +event+: encode it together with +identifier+,
# append it to the event queue, and (if the manager is running) make sure
# the event socket is registered.
def publish( identifier, event )
  message = Arborist::EventAPI.encode( identifier, event.to_h )
  self.event_queue << message
  self.register_event_socket if self.running?
end

#publish_heartbeat_eventObject

Publish a system event that observers can watch for to detect restarts.



1049
1050
1051
1052
1053
1054
1055
1056
# File 'lib/arborist/manager.rb', line 1049

# Publish a 'heartbeat' system event carrying the run ID, the ISO 8601 start
# time and the Arborist version, which observers can watch for to detect
# restarts. Does nothing if the start time hasn't been set.
def publish_heartbeat_event
  started_at = self.start_time
  return unless started_at

  return self.publish_system_event(
    'heartbeat',
    run_id: self.run_id,
    start_time: started_at.iso8601,
    version: Arborist::VERSION
  )
end

#publish_system_event(eventname, **data) ⇒ Object

Publish an event with the specified eventname and data.



1060
1061
1062
1063
1064
1065
# File 'lib/arborist/manager.rb', line 1060

# Publish an event with the specified +eventname+ and +data+. The event name
# is stringified and prefixed with 'sys.' unless it already starts with it.
def publish_system_event( eventname, **data )
  name = eventname.to_s
  name = "sys.#{name}" unless name.start_with?( 'sys.' )

  self.log.debug "Publishing %s event: %p." % [ name, data ]
  return self.publish( name, data )
end

#reachable_nodes(&block) ⇒ Object

Yield each node that is not down to the specified block, or return an Enumerator if no block is given.



924
925
926
927
928
# File 'lib/arborist/manager.rb', line 924

# Yield each node for which #reachable? is true (starting from the root, and
# not descending below unreachable nodes) to the specified +block+, or return
# an Enumerator if no block is given.
def reachable_nodes( &block )
  reachable = self.enumerator_for( self.root, &:reachable? )
  return reachable unless block
  return reachable.each( &block )
end

#register_checkpoint_timerObject

Register a periodic timer that will save a snapshot of the node tree’s state to the state file on a configured interval if one is configured.



354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
# File 'lib/arborist/manager.rb', line 354

# Register a periodic timer that will save a snapshot of the node tree's
# state on the configured interval.
#
# Returns nil without registering anything if no state file is configured or
# the checkpoint frequency is unset or zero; otherwise returns the new timer.
def register_checkpoint_timer
  if !self.class.state_file
    self.log.info "No state file configured; skipping checkpoint timer setup."
    return nil
  end

  frequency = self.class.checkpoint_frequency
  if !frequency || !frequency.nonzero?
    self.log.info "Checkpoint frequency is %p; skipping checkpoint timer setup." % [ frequency ]
    return nil
  end

  self.log.info "Setting up node state checkpoint every %0.3fs" % [ frequency ]
  @checkpoint_timer = self.reactor.add_periodic_timer( frequency ) { self.save_node_states }

  return @checkpoint_timer
end

#register_event_socketObject

Register the publisher with the reactor if it’s not already.



1018
1019
1020
1021
1022
# File 'lib/arborist/manager.rb', line 1018

# Enable write events for the event socket on the reactor unless they're
# already enabled.
def register_event_socket
  self.log.debug "Registering event socket for write events."

  return if self.reactor.event_enabled?( self.event_socket, :write )
  self.reactor.enable_events( self.event_socket, :write )
end

#register_heartbeat_timerObject

Register a periodic timer that will publish a heartbeat event at a configurable interval.



330
331
332
333
334
335
336
337
# File 'lib/arborist/manager.rb', line 330

# Register a periodic timer that will publish a heartbeat event at the
# configured heartbeat frequency. Returns the new timer.
def register_heartbeat_timer
  frequency = self.class.heartbeat_frequency
  self.log.info "Setting up to heartbeat every %ds" % [ frequency ]

  @heartbeat_timer = self.reactor.add_periodic_timer( frequency ) { self.publish_heartbeat_event }

  return @heartbeat_timer
end

#register_timersObject

Register the Manager’s timers.



223
224
225
226
# File 'lib/arborist/manager.rb', line 223

# Register the Manager's timers: the checkpoint timer first, then the
# heartbeat timer.
def register_timers
  self.register_checkpoint_timer
  return self.register_heartbeat_timer
end

#remove_node(node) ⇒ Object

Remove a node from the Manager. The node can either be the Arborist::Node to remove, or the identifier of a node.



506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
# File 'lib/arborist/manager.rb', line 506

# Remove a node from the Manager. The +node+ can either be the
# Arborist::Node to remove, or the identifier of a node.
#
# Publishes a 'node_removed' system event, removes the node's children
# recursively, detaches the node from its parent (if loaded), and returns the
# node deleted from the nodes Hash. Returns nil if the identifier doesn't
# name a loaded node; raises a RuntimeError for operational nodes.
def remove_node( node )
  target = node.is_a?( Arborist::Node ) ? node : self.nodes[ node ]
  return unless target

  raise "Can't remove an operational node" if target.operational?

  self.log.info "Removing node %p" % [ target ]
  self.publish_system_event( 'node_removed', node: target.identifier )

  target.children.each do |_identifier, child_node|
    self.remove_node( child_node )
  end

  parent_node = self.nodes[ target.parent || '_' ]
  parent_node.remove_child( target ) if parent_node

  return self.nodes.delete( target.identifier )
end

#remove_subscription(subscription_identifier) ⇒ Object

Remove the subscription with the specified subscription_identifier from the node it’s attached to and from the manager, and return it.



1096
1097
1098
1099
# File 'lib/arborist/manager.rb', line 1096

# Remove the subscription with the specified +subscription_identifier+ from
# the manager's subscriptions table and from the node it's attached to, and
# return whatever the node's #remove_subscription returns. Returns nil if
# the manager has no such subscription.
def remove_subscription( subscription_identifier )
  owner = self.subscriptions.delete( subscription_identifier )
  return nil unless owner

  return owner.remove_subscription( subscription_identifier )
end

#restartObject

Restart the manager

Raises:

  • (NotImplementedError)


251
252
253
# File 'lib/arborist/manager.rb', line 251

# Restart the manager. Not implemented: always raises NotImplementedError.
#
# NOTE(review): NotImplementedError descends from ScriptError rather than
# StandardError, so a bare `rescue` in a caller will not catch it.
def restart
  raise NotImplementedError
end

#restore_node_statesObject

Attempt to restore the state of loaded node from the configured state file. Returns true if it succeeded, or false if a state file wasn’t configured, doesn’t exist, isn’t readable, or couldn’t be unmarshalled.



306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/arborist/manager.rb', line 306

# Attempt to restore the state of loaded nodes from the configured state
# file.
#
# Returns false if no state file is configured or it isn't readable, and
# true once every saved node that is still present in the loaded tree has
# been restored. Saved nodes that are no longer loaded are skipped. Errors
# raised while unmarshalling are not rescued here.
def restore_node_states
  path = self.class.state_file or return false
  return false unless path.readable?

  self.log.info "Restoring node state from %s" % [ path ]

  # Use the block form so the file handle is closed even if loading fails.
  # NOTE(review): Marshal.load can instantiate arbitrary objects, so the
  # state file must only ever come from a trusted location.
  nodes = path.open( 'r:binary' ) {|io| Marshal.load(io) }

  nodes.each do |identifier, saved_node|
    self.log.debug "Loaded node: %p" % [ identifier ]
    if (( current_node = self.nodes[ identifier ] ))
      self.log.debug "Restoring state of the %p node." % [ identifier ]
      current_node.restore( saved_node )
    else
      self.log.info "Not restoring state for the %s node: not present in the loaded tree." %
        [ identifier ]
    end
  end

  return true
end

#resume_checkpoint_timerObject

Resume the timer that saves tree snapshots.



379
380
381
# File 'lib/arborist/manager.rb', line 379

# Resume the timer that saves tree snapshots.
#
# The checkpoint timer is optional: #register_checkpoint_timer returns early
# without creating one when no state file or checkpoint frequency is
# configured. In that case there is nothing to resume, so this is a no-op
# that returns nil instead of handing nil to the reactor.
def resume_checkpoint_timer
  timer = self.checkpoint_timer or return nil
  return self.reactor.resume_timer( timer )
end

#resume_heartbeat_timerObject

Resume the timer that publishes heartbeat events.



347
348
349
# File 'lib/arborist/manager.rb', line 347

# Resume the timer that publishes heartbeat events via the reactor.
def resume_heartbeat_timer
  timer = self.heartbeat_timer
  return self.reactor.resume_timer( timer )
end

#root_node ⇒ Object

Return the current root node.



908
909
910
# File 'lib/arborist/manager.rb', line 908

# Look up the current root node, which is stored in the nodes table under
# the '_' identifier.
def root_node
  tree = self.nodes
  tree[ '_' ]
end

#run ⇒ Object

Setup sockets and start the event loop.



186
187
188
189
190
191
192
193
194
195
196
# File 'lib/arborist/manager.rb', line 186

# Set up the sockets and timers, then run the event loop with handlers
# installed for the signals in QUEUE_SIGS.
#
# However the loop ends — normally or via an exception — the sockets are shut
# down and the node states are saved. Saving is attempted even when shutting
# the sockets down itself fails, so a socket error on the way out can't cost
# the current node state.
#
# Returns the result of the signal-handling block.
def run
  self.log.info "Getting ready to start the manager."
  self.setup_sockets
  self.register_timers
  self.with_signal_handler( reactor, *QUEUE_SIGS ) do
    self.start_accepting_requests
  end
ensure
  begin
    self.shutdown_sockets
  ensure
    # Nested so that a failure in the socket shutdown still reaches this.
    self.save_node_states
  end
end

#running? ⇒ Boolean

Returns true if the Manager is running.

Returns:

  • (Boolean)


215
216
217
218
219
# File 'lib/arborist/manager.rb', line 215

# Report whether the manager is running: it has a reactor, it has an event
# socket, and the reactor says that socket is registered with it.
#
# The result is truthy/falsy rather than strictly boolean — a missing reactor
# or event socket is returned as-is (e.g. +nil+).
def running?
  poller = self.reactor
  return poller unless poller

  publisher = self.event_socket
  return publisher unless publisher

  poller.registered?( publisher )
end

#save_node_states ⇒ Object

Write out the state of all the manager’s nodes to the state_file if one is configured.



280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# File 'lib/arborist/manager.rb', line 280

# Write the state of all the manager's nodes to the configured state file.
# Does nothing (and returns +nil+) when no state file is configured.
#
# The nodes are marshalled into a temporary file created in the state file's
# own directory, and that file is renamed over the state file only after it
# has been written and closed, so a failed write never replaces the existing
# state file.
#
# System-level failures (SystemCallError) are logged rather than raised; other
# errors (e.g. a node that can't be marshalled) propagate to the caller. In
# every case the temporary file is closed and removed if it's still around.
def save_node_states
  start_time = Time.now
  path = self.class.state_file or return
  self.log.info "Saving current node state to %s" % [ path ]

  # Name the temp file after the state file: same stem, same extension.
  tmpfile = Tempfile.create(
    [ path.basename(path.extname).to_s, path.extname ],
    path.dirname.to_s,
    encoding: 'binary'
  )
  Marshal.dump( self.nodes, tmpfile )
  tmpfile.close

  File.rename( tmpfile.path, path.to_s )
  self.log.debug "Saved state file in %0.1f seconds." % [ Time.now - start_time ]

rescue SystemCallError => err
  self.log.error "%p while saving node state: %s" % [ err.class, err.message ]

ensure
  if tmpfile
    # The handle is still open if the dump raised before the explicit close.
    tmpfile.close unless tmpfile.closed?
    File.unlink( tmpfile.path ) if File.exist?( tmpfile.path )
  end
end

#setup_event_socket ⇒ Object

Set up the ZMQ PUB socket for published events.



983
984
985
986
987
988
989
# File 'lib/arborist/manager.rb', line 983

# Create the PUB socket used for published events, set its linger option
# from the manager's linger setting, and bind it to Arborist.event_api_url.
#
# Returns the result of the socket's #bind.
def setup_event_socket
  @event_socket = CZTop::Socket::PUB.new

  details = [ @event_socket.object_id * 2, Arborist.event_api_url ]
  self.log.info( "  binding the event socket (%#0x) to %p" % details )

  # The linger setting is multiplied by 1000 and rounded up to a whole number.
  linger_value = ( self.linger * 1000 ).ceil
  @event_socket.options.linger = linger_value

  @event_socket.bind( Arborist.event_api_url )
end

#setup_sockets ⇒ Object

Create the sockets used by the manager and bind them to the appropriate endpoints.



201
202
203
204
# File 'lib/arborist/manager.rb', line 201

# Create and bind both of the manager's sockets: the tree API socket first,
# then the event socket. Returns the result of setting up the event socket.
def setup_sockets
  setup_tree_socket
  setup_event_socket
end

#setup_tree_socket ⇒ Object

Set up the ZeroMQ REP socket for the Tree API.



586
587
588
589
590
591
592
# File 'lib/arborist/manager.rb', line 586

# Create the REP socket that serves the Tree API, set its linger option to
# zero, and bind it to Arborist.tree_api_url.
#
# Returns the result of the socket's #bind.
def setup_tree_socket
  @tree_socket = CZTop::Socket::REP.new

  details = [ @tree_socket.object_id * 2, Arborist.tree_api_url ]
  self.log.info( "  binding the tree API socket (%#0x) to %p" % details )

  @tree_socket.options.linger = 0

  @tree_socket.bind( Arborist.tree_api_url )
end

#shutdown_event_socket ⇒ Object

Stop accepting events to be published



993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
# File 'lib/arborist/manager.rb', line 993

# Stop publishing events: give the event queue up to half the linger time to
# empty, then unbind the event socket from its last endpoint and discard it.
#
# Does nothing when there is no event socket. #run shuts the sockets down from
# its `ensure` clause, so this can be reached when socket setup failed before
# the event socket was created; raising here would hide the original error.
#
# Returns +nil+.
def shutdown_event_socket
  return unless @event_socket

  start    = Time.now
  deadline = start + (self.linger.to_f / 2.0)

  self.log.warn "Waiting to empty the event queue..."
  until self.event_queue.empty?
    sleep 0.1
    break if Time.now > deadline
  end
  self.log.warn "  ... waited %0.1f seconds" % [ Time.now - start ]

  # The wait above is over, so the socket's own linger is turned off
  # before it's unbound.
  @event_socket.options.linger = 0
  @event_socket.unbind( @event_socket.last_endpoint )
  @event_socket = nil
end

#shutdown_sockets ⇒ Object

Shut down the sockets used by the manager.



208
209
210
211
# File 'lib/arborist/manager.rb', line 208

# Shut down both of the manager's sockets: the tree API socket first, then
# the event socket.
#
# The event socket is shut down even if shutting down the tree socket raises;
# the tree socket's error still propagates afterwards.
def shutdown_sockets
  self.shutdown_tree_socket
ensure
  self.shutdown_event_socket
end

#shutdown_tree_socket ⇒ Object

Tear down the ZeroMQ REP socket.



596
597
598
599
# File 'lib/arborist/manager.rb', line 596

# Tear down the Tree API socket: unbind it from its last endpoint and discard
# the reference to it.
#
# Does nothing when there is no tree socket. #run shuts the sockets down from
# its `ensure` clause, so this can be reached when the socket was never
# created; raising here would hide the original error.
#
# Returns +nil+.
def shutdown_tree_socket
  return unless @tree_socket

  @tree_socket.unbind( @tree_socket.last_endpoint )
  @tree_socket = nil
end

#start_accepting_requests ⇒ Object

Start a loop, accepting a request and handling it.



237
238
239
240
241
242
243
244
245
246
247
# File 'lib/arborist/manager.rb', line 237

# Record the start time, register both sockets with the reactor, and start
# polling.
#
# The tree socket is registered for :read events handled by
# #on_tree_socket_event, and the event socket for :write events handled by
# #on_event_socket_event. Returns the result of the reactor's #start_polling,
# which is called with interrupts ignored.
def start_accepting_requests
  self.log.debug "Starting the main loop"

  self.start_time = Time.now

  tree_handler = self.method( :on_tree_socket_event )
  self.reactor.register( self.tree_socket, :read, &tree_handler )

  event_handler = self.method( :on_event_socket_event )
  self.reactor.register( self.event_socket, :write, &event_handler )

  self.log.debug "Manager running."
  self.reactor.start_polling( ignore_interrupts: true )
end

#stop ⇒ Object

Stop the manager.



257
258
259
260
# File 'lib/arborist/manager.rb', line 257

# Stop the manager by telling its reactor to stop polling. Returns the result
# of the reactor's #stop_polling.
def stop
  log.info( "Stopping the manager." )
  reactor.stop_polling
end

#subscribe(identifier, subscription) ⇒ Object

Add the specified subscription to the node corresponding with the given identifier.



1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
# File 'lib/arborist/manager.rb', line 1069

# Attach +subscription+ to the node registered under +identifier+ and record
# that node in the manager's subscriptions table under the subscription's id.
#
# A +nil+ identifier means the root node ('_'). Raises ArgumentError if no
# node is registered under the identifier.
def subscribe( identifier, subscription )
  identifier ||= '_'
  target = self.nodes[ identifier ]
  raise ArgumentError, "no such node %p" % [ identifier ] unless target

  self.log.debug( "Registering subscription %p" % [ subscription ] )
  target.add_subscription( subscription )

  sub_id = subscription.id
  self.log.debug( " adding '%s' to the subscriptions hash." % [ sub_id ] )
  self.subscriptions[ subscription.id ] = target

  self.log.debug( "  subscriptions hash: %#0x" % [ self.subscriptions.object_id ] )
end

#tree_built ⇒ Object

Flag for marking when the tree is built successfully the first time



164
# File 'lib/arborist/manager.rb', line 164

# Flag marking that the tree has been built successfully for the first time.
# NOTE(review): attr_predicate_accessor is provided by MethodUtilities;
# presumably it declares a `tree_built?` predicate plus a `tree_built=`
# writer — confirm against that module.
attr_predicate_accessor :tree_built

#unregister_event_socket ⇒ Object

Unregister the event publisher socket from the reactor if it’s registered.



1026
1027
1028
1029
1030
# File 'lib/arborist/manager.rb', line 1026

# Turn off :write events for the event socket in the reactor, if they are
# currently enabled.
#
# Returns the result of the reactor's #disable_events, or +nil+ when write
# events weren't enabled.
def unregister_event_socket
  self.log.debug "Unregistering event socket for write events."
  return nil unless self.reactor.event_enabled?( self.event_socket, :write )

  self.reactor.disable_events( self.event_socket, :write )
end

#update_node(identifier, new_properties, monitor_key = '_') ⇒ Object

Update the node with the specified identifier with the given new_properties and propagate any events generated by the update to the node and its ancestors.



528
529
530
531
532
533
534
535
536
# File 'lib/arborist/manager.rb', line 528

# Apply +new_properties+ to the node registered under +identifier+ (passing
# +monitor_key+ along to the node's #update), then hand the node and the
# events from that update to #propagate_events.
#
# Returns the result of #propagate_events, or an empty Array (after logging a
# warning) when no node is registered under +identifier+.
def update_node( identifier, new_properties, monitor_key='_' )
  if (( target = self.nodes[identifier] ))
    generated = target.update( new_properties, monitor_key )
    self.propagate_events( target, generated )
  else
    self.log.warn "Update for non-existent node %p ignored." % [ identifier ]
    []
  end
end

#uptime ⇒ Object

Return the duration the manager has been running in seconds.



562
563
564
565
# File 'lib/arborist/manager.rb', line 562

# Report how long the manager has been running, in seconds: the difference
# between now and the recorded start time, or 0 if no start time is set.
def uptime
  started = self.start_time
  started ? Time.now - started : 0
end