Class: Statlysis::Timely

Inherits:
Cron
  • Object
show all
Defined in:
lib/statlysis/cron/timely.rb,
lib/statlysis/cron/timely/one_dimension.rb,
lib/statlysis/cron/timely/multiple_dimensions.rb

Constant Summary collapse

SqlColumns =
[:sum_columns, :group_by_columns, :group_concat_columns]

Instance Attribute Summary

Attributes inherited from Cron

#clock, #multiple_dataset, #source_type, #time_column, #time_unit, #time_zone

Instance Method Summary collapse

Methods inherited from Cron

#_source, #group_by_columns?, #is_activerecord?, #is_mongoid?, #is_orm?, #is_time_column_integer?, #reoutput, #source_where_array, #time_column?, #time_range

Methods included from Common

#cron

Constructor Details

#initialize(source, opts = {}) ⇒ Timely

Returns a new instance of Timely.



8
9
10
11
12
13
14
# File 'lib/statlysis/cron/timely.rb', line 8

def initialize source, opts = {}
  super
  Statlysis.check_set_database
  SqlColumns.each {|sym| instance_variable_set "@#{sym}", (opts[sym] || []) }
  cron.setup_stat_model
  cron
end

Instance Method Details

#multiple_dimensions_outputObject



7
8
9
# File 'lib/statlysis/cron/timely/multiple_dimensions.rb', line 7

def multiple_dimensions_output
  self.send "multiple_dimensions_output_with#{cron.time_column ? '' : 'out'}_time_column"
end

#one_dimension_outputObject

one dimension must have ‘time_column`, or there’s nothing to do

TODO add to FAQ

  • if you want to statistics one column through ‘group_by_columns`

params, and dont need time column, then you could use ‘always` DSL.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/statlysis/cron/timely/one_dimension.rb', line 13

def one_dimension_output
  cron.time_range.map do |time|
    _hash = {:t => time, :timely_c => 0, :totally_c => 0}
    sum_column_to_result_columns_hash.each do |_sum_col, _result_cols|
      _result_cols.each do |_result_col|
        _hash[_result_col] = 0.0
      end
    end

    # support multiple data sources
    _first_source = nil
    cron.multiple_dataset.sources.each do |s|
      _t = DateTime1970
      _t = is_time_column_integer? ? _t.to_i : _t

      _scope_one = s.where(unit_range_query(time))
      # TODO cache pre-result
      _scope_all = s.where(unit_range_query(time, _t))

      # 1. count
      _hash[:timely_c]  += _scope_one.count
      _hash[:totally_c] += _scope_all.count

      # 2. sum
      sum_column_to_result_columns_hash.each do |_sum_col, _result_cols|
        _hash[_result_cols[0]] = _scope_one.map(&_sum_col).reduce(:+).to_f
        _hash[_result_cols[1]] = _scope_all.map(&_sum_col).reduce(:+).to_f
      end

      # 3. group_concat
      _other_json = {}
      _other_json[:group_concat_columns] ||= {}
      cron.group_concat_columns.each do |_group_concat_column|
        _other_json[:group_concat_columns][_group_concat_column] = _scope_one.map(&_group_concat_column).uniq
      end
      _hash[:other_json] = _other_json.to_json

      _first_source ||= s.where(unit_range_query(time))
    end
    logger.info "#{time.in_time_zone(cron.time_zone)} multiple_dataset:#{cron.multiple_dataset.name} _first_source:#{_first_source.inspect} timely_c:#{_hash[:timely_c]} totally_c:#{_hash[:totally_c]}" if ENV['DEBUG']

    _hash
  end.select {|r1| r1.except(:t, :other_json).values.reject {|r2| r2.zero? }.any? }
end

#outputObject



129
130
131
# File 'lib/statlysis/cron/timely.rb', line 129

def output
  @output ||= (cron.group_by_columns.any? ? multiple_dimensions_output : one_dimension_output)
end

#runObject

设置数据源,并保存结果入数据库



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/statlysis/cron/timely.rb', line 17

def run
  (logger.info("#{cron.multiple_dataset.name} have no result!"); return false) if cron.output.blank?

  raise "cron.output has no Enumerable" if not cron.output.class.included_modules.include? Enumerable

  num_i = 0; num_add = 999
  Statlysis.sequel.transaction do
    # delete first in range
    cron.stat_model.where("t >= ? AND t <= ?", cron.output[0][:t], cron.output[-1][:t]).delete if cron.time_column?

    # TODO partial delete
    cron.stat_model.where("").delete if cron.group_by_columns?

    while !(_a = cron.output[num_i..(num_i+num_add)]).blank? do
      # batch insert all
      cron.stat_model.multi_insert _a
      num_i += (num_add + 1)
    end
  end

  # record last executed time
  clock.update

  return self
end

#setup_stat_modelObject



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/statlysis/cron/timely.rb', line 44

def setup_stat_model
  cron.stat_table_name = Utils.normalise_name cron.class.name.split("::")[-1], cron.multiple_dataset.name, cron.source_where_array, cron.group_by_columns.map {|i| i[:column_name] }, TimeUnitToTableSuffixHash[cron.time_unit]
  raise "mysql only support table_name in 64 characters, the size of '#{cron.stat_table_name}' is #{cron.stat_table_name.to_s.size}. please set cron.stat_table_name when you create a Cron instance" if cron.stat_table_name.to_s.size > 64


  # create basic unchangeable table structure
  if not Statlysis.sequel.table_exists?(cron.stat_table_name)
    Statlysis.sequel.transaction do
      Statlysis.sequel.create_table cron.stat_table_name, DefaultTableOpts do
        primary_key :id # Add one column at least in this block to avoid `SQLite3::SQLException: near ")": syntax error (Sequel::DatabaseError)`
      end
      Statlysis.sequel.add_column   cron.stat_table_name, :t, DateTime if cron.time_column? # alias for :time

      # add count columns
      if cron.time_column?
        count_columns = [:timely_c, :totally_c] # alias for :count
        count_columns.each {|w| Statlysis.sequel.add_column cron.stat_table_name, w, Integer }
      else
        Statlysis.sequel.add_column cron.stat_table_name, :c, Integer # alias for :count
      end

    end
  end
  # add group_by columns & indexes
  remodel
  cron.stat_model.cron = cron
  if cron.group_by_columns.any?
    cron.group_by_columns.each do |_h|
      if not cron.stat_model.columns.include?(_h[:column_name])
        _h[:type] = SymbolToClassInDataType[_h[:type]] if _h[:type].is_a?(Symbol) # && (Statlysis.sequel.opts[:adapter] == :sqlite)
        Statlysis.sequel.add_column cron.stat_table_name, _h[:column_name], _h[:type]
      end
    end
  end

  # add sum columns
  remodel
  sum_column_to_result_columns_hash.each do |_sum_col, _result_cols|
    _result_cols.each do |_result_col|
      if not cron.stat_model.columns.include?(_result_col)
        # convert to Interger type in view if needed
        Statlysis.sequel.add_column cron.stat_table_name, _result_col, Float
      end
    end
  end

  # Fix there should be uniq index name between tables
  # `SQLite3::SQLException: index t_timely_c_totally_c already exists (Sequel::DatabaseError)`
  _group_by_columns_index_name = cron.group_by_columns.reject {|i| i[:no_index] }.map {|i| i[:column_name] }
  _truncated_columns = _group_by_columns_index_name.dup # only String column
  _group_by_columns_index_name = _group_by_columns_index_name.unshift :t if cron.time_column?
  # TODO use https://github.com/german/redis_orm to support full string indexes
  if !Statlysis.configuration.is_skip_database_index && _group_by_columns_index_name.any?
    mysql_per_column_length_limit_in_one_index = (1000 / 3.0 / _group_by_columns_index_name.size.to_f).to_i
    index_columns_str = _group_by_columns_index_name.map {|s| _truncated_columns.include?(s) ? "#{s.to_s}(#{mysql_per_column_length_limit_in_one_index})" : s.to_s }.join(", ")
    index_columns_str = "(#{index_columns_str})"
    begin
      # NOTE mysql indexes key length limit is 1000 bytes
      cron.stat_model.dataset.with_sql("CREATE INDEX #{Utils.sha1_name(_group_by_columns_index_name)} ON #{cron.stat_table_name} #{index_columns_str};").to_a
    rescue => e
      raise e if not e.inspect.match(/exists|duplicate/i)
    end
  end

  # add group_concat column
  remodel
  if cron.group_concat_columns.any? && !cron.stat_model.columns.include?(:other_json)
    Statlysis.sequel.add_column cron.stat_table_name, :other_json, :text
  end

  # add access to group_concat values in other_json
  remodel.class_eval do
    define_method("other_json_hash") do
      @__other_json_hash_cache ||= (JSON.parse(self.other_json) rescue {})
    end
    cron.group_concat_columns.each do |_group_concat_column|
      define_method("#{_group_concat_column}_values") do
        self.other_json_hash[_group_concat_column.to_s]
      end
    end
  end

  remodel
end