Class: Fluent::Plugin::SQLOutput::TableElement

Inherits:
Object
  • Object
show all
Includes:
Configurable
Defined in:
lib/fluent/plugin/out_sql.rb

Overview

TODO: Merge SQLInput’s TableElement

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pattern, log, enable_fallback) ⇒ TableElement

Returns a new instance of TableElement.



54
55
56
57
58
59
# File 'lib/fluent/plugin/out_sql.rb', line 54

def initialize(pattern, log, enable_fallback)
  super()
  @pattern = Fluent::MatchPattern.create(pattern)
  @log = log
  @enable_fallback = enable_fallback
end

Instance Attribute Details

#modelObject (readonly)

Returns the value of attribute model.



51
52
53
# File 'lib/fluent/plugin/out_sql.rb', line 51

def model
  @model
end

#patternObject (readonly)

Returns the value of attribute pattern.



52
53
54
# File 'lib/fluent/plugin/out_sql.rb', line 52

def pattern
  @pattern
end

Instance Method Details

#configure(conf) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/fluent/plugin/out_sql.rb', line 61

def configure(conf)
  super

  @mapping = parse_column_mapping(@column_mapping)
  @format_proc = Proc.new { |record|
    new_record = {}
    @mapping.each { |k, c|
      new_record[c] = record[k]
    }
    new_record
  }
end

#import(chunk, output) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/fluent/plugin/out_sql.rb', line 91

def import(chunk, output)
  tag = chunk..tag
  records = []
  chunk.msgpack_each { |time, data|
    begin
      data = output.inject_values_to_record(tag, time, data)
      records << @model.new(@format_proc.call(data))
    rescue => e
      args = {error: e, table: @table, record: Yajl.dump(data)}
      @log.warn "Failed to create the model. Ignore a record:", args
    end
  }
  begin
    @model.import(records)
  rescue ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError => e
    if @enable_fallback
      # ignore other exceptions to use Fluentd retry mechanizm
      @log.warn "Got deterministic error. Fallback to one-by-one import", error: e
      one_by_one_import(records)
    else
      @log.warn "Got deterministic error. Fallback is disabled", error: e
      raise e
    end
  end
end

#init(base_model) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/fluent/plugin/out_sql.rb', line 74

def init(base_model)
  # See SQLInput for more details of following code
  table_name = @table
  @model = Class.new(base_model) do
    self.table_name = table_name
    self.inheritance_column = '_never_use_output_'
  end

  class_name = table_name.singularize.camelize
  base_model.const_set(class_name, @model)
  model_name = ActiveModel::Name.new(@model, nil, class_name)
  @model.define_singleton_method(:model_name) { model_name }

  # TODO: check column_names and table schema
  # @model.column_names
end

#one_by_one_import(records) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/fluent/plugin/out_sql.rb', line 117

def one_by_one_import(records)
  records.each { |record|
    retries = 0
    begin
      @model.import([record])
    rescue ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError => e
      @log.error "Got deterministic error again. Dump a record", error: e, record: record
    rescue => e
      retries += 1
      if retries > @num_retries
        @log.error "Can't recover undeterministic error. Dump a record", error: e, record: record
        next
      end

      @log.warn "Failed to import a record: retry number = #{retries}", error: e
      sleep 0.5
      retry
    end
  }
end