Class: HadoopDsl::LogAnalysis::LogAnalysisMapper

Inherits:
BaseMapper
Defined in:
lib/log_analysis.rb

Overview

Controller for the mapper side of the log-analysis DSL.

Constant Summary

@@reg_cache = {}  # compiled Regexps keyed by pattern string (used by #pattern)

Instance Attribute Summary

Attributes inherited from BaseMapRed

#emitted

Instance Method Summary

Methods inherited from BaseMapper

#identity

Methods inherited from BaseMapRed

#emit

Methods included from DslController

#pre_process, #run

Methods included from DslElement

#method_missing

Constructor Details

#initialize(script, key, value) ⇒ LogAnalysisMapper

Returns a new instance of LogAnalysisMapper.



# File 'lib/log_analysis.rb', lines 14-16

# Build a mapper whose model wraps the incoming map record.
#
# @param script the DSL script, handed unchanged to the superclass constructor
# @param key the map input key, passed to LogAnalysisMapperModel.new
# @param value the map input value, passed to LogAnalysisMapperModel.new
def initialize(script, key, value)
  model = LogAnalysisMapperModel.new(key, value)
  super(script, model)
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method defined in HadoopDsl::DslElement.

Instance Method Details

#column_name(*names) ⇒ Object

Sets the column names. String names are converted to Symbols.



# File 'lib/log_analysis.rb', lines 46-49

# Name the model's columns.
#
# String names are converted to Symbols; any other name is passed through
# unchanged. How names are matched to columns is up to
# @model.create_or_replace_columns_with (presumably by position — confirm).
#
# @param names [Array<String, Symbol>] the column names
# @return whatever @model.create_or_replace_columns_with returns
def column_name(*names)
  symbolized = names.map do |raw|
    case raw
    when String then raw.to_sym
    else raw
    end
  end
  @model.create_or_replace_columns_with(symbolized) do |col, col_name|
    col.name = col_name
  end
end

#count_uniq(column_or_value) ⇒ Object

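Emitter: adds the column's value (or the literal value) to the current topic's key, then emits that key with a count of 1.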



# File 'lib/log_analysis.rb', lines 74-84

# Emitter: count occurrences per unique key.
#
# Appends an element to the current topic's key elements, then emits
# <tt>current_topic.key => 1</tt>.
#
# @param column_or_value [LogAnalysisMapperModel::Column, Object] a column
#   (its +value+ is used) or a literal value (used as-is)
# @return whatever #emit returns
def count_uniq(column_or_value)
  element =
    if LogAnalysisMapperModel::Column === column_or_value
      column_or_value.value
    else
      column_or_value
    end
  current_topic.key_elements << element
  emit(current_topic.key => 1)
end

#group_by(column_or_value) ⇒ Object



# File 'lib/log_analysis.rb', lines 51-60

# Add a grouping element to the current topic's key.
#
# @param column_or_value [LogAnalysisMapperModel::Column, Object] a column
#   (its +value+ is used) or a literal value (used as-is)
# @return the result of appending to +current_topic.key_elements+
def group_by(column_or_value)
  is_column = LogAnalysisMapperModel::Column === column_or_value
  element = is_column ? column_or_value.value : column_or_value
  current_topic.key_elements << element
end

#group_date_by(column, term) ⇒ Object



# File 'lib/log_analysis.rb', lines 62-71

# Append a date bucket for +column+'s timestamp to the current topic's key.
#
# The column value is parsed with +parse_time+ and formatted per +term+:
# :daily => 'YYYYMMDD', :monthly => 'YYYYMM', :yearly => 'YYYY'.
#
# @param column [LogAnalysisMapperModel::Column] column holding the timestamp
# @param term [Symbol] :daily, :monthly or :yearly
# @return the result of appending to +current_topic.key_elements+
# @raise [ArgumentError] if +term+ is not one of the supported granularities
def group_date_by(column, term)
  # NOTE(review): parse_time is defined elsewhere and presumably relies on
  # Time.parse from the 'time' stdlib, so the lazy require is kept.
  require 'time'
  format =
    case term
    when :daily   then '%Y%m%d'
    when :monthly then '%Y%m'
    when :yearly  then '%Y'
    else
      # Previously fell through to nil, silently corrupting the grouping key.
      raise ArgumentError, "group_date_by: unsupported term #{term.inspect} (expected :daily, :monthly or :yearly)"
    end
  time = parse_time(column.value)
  current_topic.key_elements << time.strftime(format)
end

#pattern(reg_str) ⇒ Object



# File 'lib/log_analysis.rb', lines 32-43

# Split the current input line into columns using a regular expression.
#
# Compiled regexps are memoized in the shared class variable @@reg_cache,
# keyed by +reg_str+, so each distinct pattern is compiled only once. Each
# capture group becomes one column value. When the line does not match,
# <tt>throw :each_line</tt> abandons this line. A matching
# <tt>catch(:each_line)</tt> must exist further up the call stack (presumably
# in the per-line processing loop — confirm).
#
# @param reg_str [String, Regexp] pattern source (anything Regexp.new accepts)
# @return whatever @model.create_or_replace_columns_with returns
def pattern(reg_str)
  # One lookup: compile and store on a miss, reuse on a hit.
  re = (@@reg_cache[reg_str] ||= Regexp.new(reg_str))

  md = re.match(value)
  throw :each_line unless md # non-local exit: skip this line

  @model.create_or_replace_columns_with(md.captures) { |column, captured| column.value = captured }
end

#separate(sep) ⇒ Object



# File 'lib/log_analysis.rb', lines 27-30

# Split the current input line on +sep+ and store each piece as a column value.
#
# @param sep separator forwarded unchanged to <tt>value.split</tt>
# @return whatever @model.create_or_replace_columns_with returns
def separate(sep)
  fields = value.split(sep)
  @model.create_or_replace_columns_with(fields) do |col, field|
    col.value = field
  end
end

#sum(column) ⇒ Object



# File 'lib/log_analysis.rb', lines 86-88

# Emitter: emit the column's value, converted with +to_i+, under the current topic's key.
#
# NOTE(review): if the value is a String, non-numeric text becomes 0 via
# String#to_i rather than raising.
#
# @param column [LogAnalysisMapperModel::Column] column whose value is summed
# @return whatever #emit returns
def sum(column)
  topic_key = current_topic.key
  amount = column.value.to_i
  emit(topic_key => amount)
end

#topic(desc, options = {}, &block) ⇒ Object



# File 'lib/log_analysis.rb', lines 21-25

# Open a new topic on the model and, if given, run its DSL block.
#
# @param desc topic description, forwarded to @model.create_topic
# @param options [Hash] forwarded to @model.create_topic
# @yield the topic body, called with no arguments
# @return the value of #current_topic after the block has run
def topic(desc, options = {}, &block)
  @model.create_topic(desc, options)
  block.call if block
  current_topic
end