Class: Fluent::MapOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_map.rb

Constant Summary collapse

MMAP_MAX_NUM =
50

Instance Method Summary collapse

Instance Method Details

#check_mmap_range(conf) ⇒ Object



92
93
94
95
96
97
98
99
100
# File 'lib/fluent/plugin/out_map.rb', line 92

def check_mmap_range(conf)
  invalid_mmap = conf.keys.select { |k|
    m = k.match(/^mmap(\d+)$/)
    m ? !((1..MMAP_MAX_NUM).include?(m[1].to_i)) : false
  }
  unless invalid_mmap.empty?
    raise ConfigError, "Invalid mmapN found. N should be 1 - #{MMAP_MAX_NUM}: " + invalid_mmap.join(",")
  end
end

#configure(conf) ⇒ Object



22
23
24
25
26
27
# File 'lib/fluent/plugin/out_map.rb', line 22

def configure(conf)
  super
  @format = determine_format()
  configure_format()
  @map = create_map(conf)
end

#configure_formatObject



41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/fluent/plugin/out_map.rb', line 41

def configure_format()
  case @format
  when "map"
    # pass
  when "record"
    @tag ||= @key
    raise ConfigError, "multi and 3 parameters(tag, time, and record) are not compatible" if @multi
  when "multimap"
    # pass.
  else
    raise ConfigError, "format #{@format} is invalid."
  end
end

#create_map(conf) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/out_map.rb', line 55

def create_map(conf)
  # return string like double array.
  case @format
  when "map"
    parse_map()
  when "record"
    "[[#{@tag}, #{@time}, #{@record}]]"
  when "multimap"
    parse_multimap(conf)
  end
end

#determine_formatObject



29
30
31
32
33
34
35
36
37
38
39
# File 'lib/fluent/plugin/out_map.rb', line 29

def determine_format()
  if @format
    @format
  elsif @map
    "map"
  elsif (@tag || @key) && @time && @record
    "record"
  else
    raise ConfigError, "Any of map, 3 parameters(tag, time, and record) or format is required "
  end
end

#do_map(tag, es) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/fluent/plugin/out_map.rb', line 118

def do_map(tag, es)
  tuples = generate_tuples(tag, es)

  tag_output_es = Hash.new{|h, key| h[key] = MultiEventStream::new}
  tuples.each do |tag, time, record|
    if time == nil || record == nil
      raise SyntaxError.new
    end
    tag_output_es[tag].add(time, record)
    $log.trace { [tag, time, record].inspect }
  end
  tag_output_es
end

#emit(tag, es, chain) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/fluent/plugin/out_map.rb', line 103

def emit(tag, es, chain)
  begin
    tag_output_es = do_map(tag, es)
    tag_output_es.each_pair do |tag, output_es|
      router.emit_stream(tag, output_es)
    end
    chain.next
    tag_output_es
  rescue SyntaxError => e
    chain.next
    $log.error "map command is syntax error: #{@map}"
    e #for test
  end
end

#generate_tuples(tag, es) ⇒ Object



132
133
134
135
136
137
138
139
# File 'lib/fluent/plugin/out_map.rb', line 132

def generate_tuples(tag, es)
  tuples = []
  es.each {|time, record|
    new_tuple = eval(@map)
    tuples.concat new_tuple
  }
  tuples
end

#parse_mapObject



67
68
69
70
71
72
73
# File 'lib/fluent/plugin/out_map.rb', line 67

def parse_map()
  if @multi
    @map
  else
    "[#{@map}]"
  end
end

#parse_multimap(conf) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/fluent/plugin/out_map.rb', line 75

def parse_multimap(conf)
  check_mmap_range(conf)

  prev_mmap = nil
  result_mmaps = (1..MMAP_MAX_NUM).map { |i|
    mmap = conf["mmap#{i}"]
    if (i > 1) && prev_mmap.nil? && !mmap.nil?
      raise ConfigError, "Jump of mmap index found. mmap#{i - 1} is missing."
    end
    prev_mmap = mmap
    next if mmap.nil?

    mmap
  }.compact.join(',')
  "[#{result_mmaps}]"
end

#timeout_block(tag, time, record) ⇒ Object



141
142
143
144
145
146
147
148
149
# File 'lib/fluent/plugin/out_map.rb', line 141

def timeout_block(tag, time, record)
  begin
    Timeout.timeout(@timeout){
      yield
    }
  rescue Timeout::Error
    $log.error {"Timeout: #{Time.at(time)} #{tag} #{record.inspect}"}
  end
end