Class: ActiveWarehouse::Aggregate
- Inherits: ActiveRecord::Base
  - Object
  - ActiveRecord::Base
  - ActiveWarehouse::Aggregate
- Defined in: lib/active_warehouse/aggregate.rb
Overview
An aggregate within a cube, used to store calculated values. Each aggregate contains values for a pair of dimensions, calculated at every stage of each dimension's hierarchy.
Class Attribute Summary
- .cube ⇒ Object
  Returns the value of attribute cube.
- .dimension1 ⇒ Object
  Returns the value of attribute dimension1.
- .dimension1_hierarchy_name ⇒ Object
  Returns the value of attribute dimension1_hierarchy_name.
- .dimension2 ⇒ Object
  Returns the value of attribute dimension2.
- .dimension2_hierarchy_name ⇒ Object
  Returns the value of attribute dimension2_hierarchy_name.
- .name ⇒ Object
  Returns the value of attribute name.
Class Method Summary
- .aggregate_id ⇒ Object
  Returns the aggregate ID.
- .build_query(fact_class, dim1, dim1_stage_path, dim2, dim2_stage_path) ⇒ Object
  Build the aggregation query for the given dimensions and stage paths.
- .create_storage_table(force = false) ⇒ Object
  Create the aggregate table if required.
- .key(dimension1, dimension1_hierarchy, dimension2, dimension2_hierarchy) ⇒ Object
  Return a key for the aggregate.
- .meta_data ⇒ Object
  Returns the AggregateMetaData instance associated with this aggregate.
- .needs_rebuild?(last_build = nil) ⇒ Boolean
  Return true if the aggregate needs to be rebuilt.
- .populate ⇒ Object
  Populate the aggregate table.
- .table_name ⇒ Object
  Get the table name for the aggregate.
Instance Method Summary
- #clone_and_reset ⇒ Object
  Clone and reset at the same time.
- #data_fields ⇒ Object
- #non_data_fields ⇒ Object
- #reset ⇒ Object
  Reset the aggregate.
Class Attribute Details
.cube ⇒ Object
Returns the value of attribute cube.
# File 'lib/active_warehouse/aggregate.rb', line 6
def cube
  @cube
end
.dimension1 ⇒ Object
Returns the value of attribute dimension1.
# File 'lib/active_warehouse/aggregate.rb', line 6
def dimension1
  @dimension1
end
.dimension1_hierarchy_name ⇒ Object
Returns the value of attribute dimension1_hierarchy_name.
# File 'lib/active_warehouse/aggregate.rb', line 6
def dimension1_hierarchy_name
  @dimension1_hierarchy_name
end
.dimension2 ⇒ Object
Returns the value of attribute dimension2.
# File 'lib/active_warehouse/aggregate.rb', line 6
def dimension2
  @dimension2
end
.dimension2_hierarchy_name ⇒ Object
Returns the value of attribute dimension2_hierarchy_name.
# File 'lib/active_warehouse/aggregate.rb', line 6
def dimension2_hierarchy_name
  @dimension2_hierarchy_name
end
.name ⇒ Object
Returns the value of attribute name.
# File 'lib/active_warehouse/aggregate.rb', line 6
def name
  @name
end
Class Method Details
.aggregate_id ⇒ Object
Returns the aggregate ID, taken from the digits at the end of the table name.
# File 'lib/active_warehouse/aggregate.rb', line 16
def aggregate_id
  table_name =~ /(\d+)$/
  $1.to_i
end
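As a rough illustration of the pattern used by aggregate_id, the following plain-Ruby sketch pulls trailing digits out of a table name. The helper name trailing_id and the sample table names are hypothetical and not part of ActiveWarehouse.

```ruby
# Hypothetical stand-alone helper mirroring the regex used by aggregate_id.
def trailing_id(table_name)
  # Capture the run of digits at the very end of the name, if any.
  match = table_name.match(/(\d+)$/)
  # The original relies on nil.to_i being 0 when nothing matches;
  # this sketch makes that fallback explicit.
  match ? match[1].to_i : 0
end

trailing_id("region_date_aggregate_12") # name ending in digits
trailing_id("region_date_aggregate")    # name without trailing digits
```

Note that a table name with no trailing digits yields 0 rather than an error, so callers cannot distinguish "no ID" from an ID of 0.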
.build_query(fact_class, dim1, dim1_stage_path, dim2, dim2_stage_path) ⇒ Object
Build the aggregation query for the given dimensions and stage paths. Returns the SQL statement and the list of aggregated field names.
# File 'lib/active_warehouse/aggregate.rb', line 129
def build_query(fact_class, dim1, dim1_stage_path, dim2, dim2_stage_path)
  dim1_group = dim1_stage_path.collect { |p| "d1.#{p}"}.join(", ")
  dim2_group = dim2_stage_path.collect { |p| "d2.#{p}"}.join(", ")

  # Set up the find options
  fact_find_options = {}
  fact_find_options[:group] = "#{dim1_group}, #{dim2_group}"
  fact_find_options[:joins] = "join #{dim1.table_name} d1 on f.#{dim1.foreign_key} = d1.id"
  fact_find_options[:joins] << " join #{dim2.table_name} d2 on f.#{dim2.foreign_key} = d2.id"

  # Build the 'select' part of the query
  # denominator = nil
  fields = []
  fact_select = ["#{dim1_group}, #{dim2_group}"]
  fact_class.aggregate_fields.each do |field_name|
    options = fact_class.aggregate_field_options[field_name]
    fields << field_name
    options[:type] ||= :sum
    case options[:type]
    when :sum
      fact_select << " sum(f.#{field_name}) as #{field_name}"
    when Hash
      if options[:type][dim1.sym] == :average && options[:type][dim2.sym] == :average
        # I believe this is a special case, but I'm not sure how yet. If both dimensions are defined
        # averages then perhaps that value cannot be calculated at all. TODO: research
      else
        fact_select << " sum(f.#{field_name}) as #{field_name}"
      end
    else
      raise "Unsupported aggregate type: #{options[:type]}"
    end
  end
  fact_find_options[:select] = fact_select.join(',')

  # put the SQL statement together
  stmt = "select #{fact_find_options[:select]} from "
  stmt << "#{fact_class.table_name} f #{fact_find_options[:joins]} "
  stmt << "group by #{fact_find_options[:group]} "
  return stmt.strip, fields
end
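To make the shape of the generated statement easier to see, here is a minimal sketch that assembles a similar query from plain values. It assumes every aggregate field is a simple sum and leaves out the per-field type handling. DimensionStub, sketch_query, and the table and column names are hypothetical and only stand in for real fact and dimension classes.

```ruby
# Hypothetical stand-in for a dimension class: only the two values the query needs.
DimensionStub = Struct.new(:table_name, :foreign_key)

# Assemble a grouped aggregation statement in the same general shape as
# build_query, treating every aggregate field as a plain sum.
def sketch_query(fact_table, sum_fields, dim1, dim1_stage_path, dim2, dim2_stage_path)
  dim1_group = dim1_stage_path.map { |p| "d1.#{p}" }.join(", ")
  dim2_group = dim2_stage_path.map { |p| "d2.#{p}" }.join(", ")
  group = "#{dim1_group}, #{dim2_group}"

  # The grouping columns are selected first, followed by one sum per field.
  select = [group] + sum_fields.map { |f| "sum(f.#{f}) as #{f}" }
  joins = "join #{dim1.table_name} d1 on f.#{dim1.foreign_key} = d1.id " \
          "join #{dim2.table_name} d2 on f.#{dim2.foreign_key} = d2.id"

  "select #{select.join(', ')} from #{fact_table} f #{joins} group by #{group}"
end

date  = DimensionStub.new("date_dimension", "date_id")
store = DimensionStub.new("store_dimension", "store_id")

puts sketch_query("sales_facts", [:sales_amount],
                  date, %w[calendar_year calendar_quarter],
                  store, %w[store_region])
```

Each call covers one combination of stage paths, so a longer stage path simply adds more columns to both the select list and the group by clause.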
.create_storage_table(force = false) ⇒ Object
Create the aggregate table if required. Set the force option to true to drop and re-create the table if it already exists.
# File 'lib/active_warehouse/aggregate.rb', line 40
def create_storage_table(force=false)
  connection.drop_table(table_name) if force and table_exists?

  if !table_exists?
    connection.create_table(table_name, :id => false) do |t|
      t.column :dimension1_path, :string
      t.column :dimension1_stage, :integer
      t.column :dimension2_path, :string
      t.column :dimension2_stage, :integer

      cube.fact_class.aggregate_fields.each do |field|
        #options = cube.fact_class.aggregate_field_options[field]
        col = cube.fact_class.columns_hash[field.to_s]
        t.column field, col.type if col
      end
    end

    connection.add_index(table_name, :dimension1_path)
    connection.add_index(table_name, :dimension1_stage)
    connection.add_index(table_name, :dimension2_path)
    connection.add_index(table_name, :dimension2_stage)
  end
end
.key(dimension1, dimension1_hierarchy, dimension2, dimension2_hierarchy) ⇒ Object
Return a key for the aggregate.
# File 'lib/active_warehouse/aggregate.rb', line 34
def key(dimension1, dimension1_hierarchy, dimension2, dimension2_hierarchy)
  AggregateKey.new(dimension1, dimension1_hierarchy, dimension2, dimension2_hierarchy)
end
.meta_data ⇒ Object
Returns the AggregateMetaData instance associated with this aggregate.
# File 'lib/active_warehouse/aggregate.rb', line 22
def meta_data
  AggregateMetaData.find(aggregate_id)
end
.needs_rebuild?(last_build = nil) ⇒ Boolean
Return true if the aggregate needs to be rebuilt: either it has never been populated, or it was populated before the given last_build time.
# File 'lib/active_warehouse/aggregate.rb', line 27
def needs_rebuild?(last_build=nil)
  return true if meta_data.populated_at.nil?
  return true if last_build && (meta_data.populated_at < last_build)
  return false
end
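The decision can be sketched without a database as follows. The function name rebuild_needed? is hypothetical, and populated_at is passed in directly rather than read from the aggregate's metadata; the reading of last_build as the time the underlying data was last built is an assumption.

```ruby
# Hypothetical free-standing version of the rebuild check.
# populated_at: Time the aggregate was last populated, or nil if never.
# last_build:   optional Time to compare against (assumed to be when the
#               underlying data was last built).
def rebuild_needed?(populated_at, last_build = nil)
  return true if populated_at.nil?
  return true if last_build && populated_at < last_build
  false
end

now = Time.now
rebuild_needed?(nil)             # never populated
rebuild_needed?(now - 3600, now) # populated before the last build
rebuild_needed?(now, now - 3600) # populated after the last build
```

When last_build is omitted, only the "never populated" condition can trigger a rebuild.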
.populate ⇒ Object
Populate the aggregate table. Existing rows are deleted, then one row is saved for each result of the aggregation query at every combination of stages in the two dimension hierarchies.
# File 'lib/active_warehouse/aggregate.rb', line 62
def populate
  # create the storage table if necessary
  create_storage_table

  #puts "Populating aggregate table #{table_name}"
  # clear out the current data
  #connection.execute("TRUNCATE TABLE #{table_name}")
  #TODO: make this generic to support all databases
  delete_all

  $first = false

  # aggregate the data for the two dimensions
  fact_class = cube.fact_class
  dim1 = Dimension.class_name(dimension1).constantize
  dim2 = Dimension.class_name(dimension2).constantize

  dim1_stage_path = []
  dim1.hierarchy(meta_data.dimension1_hierarchy.to_sym).each_with_index do |dim1_stage_name, dim1_stage_level|
    dim1_stage_path << dim1_stage_name
    dim2_stage_path = []
    dim2.hierarchy(meta_data.dimension2_hierarchy.to_sym).each_with_index do |dim2_stage_name, dim2_stage_level|
      dim2_stage_path << dim2_stage_name
      stmt, fields = build_query(fact_class, dim1, dim1_stage_path, dim2, dim2_stage_path)
      puts "\nSTMT: #{stmt}" if $first

      # Get the facts and aggregate them
      # TODO: replace with select_all
      fact_class.connection.select_all(stmt).each do |row|
        require 'pp'
        pp row if $first
        dim1_value = []
        dim1_stage_path.each do |v|
          dim1_value << row["#{v}"]
        end
        dim2_value = []
        dim2_stage_path.each do |v|
          dim2_value << row["#{v}"]
        end

        agg_instance = new
        agg_instance.dimension1_path = dim1_value.join(':')
        agg_instance.dimension1_stage = dim1_stage_level
        agg_instance.dimension2_path = dim2_value.join(':')
        agg_instance.dimension2_stage = dim2_stage_level

        puts "DIM1_PATH: #{agg_instance.dimension1_path}" if $first
        puts "DIM2_PATH: #{agg_instance.dimension2_path}" if $first
        pp fields if $first
        fields.each do |field|
          # do the average here
          puts "setting field #{field}, value is #{row[field.to_s]}" if $first
          agg_instance.send("#{field}=".to_sym, row[field.to_s])
        end
        agg_instance.save!
        meta_data.update_attribute(:populated_at, Time.now)
        $first = false
      end
    end
  end
end
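The nested iteration over the two hierarchies can be hard to follow in the listing, so the sketch below reproduces just that part with plain arrays. The hierarchy names, the sample row, and the helper names stage_path_pairs and path_value are hypothetical; real hierarchies come from the dimension classes.

```ruby
# Hypothetical hierarchies, ordered from the top stage down.
date_hierarchy  = %w[calendar_year calendar_quarter calendar_month_name]
store_hierarchy = %w[store_region store_district]

# Enumerate every (dimension1 stage path, dimension2 stage path) pair in the
# order that two nested each_with_index loops visit them. Each path is the
# cumulative list of stage names from the top of the hierarchy.
def stage_path_pairs(hierarchy1, hierarchy2)
  pairs = []
  path1 = []
  hierarchy1.each_with_index do |stage1, level1|
    path1 << stage1
    path2 = [] # restarted for every dimension1 stage
    hierarchy2.each_with_index do |stage2, level2|
      path2 << stage2
      pairs << { stage1: level1, path1: path1.dup, stage2: level2, path2: path2.dup }
    end
  end
  pairs
end

# Join the row values along a stage path with ':' to form the stored path string.
def path_value(row, stage_path)
  stage_path.map { |column| row[column] }.join(":")
end

pairs = stage_path_pairs(date_hierarchy, store_hierarchy)
row = { "calendar_year" => 2006, "calendar_quarter" => "Q1" }
path_value(row, %w[calendar_year calendar_quarter])
```

With hierarchies of three and two stages, this yields six pairs, which corresponds to six aggregation queries in populate.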
.table_name ⇒ Object
Get the table name for the aggregate.
# File 'lib/active_warehouse/aggregate.rb', line 9
def table_name
  name = self.name.demodulize.underscore
  set_table_name(name)
  name
end
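For readers without ActiveSupport at hand, the following sketch approximates what demodulize followed by underscore does to a straightforward CamelCase class name. It is a simplified stand-in, not the ActiveSupport implementation, and it does not handle acronyms. The helper name simple_table_name and the class name are hypothetical.

```ruby
# Simplified approximation of ActiveSupport's demodulize + underscore.
def simple_table_name(class_name)
  # Drop any module prefix, keeping only the last constant name.
  base = class_name.split("::").last
  # Insert an underscore at each lowercase-or-digit to uppercase boundary.
  base.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

simple_table_name("Warehouse::RegionDateAggregate")
```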
Instance Method Details
#clone_and_reset ⇒ Object
Clone and reset at the same time.
# File 'lib/active_warehouse/aggregate.rb', line 176
def clone_and_reset
  o = clone
  o.reset
  o
end
#data_fields ⇒ Object
# File 'lib/active_warehouse/aggregate.rb', line 186
def data_fields
  fields = []
  self.class.columns.each do |column|
    unless non_data_fields.include?(column.name)
      fields << column.name
    end
  end
  fields
end
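In effect, data_fields returns every column name except the four dimension path and stage columns. A minimal sketch of the same filtering on a plain array of names, where data_field_names and the sample column names are hypothetical:

```ruby
# The four bookkeeping columns listed by non_data_fields.
NON_DATA_FIELD_NAMES = %w[dimension1_path dimension1_stage dimension2_path dimension2_stage].freeze

# Keep only the column names that hold aggregated values.
def data_field_names(column_names)
  column_names.reject { |name| NON_DATA_FIELD_NAMES.include?(name) }
end

data_field_names(%w[dimension1_path dimension1_stage dimension2_path dimension2_stage sales_amount units_sold])
```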
#non_data_fields ⇒ Object
# File 'lib/active_warehouse/aggregate.rb', line 182
def non_data_fields
  ['dimension1_path','dimension1_stage','dimension2_path','dimension2_stage']
end
#reset ⇒ Object
Reset the aggregate. Every data column is set to 0 if it is numeric and to 'None' otherwise; the dimension path and stage columns are left unchanged.
# File 'lib/active_warehouse/aggregate.rb', line 197
def reset
  self.class.columns.each do |column|
    unless non_data_fields.include?(column.name)
      value = column.number? ? 0 : 'None'
      send("#{column.name}=".to_sym, value)
    end
  end
end
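The reset rule can be sketched on a plain hash as shown below. Unlike reset, which modifies the record in place, this sketch returns a modified copy. ColumnStub, reset_row, and the sample columns are hypothetical stand-ins for ActiveRecord column objects and an aggregate row.

```ruby
# Hypothetical column description: a name plus whether it holds a number.
ColumnStub = Struct.new(:name, :numeric)

KEY_COLUMNS = %w[dimension1_path dimension1_stage dimension2_path dimension2_stage].freeze

# Return a copy of the row with each data column set to its default:
# 0 for numeric columns, 'None' otherwise. Key columns are left untouched.
def reset_row(row, columns)
  columns.each_with_object(row.dup) do |column, result|
    next if KEY_COLUMNS.include?(column.name)
    result[column.name] = column.numeric ? 0 : "None"
  end
end

columns = [
  ColumnStub.new("dimension1_path", false),
  ColumnStub.new("sales_amount", true),
  ColumnStub.new("currency", false)
]
row = { "dimension1_path" => "2006:Q1", "sales_amount" => 125.5, "currency" => "USD" }
reset_row(row, columns)
```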