Method: Fluent::SQLOutput::TableElement#import

Defined in:
lib/fluent/plugin/out_sql.rb

#import(chunk) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin/out_sql.rb', line 70

# Builds one model instance per record in the chunk and bulk-inserts them.
#
# Records that cannot be turned into a model (the format proc or the model
# constructor raises) are logged with a warning and dropped; the remaining
# records are still imported. If the bulk import raises one of the rescued
# ActiveRecord errors, the records are handed to one_by_one_import instead.
# Any other exception propagates to the caller.
#
# @param chunk [#msgpack_each] buffer chunk yielding (tag, time, data) triples
# @return [Object, nil] whatever @model.import or one_by_one_import returns,
#   or nil when no record could be built and nothing was imported
def import(chunk)
  records = []
  chunk.msgpack_each do |tag, time, data|
    begin
      # format process should be moved to emit / format after supports error stream.
      records << @model.new(@format_proc.call(data))
    rescue => e
      args = {:error => e.message, :error_class => e.class, :table => @table, :record => Yajl.dump(data)}
      @log.warn "Failed to create the model. Ignore a record:", args
    end
  end

  # Nothing to insert (empty chunk, or every record was rejected above), so
  # skip the database call entirely.
  # NOTE(review): some activerecord-import versions appear to raise
  # ArgumentError for an empty model list, which is not rescued below and
  # would propagate to the caller - confirm against the pinned version.
  return if records.empty?

  begin
    @model.import(records)
  rescue ActiveRecord::StatementInvalid, ActiveRecord::ThrowResult, ActiveRecord::Import::MissingColumnError => e
    # Only these errors are handled here; all other exceptions are left to
    # propagate so that the Fluentd retry mechanism can deal with them.
    @log.warn "Got deterministic error. Fallback to one-by-one import", :error => e.message, :error_class => e.class
    one_by_one_import(records)
  end
end