Class: Fluent::Plugin::SQLOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_sql.rb

Defined Under Namespace

Classes: TableElement

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeSQLOutput

Returns a new instance of SQLOutput.



143
144
145
146
147
# File 'lib/fluent/plugin/out_sql.rb', line 143

def initialize
  super
  require 'active_record'
  require 'activerecord-import'
end

Instance Attribute Details

#tablesObject

Returns the value of attribute tables.



34
35
36
# File 'lib/fluent/plugin/out_sql.rb', line 34

def tables
  @tables
end

Instance Method Details

#configure(conf) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/fluent/plugin/out_sql.rb', line 149

def configure(conf)
  compat_parameters_convert(conf, :inject, :buffer)

  super

  if remove_tag_prefix = conf['remove_tag_prefix']
    @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix))
  end

  @tables = []
  @default_table = nil
  conf.elements.select { |e|
    e.name == 'table'
  }.each { |e|
    te = TableElement.new(e.arg, log, @enable_fallback)
    te.configure(e)
    if e.arg.empty?
      $log.warn "Detect duplicate default table definition" if @default_table
      @default_table = te
    else
      @tables << te
    end
  }
  @only_default = @tables.empty?

  if @default_table.nil?
    raise Fluent::ConfigError, "There is no default table. <table> is required in sql output"
  end
end

#emit(tag, es, chain) ⇒ Object



210
211
212
213
214
215
216
# File 'lib/fluent/plugin/out_sql.rb', line 210

def emit(tag, es, chain)
  if @only_default
    super(tag, es, chain)
  else
    super(tag, es, chain, format_tag(tag))
  end
end

#format(tag, time, record) ⇒ Object



218
219
220
221
# File 'lib/fluent/plugin/out_sql.rb', line 218

def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  [tag, time, record].to_msgpack
end

#formatted_to_msgpack_binaryObject



223
224
225
# File 'lib/fluent/plugin/out_sql.rb', line 223

def formatted_to_msgpack_binary
  true
end

#shutdownObject



206
207
208
# File 'lib/fluent/plugin/out_sql.rb', line 206

def shutdown
  super
end

#startObject



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/fluent/plugin/out_sql.rb', line 179

def start
  super

  config = {
    adapter: @adapter,
    host: @host,
    port: @port,
    database: @database,
    username: @username,
    password: @password,
    socket: @socket,
  }

  @base_model = Class.new(ActiveRecord::Base) do
    self.abstract_class = true
  end

  SQLOutput.const_set("BaseModel_#{rand(1 << 31)}", @base_model)
  ActiveRecord::Base.establish_connection(config)

  # ignore tables if TableElement#init failed
  @tables.reject! do |te|
    init_table(te, @base_model)
  end
  init_table(@default_table, @base_model)
end

#write(chunk) ⇒ Object



227
228
229
230
231
232
233
234
235
236
237
# File 'lib/fluent/plugin/out_sql.rb', line 227

def write(chunk)
  ActiveRecord::Base.connection_pool.with_connection do

    @tables.each { |table|
      if table.pattern.match(chunk.key)
        return table.import(chunk)
      end
    }
    @default_table.import(chunk)
  end
end