Class: Fluent::JsonSchemaFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_json_schema.rb

Instance Method Summary collapse

Constructor Details

#initializeJsonSchemaFilter

Returns a new instance of JsonSchemaFilter.



19
20
21
22
# File 'lib/fluent/plugin/filter_json_schema.rb', line 19

def initialize
  super
  require "json-schema"
end

Instance Method Details

#configure(conf) ⇒ Object



32
33
34
# File 'lib/fluent/plugin/filter_json_schema.rb', line 32

def configure(conf)
  super
end

#filter_stream(tag, es) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/fluent/plugin/filter_json_schema.rb', line 36

def filter_stream(tag, es)
  new_es = MultiEventStream.new
  es.each { |time, record|
    new_record = record.dup
    begin
      valid, error_msg = validate(record)
      puts valid
      puts error_msg

      if not valid
        new_record['validation-error']=error_msg if @add_validation_error

        case @mode
        when :discard
          next 
        when :enrich
          @enrich_invalid.each_pair{|k,v| new_record[k]=v}
        when :isolate
          router.emit("#{@isolate_tag_prefix}.#{tag}", time, new_record)
          next 
        end
      else
        case @mode
        when :enrich
          @enrich_valid.each_pair{|k,v| new_record[k]=v}
        end
      end
      
      new_es.add(time, new_record)
    rescue => e
      router.emit_error_event(tag, time, record, e)
    end
  }
  new_es
end

#shutdownObject



28
29
30
# File 'lib/fluent/plugin/filter_json_schema.rb', line 28

def shutdown
  super
end

#startObject



24
25
26
# File 'lib/fluent/plugin/filter_json_schema.rb', line 24

def start
  super
end