Class: Fluent::SQLOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_sql.rb

Defined Under Namespace

Classes: TableElement

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ SQLOutput

Returns a new instance of SQLOutput.



124
125
126
127
128
# File 'lib/fluent/plugin/out_sql.rb', line 124

# Builds the plugin instance and loads the database libraries it relies on.
#
# The requires are issued here, after the superclass constructor has run,
# rather than at file scope, so the gems are loaded only once an instance is
# actually created.
def initialize
  super
  %w[active_record activerecord-import].each { |library| require library }
end

Instance Attribute Details

#tables ⇒ Object

Returns the value of attribute tables.



17
18
19
# File 'lib/fluent/plugin/out_sql.rb', line 17

# Reader for the +tables+ attribute.
#
# @return [Array, nil] the current value of @tables; #configure assigns it an
#   array holding the TableElement of every <table> section whose argument is
#   non-empty, and it is nil before that assignment
def tables
  @tables
end

Instance Method Details

#configure(conf) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/fluent/plugin/out_sql.rb', line 130

# Reads the plugin configuration and builds the table definitions.
#
# Every <table> section is turned into a TableElement. A section whose
# argument is empty becomes the default table (the last one wins, with a
# warning on duplicates); every other section is appended to @tables.
# @only_default records whether the default table is the only one.
#
# @param conf [Object] configuration element; it is read through #[] and
#   #elements, and each element through #name and #arg
# @return [nil]
# @raise [ConfigError] if no <table> section with an empty argument exists
def configure(conf)
  super

  prefix = conf['remove_tag_prefix']
  # \A anchors at the very beginning of the string; ^ would also match right
  # after an embedded newline.
  @remove_tag_prefix = /\A#{Regexp.escape(prefix)}/ if prefix

  @tables = []
  @default_table = nil
  conf.elements.each do |element|
    next unless element.name == 'table'

    table = TableElement.new(element.arg, log)
    table.configure(element)
    if element.arg.empty?
      # Warn through the plugin logger (the same one handed to TableElement)
      # instead of the global $log.
      log.warn "Detect duplicate default table definition" if @default_table
      @default_table = table
    else
      @tables << table
    end
  end
  @only_default = @tables.empty?

  return if @default_table

  raise ConfigError, "There is no default table. <table> is required in sql output"
end

#emit(tag, es, chain) ⇒ Object



189
190
191
192
193
194
195
# File 'lib/fluent/plugin/out_sql.rb', line 189

# Forwards an event stream to the superclass implementation.
#
# When @only_default is set (no non-default tables are configured) the
# arguments are passed through unchanged. Otherwise the result of
# format_tag(tag) is appended as a fourth argument
# (presumably the buffer chunk key that #write later matches against the
# table patterns — confirm against the superclass).
#
# @param tag [Object] tag of the event stream
# @param es [Object] the event stream
# @param chain [Object] passed through to the superclass untouched
# @return [Object] whatever the superclass implementation returns
def emit(tag, es, chain)
  return super(tag, es, chain) if @only_default

  super(tag, es, chain, format_tag(tag))
end

#format(tag, time, record) ⇒ Object



197
198
199
# File 'lib/fluent/plugin/out_sql.rb', line 197

# Serializes a single event.
#
# @param tag [Object] tag of the event
# @param time [Object] time of the event
# @param record [Object] payload of the event
# @return [Object] the result of calling +to_msgpack+ on the three-element
#   array [tag, time, record]
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#shutdown ⇒ Object



185
186
187
# File 'lib/fluent/plugin/out_sql.rb', line 185

# Shuts the plugin down. This override adds no behavior of its own; it only
# delegates to the superclass implementation.
#
# @return [Object] whatever the superclass implementation returns
def shutdown
  super
end

#start ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/fluent/plugin/out_sql.rb', line 158

# Starts the plugin: creates the base model, connects it, and initializes
# every configured table.
#
# An anonymous abstract subclass of ActiveRecord::Base is created for this
# instance, registered as a constant under SQLOutput with a random numeric
# suffix, and then connected using the plugin's connection settings.
#
# @return [Object] the result of init_table for the default table
def start
  super

  connection_spec = {
    adapter: @adapter,
    host: @host,
    port: @port,
    database: @database,
    username: @username,
    password: @password,
    socket: @socket
  }

  @base_model = Class.new(ActiveRecord::Base) { self.abstract_class = true }

  # The class is bound to a constant before the connection is established.
  SQLOutput.const_set("BaseModel_#{rand(1 << 31)}", @base_model)
  @base_model.establish_connection(connection_spec)

  # Drop every table for which init_table returns a truthy value
  # (presumably meaning TableElement#init failed — confirm in init_table).
  @tables.reject! { |table| init_table(table, @base_model) }
  init_table(@default_table, @base_model)
end

#write(chunk) ⇒ Object



201
202
203
204
205
206
207
208
# File 'lib/fluent/plugin/out_sql.rb', line 201

# Imports a buffer chunk into the table responsible for it.
#
# The tables in @tables are tried in order; the first one whose pattern
# matches chunk.key receives the chunk. If none matches, the chunk goes to
# the default table.
#
# @param chunk [Object] buffer chunk; must respond to #key
# @return [Object] the result of the selected table's #import
def write(chunk)
  target = @tables.find { |candidate| candidate.pattern.match(chunk.key) }
  (target || @default_table).import(chunk)
end