Class: Fluent::Plugin::SQLOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_sql.rb

Defined Under Namespace

Classes: TableElement

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ SQLOutput



145
146
147
148
149
# File 'lib/fluent/plugin/out_sql.rb', line 145

# Creates the plugin instance.
#
# The ActiveRecord libraries are required here, when an instance is built,
# rather than when this file is loaded.
def initialize
  super
  require 'active_record'
  require 'activerecord-import'
end

Instance Attribute Details

#tables ⇒ Object

Returns the value of attribute tables.



36
37
38
# File 'lib/fluent/plugin/out_sql.rb', line 36

# Reader for the @tables instance variable.
#
# Within this class, #configure assigns it an Array holding the TableElement
# built for each <table> section whose argument is non-empty.
#
# @return [Object] the current value of @tables
def tables
  @tables
end

Instance Method Details

#configure(conf) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/fluent/plugin/out_sql.rb', line 151

# Configures the plugin from a parsed configuration element.
#
# One TableElement is built and configured for every child element named
# 'table'. An element with an empty argument becomes the default table (if
# several are given, a warning is logged and the last one wins); all others
# are appended to @tables in declaration order. @only_default records
# whether any non-default table was declared.
#
# @param conf [Object] configuration element; must respond to #[] and #elements
# @return [void]
# @raise [Fluent::ConfigError] if no <table> section with an empty argument exists
def configure(conf)
  compat_parameters_convert(conf, :inject, :buffer)

  super

  if (prefix = conf['remove_tag_prefix'])
    # \A anchors at the very start of the tag; ^ would also match right
    # after any newline embedded in the tag.
    @remove_tag_prefix = Regexp.new('\A' + Regexp.escape(prefix))
  end

  @tables = []
  @default_table = nil
  conf.elements.each do |element|
    next unless element.name == 'table'

    table = TableElement.new(element.arg, log, @enable_fallback)
    table.configure(element)
    if element.arg.empty?
      # Warn through the plugin logger (the same one handed to TableElement)
      # instead of the process-global $log.
      log.warn "Detect duplicate default table definition" if @default_table
      @default_table = table
    else
      @tables << table
    end
  end
  @only_default = @tables.empty?

  if @default_table.nil?
    raise Fluent::ConfigError, "There is no default table. <table> is required in sql output"
  end
end

#emit(tag, es, chain) ⇒ Object



213
214
215
216
217
218
219
# File 'lib/fluent/plugin/out_sql.rb', line 213

# Hands an event stream to the parent implementation.
#
# When only the default table is configured, the parent is called with the
# three arguments as-is. Otherwise the result of format_tag(tag) is passed
# as an additional fourth argument.
#
# @param tag [Object] event tag
# @param es [Object] event stream
# @param chain [Object] forwarded untouched to the parent
# @return [Object] whatever the parent implementation returns
def emit(tag, es, chain)
  return super(tag, es, chain) if @only_default

  super(tag, es, chain, format_tag(tag))
end

#format(tag, time, record) ⇒ Object



221
222
223
224
# File 'lib/fluent/plugin/out_sql.rb', line 221

# Serializes one event for buffering.
#
# The record is first passed through inject_values_to_record, then the
# [tag, time, record] triple is encoded with #to_msgpack.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] the result of Array#to_msgpack on the triple
def format(tag, time, record)
  [tag, time, inject_values_to_record(tag, time, record)].to_msgpack
end

#formatted_to_msgpack_binary ⇒ Object



226
227
228
# File 'lib/fluent/plugin/out_sql.rb', line 226

# Always returns true.
#
# NOTE(review): presumably this declares to the Fluentd buffer that the
# output of #format (which ends in #to_msgpack) is msgpack binary — confirm
# against the Fluent::Plugin::Output API.
#
# @return [true]
def formatted_to_msgpack_binary
  true
end

#shutdown ⇒ Object



209
210
211
# File 'lib/fluent/plugin/out_sql.rb', line 209

# Shutdown hook. Performs no plugin-specific work; it only delegates to the
# parent implementation.
def shutdown
  super
end

#start ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/fluent/plugin/out_sql.rb', line 181

# Start hook: connects to the database and initializes every table.
#
# Steps, in order:
# 1. Build the connection settings from the plugin's instance variables.
# 2. Create an anonymous abstract ActiveRecord::Base subclass, store it in
#    @base_model and register it on SQLOutput under a randomly suffixed
#    constant name.
# 3. Establish the connection on ActiveRecord::Base.
# 4. Run init_table for each entry of @tables, dropping those for which it
#    returns a truthy value, then run it for the default table.
#
# @return [Object] the result of init_table for the default table
def start
  super

  connection_settings = {
    adapter: @adapter,
    host: @host,
    port: @port,
    database: @database,
    username: @username,
    password: @password,
    socket: @socket,
    schema_search_path: @schema_search_path,
  }

  @base_model = Class.new(ActiveRecord::Base) { self.abstract_class = true }
  SQLOutput.const_set("BaseModel_#{rand(1 << 31)}", @base_model)
  ActiveRecord::Base.establish_connection(connection_settings)

  # A truthy result from init_table removes the table from @tables
  # (presumably signalling that its initialization failed — confirm in init_table).
  @tables.reject! { |table| init_table(table, @base_model) }
  init_table(@default_table, @base_model)
end

#write(chunk) ⇒ Object



230
231
232
233
234
235
236
237
238
239
240
# File 'lib/fluent/plugin/out_sql.rb', line 230

# Writes a buffered chunk to the database.
#
# Inside a pooled ActiveRecord connection, the chunk is imported by the first
# entry of @tables whose pattern matches chunk.key; if none matches, the
# default table imports it.
#
# @param chunk [Object] buffer chunk; must respond to #key
# @return [Object] the result of the selected table's #import
def write(chunk)
  ActiveRecord::Base.connection_pool.with_connection do
    target = @tables.find { |candidate| candidate.pattern.match(chunk.key) } || @default_table
    target.import(chunk)
  end
end