Class: ActiveWarehouse::Aggregate

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/active_warehouse/aggregate.rb

Overview

An aggregate within a cube used to store calculated values. Each aggregate will contain values for a dimension pair, down each of the dimension hierarchies.

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.cube ⇒ Object

Returns the value of attribute cube.



6
7
8
# File 'lib/active_warehouse/aggregate.rb', line 6

# Class-level reader for the cube attribute.
# Returns whatever is currently stored in @cube (nil if it was never assigned).
def cube
  @cube
end

.dimension1 ⇒ Object

Returns the value of attribute dimension1.



6
7
8
# File 'lib/active_warehouse/aggregate.rb', line 6

# Class-level reader for the dimension1 attribute.
# Returns whatever is currently stored in @dimension1 (nil if it was never assigned).
def dimension1
  @dimension1
end

.dimension1_hierarchy_name ⇒ Object

Returns the value of attribute dimension1_hierarchy_name.



6
7
8
# File 'lib/active_warehouse/aggregate.rb', line 6

# Class-level reader for the dimension1_hierarchy_name attribute.
# Returns whatever is currently stored in @dimension1_hierarchy_name
# (nil if it was never assigned).
def dimension1_hierarchy_name
  @dimension1_hierarchy_name
end

.dimension2 ⇒ Object

Returns the value of attribute dimension2.



6
7
8
# File 'lib/active_warehouse/aggregate.rb', line 6

# Class-level reader for the dimension2 attribute.
# Returns whatever is currently stored in @dimension2 (nil if it was never assigned).
def dimension2
  @dimension2
end

.dimension2_hierarchy_name ⇒ Object

Returns the value of attribute dimension2_hierarchy_name.



6
7
8
# File 'lib/active_warehouse/aggregate.rb', line 6

# Class-level reader for the dimension2_hierarchy_name attribute.
# Returns whatever is currently stored in @dimension2_hierarchy_name
# (nil if it was never assigned).
def dimension2_hierarchy_name
  @dimension2_hierarchy_name
end

.name ⇒ Object

Returns the value of attribute name.



6
7
8
# File 'lib/active_warehouse/aggregate.rb', line 6

# Class-level reader for the name attribute.
# Returns whatever is currently stored in @name (nil if it was never assigned).
def name
  @name
end

Class Method Details

.aggregate_id ⇒ Object

Returns the aggregate ID



16
17
18
19
# File 'lib/active_warehouse/aggregate.rb', line 16

# Returns the aggregate ID: the run of digits that ends table_name, as an Integer.
# When table_name has no trailing digits the capture is nil, and nil.to_i yields 0.
def aggregate_id
  table_name[/(\d+)$/, 1].to_i
end

.build_query(fact_class, dim1, dim1_stage_path, dim2, dim2_stage_path) ⇒ Object

Build the aggregation query for the given dimensions and stage paths



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/active_warehouse/aggregate.rb', line 129

# Build the aggregation query for the given dimensions and stage paths.
#
# fact_class      - supplies table_name, aggregate_fields and aggregate_field_options
# dim1, dim2      - supply table_name, foreign_key and sym; joined to the fact table
#                   under the aliases d1 and d2 (the fact table is aliased f)
# dim*_stage_path - column names to select and group by for each dimension
#
# Returns a two-element array: the SQL string and the list of aggregate field names.
# Side effect: a field whose options have no :type gets :type => :sum written back
# into its options hash.
def build_query(fact_class, dim1, dim1_stage_path, dim2, dim2_stage_path)
  d1_columns = dim1_stage_path.map { |stage| "d1.#{stage}" }.join(", ")
  d2_columns = dim2_stage_path.map { |stage| "d2.#{stage}" }.join(", ")
  grouping = "#{d1_columns}, #{d2_columns}"

  joins = "join #{dim1.table_name} d1 on f.#{dim1.foreign_key} = d1.id" \
          " join #{dim2.table_name} d2 on f.#{dim2.foreign_key} = d2.id"

  # The select list starts with the grouping columns; each aggregate field appends to it.
  fields = []
  select_parts = [grouping]
  fact_class.aggregate_fields.each do |field_name|
    options = fact_class.aggregate_field_options[field_name]
    fields << field_name

    kind = (options[:type] ||= :sum)
    case kind
    when :sum
      select_parts << " sum(f.#{field_name}) as #{field_name}"
    when Hash
      # When both dimensions are declared :average the field is left out of the
      # select list (it is still reported in fields). The original author flagged
      # this as an unresolved special case. TODO: research
      both_average = kind[dim1.sym] == :average && kind[dim2.sym] == :average
      select_parts << " sum(f.#{field_name}) as #{field_name}" unless both_average
    else
      raise "Unsupported aggregate type: #{kind}"
    end
  end

  # put the SQL statement together
  stmt = "select #{select_parts.join(',')} from #{fact_class.table_name} f #{joins} group by #{grouping} "
  [stmt.strip, fields]
end

.create_storage_table(force = false) ⇒ Object

Create the aggregate table if required. Set force option to true to force creation of the table if it already exists



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/active_warehouse/aggregate.rb', line 40

# Create the aggregate table if required.
#
# force - when true and the table already exists, drop it first so it is recreated.
#
# The table has no id column. It gets a path (string) and stage (integer) column for
# each of the two dimensions, plus one column per aggregate field of the cube's fact
# class, typed like the matching fact column. Fields with no matching fact column are
# skipped. Each of the four dimension columns is then indexed.
#
# Returns nil when the table already exists, otherwise the result of the last add_index.
def create_storage_table(force=false)
  connection.drop_table(table_name) if force && table_exists?
  return if table_exists?

  connection.create_table(table_name, :id => false) do |t|
    [:dimension1, :dimension2].each do |dimension|
      t.column :"#{dimension}_path", :string
      t.column :"#{dimension}_stage", :integer
    end

    fact_class = cube.fact_class
    fact_class.aggregate_fields.each do |field|
      source_column = fact_class.columns_hash[field.to_s]
      next unless source_column
      t.column field, source_column.type
    end
  end

  indexed_columns = [:dimension1_path, :dimension1_stage, :dimension2_path, :dimension2_stage]
  indexed_columns.map { |column| connection.add_index(table_name, column) }.last
end

.key(dimension1, dimension1_hierarchy, dimension2, dimension2_hierarchy) ⇒ Object

Return a key for the aggregate



34
35
36
# File 'lib/active_warehouse/aggregate.rb', line 34

# Return a key for the aggregate.
# Passes the two dimensions and their hierarchies, in the order given, to
# AggregateKey.new and returns the new instance.
def key(dimension1, dimension1_hierarchy, dimension2, dimension2_hierarchy)
  AggregateKey.new(dimension1, dimension1_hierarchy, dimension2, dimension2_hierarchy)
end

.meta_data ⇒ Object

Returns the AggregateMetaData instance associated with this aggregate



22
23
24
# File 'lib/active_warehouse/aggregate.rb', line 22

# Returns the AggregateMetaData instance associated with this aggregate,
# looked up by the ID that aggregate_id reports.
def meta_data
  AggregateMetaData.find(aggregate_id)
end

.needs_rebuild?(last_build = nil) ⇒ Boolean

Return true if the aggregate needs to be rebuilt

Returns:

  • (Boolean)


27
28
29
30
31
# File 'lib/active_warehouse/aggregate.rb', line 27

# Return true if the aggregate needs to be rebuilt.
#
# last_build - optional value compared against the stored populated_at with <
#              (presumably the Time of the most recent build; confirm with callers).
#
# Returns true when the meta data has no populated_at, or when last_build is given
# and populated_at is earlier than it; false otherwise.
def needs_rebuild?(last_build=nil)
  # Read the meta data once so both checks see the same value.
  populated_at = meta_data.populated_at
  return true if populated_at.nil?
  return true if last_build && populated_at < last_build
  false
end

.populate ⇒ Object

Populate the aggregate table



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/active_warehouse/aggregate.rb', line 62

# Populate the aggregate table.
#
# Creates the storage table if necessary, deletes the rows currently in it, and then,
# for every combination of a leading portion of the dimension1 hierarchy with a leading
# portion of the dimension2 hierarchy, runs the query from build_query and saves one
# aggregate row per result row. Path values are the row's stage values joined with ':'
# and the stage columns hold the zero-based depth within each hierarchy.
#
# After all rows are stored the meta data's populated_at is set to the current time.
def populate
  # create the storage table if necessary
  create_storage_table

  # clear out the current data
  # TODO: a TRUNCATE would be quicker but is not generic across databases
  delete_all

  # aggregate the data for the two dimensions
  fact_class = cube.fact_class
  dim1 = Dimension.class_name(dimension1).constantize
  dim2 = Dimension.class_name(dimension2).constantize
  meta = meta_data
  dim1_hierarchy = dim1.hierarchy(meta.dimension1_hierarchy.to_sym)
  dim2_hierarchy = dim2.hierarchy(meta.dimension2_hierarchy.to_sym)

  rows_saved = false
  dim1_stage_path = []
  dim1_hierarchy.each_with_index do |dim1_stage_name, dim1_stage_level|
    dim1_stage_path << dim1_stage_name
    dim2_stage_path = []
    dim2_hierarchy.each_with_index do |dim2_stage_name, dim2_stage_level|
      dim2_stage_path << dim2_stage_name

      stmt, fields = build_query(fact_class, dim1, dim1_stage_path, dim2, dim2_stage_path)

      # Get the facts and aggregate them
      fact_class.connection.select_all(stmt).each do |row|
        agg_instance = new
        agg_instance.dimension1_path = dim1_stage_path.map { |stage| row[stage.to_s] }.join(':')
        agg_instance.dimension1_stage = dim1_stage_level
        agg_instance.dimension2_path = dim2_stage_path.map { |stage| row[stage.to_s] }.join(':')
        agg_instance.dimension2_stage = dim2_stage_level

        fields.each do |field|
          agg_instance.send("#{field}=", row[field.to_s])
        end
        agg_instance.save!
        rows_saved = true
      end
    end
  end

  # Stamp the meta data once, after every row is stored, so a run that fails part-way
  # is not recorded as populated.
  # NOTE(review): as before, nothing is stamped when no rows were produced - confirm
  # whether an empty aggregate should also count as populated.
  meta.update_attribute(:populated_at, Time.now) if rows_saved
end

.table_name ⇒ Object

Get the table name for the aggregate



9
10
11
12
13
# File 'lib/active_warehouse/aggregate.rb', line 9

# Get the table name for the aggregate.
# Derives it from name (demodulized, then underscored), registers the result through
# set_table_name and returns it.
def table_name
  derived = name.demodulize.underscore
  set_table_name(derived)
  derived
end

Instance Method Details

#clone_and_reset ⇒ Object

Clone and reset at the same time



176
177
178
179
180
# File 'lib/active_warehouse/aggregate.rb', line 176

# Clone and reset at the same time: returns a clone of the receiver on which reset
# has been called. The receiver itself is not reset.
def clone_and_reset
  clone.tap { |copy| copy.reset }
end

#data_fields ⇒ Object



186
187
188
189
190
191
192
193
194
# File 'lib/active_warehouse/aggregate.rb', line 186

# Names of the class's columns that are not listed in non_data_fields,
# in column order.
def data_fields
  self.class.columns.map { |column| column.name }.reject { |column_name| non_data_fields.include?(column_name) }
end

#non_data_fields ⇒ Object



182
183
184
# File 'lib/active_warehouse/aggregate.rb', line 182

# Names of the columns that hold the dimension path and stage for each of the two
# dimensions, as opposed to aggregated data. A new array is returned on each call.
def non_data_fields
  %w[dimension1_path dimension1_stage dimension2_path dimension2_stage]
end

#reset ⇒ Object

Reset the aggregate



197
198
199
200
201
202
203
204
# File 'lib/active_warehouse/aggregate.rb', line 197

# Reset the aggregate: every column not listed in non_data_fields is assigned 0 when
# the column reports number?, and the string 'None' otherwise. The dimension path and
# stage columns are left untouched.
def reset
  self.class.columns.each do |column|
    next if non_data_fields.include?(column.name)

    blank = if column.number?
      0
    else
      'None'
    end
    send(:"#{column.name}=", blank)
  end
end