Class: MCollective::Aggregate

Inherits:
Object
  • Object
show all
Defined in:
lib/mcollective/aggregate.rb,
lib/mcollective/aggregate/sum.rb,
lib/mcollective/aggregate/base.rb,
lib/mcollective/aggregate/result.rb,
lib/mcollective/aggregate/average.rb,
lib/mcollective/aggregate/summary.rb,
lib/mcollective/aggregate/result/base.rb,
lib/mcollective/aggregate/result/numeric_result.rb,
lib/mcollective/aggregate/result/collection_result.rb

Defined Under Namespace

Modules: Result Classes: Average, Base, Sum, Summary

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ddl) ⇒ Aggregate

Returns a new instance of Aggregate.



8
9
10
11
12
13
14
15
# File 'lib/mcollective/aggregate.rb', line 8

def initialize(ddl)
  @functions = []
  @ddl = ddl
  @action = ddl[:action]
  @failed = []

  create_functions
end

Instance Attribute Details

#actionObject

Returns the value of attribute action.



6
7
8
# File 'lib/mcollective/aggregate.rb', line 6

def action
  @action
end

#ddlObject

Returns the value of attribute ddl.



6
7
8
# File 'lib/mcollective/aggregate.rb', line 6

def ddl
  @ddl
end

#failedObject

Returns the value of attribute failed.



6
7
8
# File 'lib/mcollective/aggregate.rb', line 6

def failed
  @failed
end

#functionsObject

Returns the value of attribute functions.



6
7
8
# File 'lib/mcollective/aggregate.rb', line 6

def functions
  @functions
end

Instance Method Details

#call_functions(reply) ⇒ Object

Call all the appropriate functions with the reply data received from RPC::Client



45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/mcollective/aggregate.rb', line 45

def call_functions(reply)
  @functions.each do |function|
    Log.debug("Calling aggregate function #{function} for result")
    begin
      function.process_result(reply[:data][function.output_name], reply)
    rescue Exception => e
      Log.error("Could not process aggregate function for '#{function.output_name}'. #{e.to_s}")
      @failed << {:name => function.output_name, :type => :process_result}
      @functions.delete(function)
    end
  end
end

#contains_output?(output) ⇒ Boolean

Check if the function param is defined as an output for the action in the ddl

Returns:

  • (Boolean)


40
41
42
# File 'lib/mcollective/aggregate.rb', line 40

def contains_output?(output)
  @ddl[:output].keys.include?(output)
end

#create_functionsObject

Creates instances of the Aggregate functions and stores them in the function array. All aggregate call and summarize method calls operate on these function as a batch.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/mcollective/aggregate.rb', line 19

def create_functions
  @ddl[:aggregate].each_with_index do |agg, i|
    output = agg[:args][0]

    if contains_output?(output)
      arguments = agg[:args][1]
      format = (arguments.delete(:format) if arguments) || nil
      begin
        @functions << load_function(agg[:function]).new(output, arguments, format, @action)
      rescue Exception => e
        Log.error("Cannot create aggregate function '#{output}'. #{e.to_s}")
        @failed << {:name => output, :type => :startup}
      end
    else
      Log.error("Cannot create aggregate function '#{output}'. '#{output}' has not been specified as a valid ddl output.")
      @failed << {:name => output, :type => :create}
    end
  end
end

#load_function(function_name) ⇒ Object

Loads function from disk for use



76
77
78
79
80
81
82
83
# File 'lib/mcollective/aggregate.rb', line 76

def load_function(function_name)
  function_name = function_name.to_s.capitalize

  PluginManager.loadclass("MCollective::Aggregate::#{function_name}") unless Aggregate.const_defined?(function_name)
  Aggregate.const_get(function_name)
rescue Exception
  raise "Aggregate function file '#{function_name.downcase}.rb' cannot be loaded"
end

#summarizeObject

Finalizes the function returning a result object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/mcollective/aggregate.rb', line 59

def summarize
  summary = @functions.map do |function|
    begin
      function.summarize
    rescue Exception => e
      Log.error("Could not summarize aggregate result for '#{function.output_name}'. #{e.to_s}")
      @failed << {:name => function.output_name, :type => :summarize}
      nil
    end
  end

  summary.reject{|x| x.nil?}.sort do |x,y|
    x.result[:output] <=> y.result[:output]
  end
end