Class: Fluent::Plugin::MapSupport

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/map_support.rb

Instance Method Summary

Constructor Details

#initialize(map, plugin) ⇒ MapSupport

Returns a new instance of MapSupport.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/map_support.rb', line 20

# Stores the map source and owning plugin, then specialises this instance
# for the kind of plugin it serves.
#
# For a Fluent::Plugin::Filter the map source becomes the body of a
# singleton method +map_func(time, record)+; for a Fluent::Plugin::Output it
# becomes +map_func(tag, time, record)+. In both cases +generate_tuples+ and
# +do_map+ are aliased, on this instance only, to the matching
# +_filter+ / +_output+ implementation.
#
# NOTE(review): +map+ is evaluated as Ruby source via module_eval, so it
# must come from a trusted configuration.
#
# @param map [String] Ruby source interpolated into the body of +map_func+
# @param plugin [Object] owning plugin; only Filter and Output instances
#   trigger any specialisation
def initialize(map, plugin)
  @map = map
  @plugin = plugin

  if plugin.is_a?(Fluent::Plugin::Filter)
    signature = 'time, record'
    tuples_impl = :generate_tuples_filter
    map_impl = :do_map_filter
  elsif plugin.is_a?(Fluent::Plugin::Output)
    signature = 'tag, time, record'
    tuples_impl = :generate_tuples_output
    map_impl = :do_map_output
  else
    # Any other plugin kind keeps the placeholder methods untouched.
    return
  end

  # The heredoc text is the code that gets evaluated; its layout is kept
  # exactly so line numbers reported from the evaluated code do not move.
  singleton_class.module_eval(<<-CODE)
    def map_func(#{signature})
      #{@map}
    end
  CODE

  # send is used because alias_method is private on older Rubies.
  singleton_class.send(:alias_method, :generate_tuples, tuples_impl)
  singleton_class.send(:alias_method, :do_map, map_impl)
end

Instance Method Details

#do_map(tag, es) ⇒ Object



46
47
48
# File 'lib/fluent/plugin/map_support.rb', line 46

# Placeholder that does nothing and returns nil.
# #initialize replaces it on each instance with #do_map_filter or
# #do_map_output when the plugin is a Filter or an Output.
#
# @param tag [Object] unused by the placeholder
# @param es [Object] unused by the placeholder
# @return [nil]
def do_map(tag, es)
  nil
end

#do_map_filter(tag, es) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/fluent/plugin/map_support.rb', line 50

# Maps every event of +es+ and collects the resulting (time, record) pairs
# into a single event stream stored under the incoming +tag+.
#
# @param tag [Object] tag the mapped events are stored under
# @param es [Object] event stream handed to #generate_tuples
# @return [Hash] +tag+ => Fluent::MultiEventStream; empty when no tuples
#   were produced
# @raise [SyntaxError] when a produced tuple has a nil time or nil record
def do_map_filter(tag, es)
  mapped = generate_tuples(tag, es)

  # Streams are created lazily, on first access of a key.
  streams = Hash.new { |store, key| store[key] = Fluent::MultiEventStream.new }
  mapped.each do |time, record|
    raise SyntaxError.new if time.nil? || record.nil?

    streams[tag].add(time, record)
    @plugin.log.trace { [tag, time, record].inspect }
  end
  streams
end

#do_map_output(tag, es) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/fluent/plugin/map_support.rb', line 64

# Maps every event of +es+ and groups the resulting (tag, time, record)
# tuples into one event stream per tag found in the tuples.
#
# @param tag [Object] incoming tag, passed on to #generate_tuples
# @param es [Object] event stream handed to #generate_tuples
# @return [Hash] tuple tag => Fluent::MultiEventStream; empty when no
#   tuples were produced
# @raise [SyntaxError] when a produced tuple has a nil time or nil record
def do_map_output(tag, es)
  mapped = generate_tuples(tag, es)

  # Streams are created lazily, on first access of a key.
  streams = Hash.new { |store, key| store[key] = Fluent::MultiEventStream.new }
  # Each tuple carries its own tag; the incoming +tag+ is not used for grouping.
  mapped.each do |out_tag, time, record|
    raise SyntaxError.new if time.nil? || record.nil?

    streams[out_tag].add(time, record)
    @plugin.log.trace { [out_tag, time, record].inspect }
  end
  streams
end

#generate_tuples ⇒ Object



78
79
80
# File 'lib/fluent/plugin/map_support.rb', line 78

# Placeholder that does nothing and returns nil.
# #initialize replaces it on each instance with #generate_tuples_filter or
# #generate_tuples_output when the plugin is a Filter or an Output.
#
# The optional parameters mirror the (tag, es) signature of those
# implementations, so a call such as +generate_tuples(tag, es)+ is accepted
# by the placeholder as well instead of raising ArgumentError.
#
# @param tag [Object, nil] unused by the placeholder
# @param es [Object, nil] unused by the placeholder
# @return [nil]
def generate_tuples(tag = nil, es = nil)
  nil
end

#generate_tuples_filter(tag, es) ⇒ Object



82
83
84
85
86
87
88
89
90
91
# File 'lib/fluent/plugin/map_support.rb', line 82

# Calls +map_func(time, record)+ for every event in +es+, each call wrapped
# in #timeout_block, and concatenates the returned tuples in event order.
#
# @param tag [Object] not used by this variant
# @param es [#each] event stream yielding (time, record)
# @return [Array] all tuples returned by +map_func+, flattened one level
def generate_tuples_filter(tag, es)
  collected = []
  es.each do |time, record|
    # The concat sits inside the guarded block, so an event whose mapping
    # does not complete contributes nothing.
    timeout_block { collected.concat(map_func(time, record)) }
  end
  collected
end

#generate_tuples_output(tag, es) ⇒ Object



93
94
95
96
97
98
99
100
101
102
# File 'lib/fluent/plugin/map_support.rb', line 93

# Calls +map_func(tag, time, record)+ for every event in +es+, each call
# wrapped in #timeout_block, and concatenates the returned tuples in event
# order.
#
# @param tag [Object] incoming tag, forwarded to +map_func+
# @param es [#each] event stream yielding (time, record)
# @return [Array] all tuples returned by +map_func+, flattened one level
def generate_tuples_output(tag, es)
  collected = []
  es.each do |time, record|
    # The concat sits inside the guarded block, so an event whose mapping
    # does not complete contributes nothing.
    timeout_block { collected.concat(map_func(tag, time, record)) }
  end
  collected
end

#timeout_block ⇒ Object



104
105
106
107
108
109
110
111
112
# File 'lib/fluent/plugin/map_support.rb', line 104

# Runs the given block under +Timeout.timeout(@plugin.timeout)+ and logs an
# error through the plugin's logger instead of propagating Timeout::Error.
#
# The previous log message interpolated +time+, +tag+ and +record+, none of
# which exist in this method, so a timeout surfaced as a NameError rather
# than a log line. The event context is now taken from optional arguments
# and only the values actually supplied are included in the message.
#
# @param tag [Object, nil] tag of the event being processed, if known
# @param time [Object, nil] event time (passed to Time.at), if known
# @param record [Object, nil] event record, if known
# @yield the work to guard
# @return [Object] the block's value, or whatever the logger's +error+
#   returns when the block timed out
def timeout_block(tag = nil, time = nil, record = nil)
  Timeout.timeout(@plugin.timeout) { yield }
rescue Timeout::Error
  details = []
  details << Time.at(time).to_s if time
  details << tag.to_s if tag
  details << record.inspect if record
  @plugin.log.error { (['Timeout:'] + details).join(' ') }
end