Class: Fluent::SQLOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_sql.rb

Defined Under Namespace

Classes: TableElement

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeSQLOutput

Returns a new instance of SQLOutput.



150
151
152
153
154
# File 'lib/fluent/plugin/out_sql.rb', line 150

def initialize
  super
  require 'active_record'
  require 'activerecord-import'
end

Instance Attribute Details

#tablesObject

Returns the value of attribute tables.



37
38
39
# File 'lib/fluent/plugin/out_sql.rb', line 37

def tables
  @tables
end

Instance Method Details

#configure(conf) ⇒ Object



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/fluent/plugin/out_sql.rb', line 156

def configure(conf)
  super

  if remove_tag_prefix = conf['remove_tag_prefix']
    @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix))
  end

  @tables = []
  @default_table = nil
  conf.elements.select { |e|
    e.name == 'table'
  }.each { |e|
    te = TableElement.new(e.arg, log, @enable_fallback)
    te.configure(e)
    if e.arg.empty?
      $log.warn "Detect duplicate default table definition" if @default_table
      @default_table = te
    else
      @tables << te
    end
  }
  @only_default = @tables.empty?

  if @default_table.nil?
    raise ConfigError, "There is no default table. <table> is required in sql output"
  end
end

#desc(description) ⇒ Object



13
14
# File 'lib/fluent/plugin/out_sql.rb', line 13

def desc(description)
end

#emit(tag, es, chain) ⇒ Object



215
216
217
218
219
220
221
# File 'lib/fluent/plugin/out_sql.rb', line 215

def emit(tag, es, chain)
  if @only_default
    super(tag, es, chain)
  else
    super(tag, es, chain, format_tag(tag))
  end
end

#format(tag, time, record) ⇒ Object



223
224
225
# File 'lib/fluent/plugin/out_sql.rb', line 223

def format(tag, time, record)
  [tag, time, record].to_msgpack
end

#shutdownObject



211
212
213
# File 'lib/fluent/plugin/out_sql.rb', line 211

def shutdown
  super
end

#startObject



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/fluent/plugin/out_sql.rb', line 184

def start
  super

  config = {
    :adapter => @adapter,
    :host => @host,
    :port => @port,
    :database => @database,
    :username => @username,
    :password => @password,
    :socket => @socket,
  }

  @base_model = Class.new(ActiveRecord::Base) do
    self.abstract_class = true
  end

  SQLOutput.const_set("BaseModel_#{rand(1 << 31)}", @base_model)
  ActiveRecord::Base.establish_connection(config)

  # ignore tables if TableElement#init failed
  @tables.reject! do |te|
    init_table(te, @base_model)
  end
  init_table(@default_table, @base_model)
end

#write(chunk) ⇒ Object



227
228
229
230
231
232
233
234
235
236
237
# File 'lib/fluent/plugin/out_sql.rb', line 227

def write(chunk)
  ActiveRecord::Base.connection_pool.with_connection do

    @tables.each { |table|
      if table.pattern.match(chunk.key)
        return table.import(chunk)
      end
    }
    @default_table.import(chunk)
  end
end