Class: Fluent::Plugin::MapSupport

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/map_support.rb

Instance Method Summary collapse

Constructor Details

#initialize(map, plugin) ⇒ MapSupport



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/fluent/plugin/map_support.rb', line 22

def initialize(map, plugin)
  @map = map
  @plugin = plugin
  @checker = Fluent::Plugin::Parser::TimeoutChecker.new(@plugin.timeout)
  @checker.start

  if plugin.is_a?(Fluent::Plugin::Filter)
    singleton_class.module_eval(<<-CODE)
    def map_func(time, record)
      #{@map}
    end
  CODE
    class << self
      alias_method :generate_tuples, :generate_tuples_filter
      alias_method :do_map, :do_map_filter
    end
  elsif plugin.is_a?(Fluent::Plugin::Output)
    singleton_class.module_eval(<<-CODE)
    def map_func(tag, time, record)
      #{@map}
    end
  CODE
    class << self
      alias_method :generate_tuples, :generate_tuples_output
      alias_method :do_map, :do_map_output
    end
  end
end

Instance Method Details

#do_map(tag, es) ⇒ Object



51
52
53
# File 'lib/fluent/plugin/map_support.rb', line 51

def do_map(tag, es)
  # This method will be overwritten in #initailize.
end

#do_map_filter(tag, es) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/map_support.rb', line 55

def do_map_filter(tag, es)
  tuples = generate_tuples(tag, es)

  tag_output_es = Hash.new{|h, key| h[key] = Fluent::MultiEventStream.new}
  tuples.each do |time, record|
    if time == nil || record == nil
      raise SyntaxError.new
    end
    tag_output_es[tag].add(time, record)
    @plugin.log.trace { [tag, time, record].inspect }
  end
  tag_output_es
end

#do_map_output(tag, es) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/fluent/plugin/map_support.rb', line 69

def do_map_output(tag, es)
  tuples = generate_tuples(tag, es)

  tag_output_es = Hash.new{|h, key| h[key] = Fluent::MultiEventStream.new}
  tuples.each do |tag, time, record|
    if time == nil || record == nil
      raise SyntaxError.new
    end
    tag_output_es[tag].add(time, record)
    @plugin.log.trace { [tag, time, record].inspect }
  end
  tag_output_es
end

#generate_tuplesObject



83
84
85
# File 'lib/fluent/plugin/map_support.rb', line 83

def generate_tuples
  # This method will be overwritten in #initailize.
end

#generate_tuples_filter(tag, es) ⇒ Object



87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/map_support.rb', line 87

def generate_tuples_filter(tag, es)
  tuples = []
  es.each {|time, record|
    timeout_block do
      new_tuple = map_func(time, record)
      tuples.concat new_tuple
    end
  }
  tuples
end

#generate_tuples_output(tag, es) ⇒ Object



98
99
100
101
102
103
104
105
106
107
# File 'lib/fluent/plugin/map_support.rb', line 98

def generate_tuples_output(tag, es)
  tuples = []
  es.each {|time, record|
    timeout_block do
      new_tuple = map_func(tag, time, record)
      tuples.concat new_tuple
    end
  }
  tuples
end

#stopObject



119
120
121
# File 'lib/fluent/plugin/map_support.rb', line 119

def stop
  @checker.stop
end

#timeout_blockObject



109
110
111
112
113
114
115
116
117
# File 'lib/fluent/plugin/map_support.rb', line 109

def timeout_block
  begin
    @checker.execute {
      yield
    }
  rescue Timeout::Error
    @plugin.log.error {"Timeout: #{Time.at(time)} #{tag} #{record.inspect}"}
  end
end