Class: Fluent::MapSupport

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/map_support.rb

Instance Method Summary collapse

Constructor Details

#initialize(map, plugin) ⇒ MapSupport

Returns a new instance of MapSupport.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/fluent/plugin/map_support.rb', line 19

# Builds the helper for +plugin+ and compiles the user-supplied +map+
# expression into a +map_func+ method on this instance's singleton class.
#
# The kind of plugin decides both the arity of +map_func+ and which
# concrete implementations +generate_tuples+ / +do_map+ are aliased to:
#
# * Fluent::Filter (only checked when that constant is defined):
#   +map_func(time, record)+, aliased to the +*_filter+ variants.
# * Fluent::Output: +map_func(tag, time, record)+, aliased to the
#   +*_output+ variants.
#
# For any other plugin nothing is defined and the placeholder
# +generate_tuples+ / +do_map+ methods stay in effect.
#
# NOTE(review): +map+ is interpolated into Ruby source and evaluated with
# module_eval, so it must only ever come from trusted configuration.
#
# @param map [String] Ruby source used as the body of +map_func+
# @param plugin [Object] the owning plugin instance
def initialize(map, plugin)
  @map = map
  @plugin = plugin
  if defined?(Fluent::Filter) && plugin.is_a?(Fluent::Filter)
    # Filters keep the incoming tag, so the map body only sees time/record.
    singleton_class.module_eval(<<-CODE)
      def map_func(time, record)
        #{@map}
      end
    CODE
    class << self
      alias_method :generate_tuples, :generate_tuples_filter
      alias_method :do_map, :do_map_filter
    end
  elsif plugin.is_a?(Fluent::Output)
    # Outputs may re-tag events, so the map body also receives the tag.
    singleton_class.module_eval(<<-CODE)
      def map_func(tag, time, record)
        #{@map}
      end
    CODE
    class << self
      alias_method :generate_tuples, :generate_tuples_output
      alias_method :do_map, :do_map_output
    end
  end
end

Instance Method Details

#do_map(tag, es) ⇒ Object



45
46
47
# File 'lib/fluent/plugin/map_support.rb', line 45

# Placeholder implementation that does nothing and returns nil.
#
# #initialize replaces it on the instance's singleton class with an alias
# to #do_map_filter or #do_map_output, depending on the plugin type.
#
# @param tag [Object] unused here
# @param es [Object] unused here
# @return [nil]
def do_map(tag, es)
  nil
end

#do_map_filter(tag, es) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/fluent/plugin/map_support.rb', line 49

# Maps every event of +es+ and collects the resulting (time, record)
# tuples into an event stream stored under the incoming +tag+.
#
# Each accepted tuple is also written to the plugin's trace log.
#
# @param tag [Object] tag used as the key of the returned hash
# @param es [#each] event stream handed to #generate_tuples
# @return [Hash] +tag+ => Fluent::MultiEventStream; empty when no tuples
#   were produced
# @raise [SyntaxError] when a tuple has a nil time or a nil record
def do_map_filter(tag, es)
  # Streams are created lazily, on first access of a key.
  streams = Hash.new { |store, key| store[key] = Fluent::MultiEventStream.new }

  generate_tuples(tag, es).each do |time, record|
    raise SyntaxError.new if time.nil? || record.nil?

    streams[tag].add(time, record)
    @plugin.log.trace { [tag, time, record].inspect }
  end

  streams
end

#do_map_output(tag, es) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/map_support.rb', line 63

# Maps every event of +es+ and groups the resulting
# (tag, time, record) tuples into one event stream per emitted tag.
#
# The tag carried by each tuple, not the incoming +tag+, selects the
# destination stream. Each accepted tuple is also written to the plugin's
# trace log.
#
# @param tag [Object] incoming tag, forwarded to #generate_tuples
# @param es [#each] event stream handed to #generate_tuples
# @return [Hash] emitted tag => Fluent::MultiEventStream
# @raise [SyntaxError] when a tuple has a nil time or a nil record
def do_map_output(tag, es)
  # Streams are created lazily, on first access of a key.
  streams = Hash.new { |store, key| store[key] = Fluent::MultiEventStream.new }

  generate_tuples(tag, es).each do |out_tag, time, record|
    raise SyntaxError.new if time.nil? || record.nil?

    streams[out_tag].add(time, record)
    @plugin.log.trace { [out_tag, time, record].inspect }
  end

  streams
end

#generate_tuples ⇒ Object



77
78
79
# File 'lib/fluent/plugin/map_support.rb', line 77

# Placeholder implementation that does nothing and returns nil.
#
# #initialize replaces it on the instance's singleton class with an alias
# to #generate_tuples_filter or #generate_tuples_output. The optional
# parameters mirror those implementations, so a call written as
# generate_tuples(tag, es) does not fail with ArgumentError when no alias
# was installed.
#
# @param tag [Object, nil] unused here
# @param es [Object, nil] unused here
# @return [nil]
def generate_tuples(tag = nil, es = nil)
  nil
end

#generate_tuples_filter(tag, es) ⇒ Object



81
82
83
84
85
86
87
88
89
90
# File 'lib/fluent/plugin/map_support.rb', line 81

# Runs the two-argument +map_func(time, record)+ over every event in +es+
# and concatenates the tuple lists it returns.
#
# Each call is wrapped in #timeout_block.
#
# @param tag [Object] not used by this variant
# @param es [#each] yields (time, record) pairs
# @return [Array] all tuples produced, in event order
def generate_tuples_filter(tag, es)
  collected = []
  es.each do |time, record|
    timeout_block { collected.concat(map_func(time, record)) }
  end
  collected
end

#generate_tuples_output(tag, es) ⇒ Object



92
93
94
95
96
97
98
99
100
101
# File 'lib/fluent/plugin/map_support.rb', line 92

# Runs the three-argument +map_func(tag, time, record)+ over every event
# in +es+ and concatenates the tuple lists it returns.
#
# Each call is wrapped in #timeout_block.
#
# @param tag [Object] incoming tag, passed to +map_func+ for every event
# @param es [#each] yields (time, record) pairs
# @return [Array] all tuples produced, in event order
def generate_tuples_output(tag, es)
  collected = []
  es.each do |time, record|
    timeout_block { collected.concat(map_func(tag, time, record)) }
  end
  collected
end

#timeout_block ⇒ Object



103
104
105
106
107
108
109
110
111
# File 'lib/fluent/plugin/map_support.rb', line 103

# Runs the given block under the plugin's configured timeout.
#
# A Timeout::Error is logged through the plugin's logger and swallowed,
# so one slow evaluation does not abort the caller.
#
# The log message is built only from values visible in this method. The
# earlier message interpolated +time+, +tag+ and +record+, which are not
# defined here, so a timeout raised NameError instead of being logged.
#
# @yield the work to run under the time limit
# @return [Object] the block's value, or the logger's return value when
#   the block timed out
def timeout_block
  Timeout.timeout(@plugin.timeout) { yield }
rescue Timeout::Error
  @plugin.log.error { "Timeout: map function exceeded #{@plugin.timeout} seconds" }
end