Class: Fluent::Plugin::SQLOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_sql.rb

Defined Under Namespace

Classes: TableElement

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ SQLOutput

Returns a new instance of SQLOutput.



151
152
153
# File 'lib/fluent/plugin/out_sql.rb', line 151

# Builds the plugin instance by delegating entirely to the parent class
# constructor; this method sets no state of its own.
def initialize
  super
end

Instance Attribute Details

#tables ⇒ Object

Returns the value of attribute tables.



41
42
43
# File 'lib/fluent/plugin/out_sql.rb', line 41

# Reader for the @tables instance variable.
#
# @return [Object] whatever @tables currently holds; #configure assigns it an
#   Array of the TableElement objects built from <table> sections that have a
#   non-empty argument
def tables
  @tables
end

Instance Method Details

#configure(conf) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/fluent/plugin/out_sql.rb', line 155

# Validates the plugin configuration and builds the table routing state.
#
# Every <table> section in +conf+ becomes a TableElement. A section with an
# empty argument is the default table (stored in @default_table); all other
# sections are collected, in configuration order, in @tables. When several
# default tables are declared, a warning is logged and the last one wins.
#
# When the 'remove_tag_prefix' parameter is present, @remove_tag_prefix is set
# to a regexp matching that literal prefix at the start of a line.
#
# @param conf [Object] configuration element; must respond to #[] for
#   parameters and #elements for nested sections
# @raise [Fluent::ConfigError] if no default <table> section is defined
def configure(conf)
  compat_parameters_convert(conf, :inject, :buffer)

  super

  if remove_tag_prefix = conf['remove_tag_prefix']
    @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix))
  end

  @tables = []
  @default_table = nil
  conf.elements.select { |e| e.name == 'table' }.each do |e|
    te = TableElement.new(e.arg, log, @enable_fallback)
    te.configure(e)
    if e.arg.empty?
      # Use the plugin-scoped logger (not the global $log) so this warning is
      # emitted the same way as every other message in this method.
      log.warn "Detect duplicate default table definition" if @default_table
      @default_table = te
    else
      @tables << te
    end
  end

  if @pool < @buffer_config.flush_thread_count
    log.warn "connection pool size is smaller than buffer's flush_thread_count. Recommend to increase pool value", :pool => @pool, :flush_thread_count => @buffer_config.flush_thread_count
  end

  if @default_table.nil?
    raise Fluent::ConfigError, "There is no default table. <table> is required in sql output"
  end
end

#formatted_to_msgpack_binary ⇒ Object



222
223
224
# File 'lib/fluent/plugin/out_sql.rb', line 222

# Always answers true.
#
# NOTE(review): presumably a Fluentd output hook declaring that buffered chunk
# data is msgpack binary — confirm against the Fluent::Plugin::Output API.
#
# @return [Boolean] always true
def formatted_to_msgpack_binary
  true
end

#shutdown ⇒ Object



218
219
220
# File 'lib/fluent/plugin/out_sql.rb', line 218

# Delegates straight to the parent class' shutdown; no plugin-specific
# teardown is performed in this method.
def shutdown
  super
end

#start ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/fluent/plugin/out_sql.rb', line 188

# Opens the database connection and prepares every configured table.
#
# After the parent's start logic runs, this method:
# 1. creates an anonymous abstract ActiveRecord base class and registers it
#    under SQLOutput with a randomly suffixed constant name,
# 2. establishes the connection on ActiveRecord::Base using the plugin's
#    connection parameters,
# 3. calls init_table for each entry of @tables, dropping the entries for
#    which it returns a truthy value, and finally for @default_table.
def start
  super

  connection_settings = {
    adapter: @adapter, host: @host, port: @port,
    database: @database, username: @username, password: @password,
    socket: @socket, schema_search_path: @schema_search_path,
    pool: @pool, timeout: @timeout
  }

  @base_model = Class.new(ActiveRecord::Base)
  @base_model.abstract_class = true

  model_const_name = "BaseModel_#{rand(1 << 31)}"
  SQLOutput.const_set(model_const_name, @base_model)
  ActiveRecord::Base.establish_connection(connection_settings)

  # ignore tables if TableElement#init failed
  @tables.reject! { |table_element| init_table(table_element, @base_model) }
  init_table(@default_table, @base_model)
end

#write(chunk) ⇒ Object



226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/fluent/plugin/out_sql.rb', line 226

# Writes one buffer chunk to the database.
#
# The chunk's tag is passed through format_tag and matched against the
# pattern of each entry in @tables, in order; the first matching table
# imports the chunk. When nothing matches, @default_table imports it. The
# whole operation runs inside a connection checked out from the ActiveRecord
# connection pool.
#
# @param chunk [Object] buffer chunk; must respond to #metadata, whose result
#   responds to #tag
# @return [Object] the result of the selected table's #import
def write(chunk)
  ActiveRecord::Base.connection_pool.with_connection do
    # Read the tag from the chunk metadata (the previous `chunk..tag` built a
    # Range instead) and compute it once, since it is the same for every table.
    tag = format_tag(chunk.metadata.tag)
    target = @tables.find { |table| table.pattern.match(tag) } || @default_table
    target.import(chunk, self)
  end
end