Class: Fluent::SQLOutput
- Inherits:
- BufferedOutput
- Object
- BufferedOutput
- Fluent::SQLOutput
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_sql.rb
Defined Under Namespace
Classes: TableElement
Instance Attribute Summary collapse
- #tables ⇒ Object
Returns the value of attribute tables.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #desc(description) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #initialize ⇒ SQLOutput
constructor
A new instance of SQLOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ SQLOutput
Returns a new instance of SQLOutput.
150 151 152 153 154 |
# File 'lib/fluent/plugin/out_sql.rb', line 150
# Creates the output plugin instance.
#
# Runs the superclass constructor first, then loads the two libraries the
# plugin uses: ActiveRecord and activerecord-import.
# NOTE(review): presumably required here rather than at file load so the gems
# are only needed once the plugin is actually instantiated — confirm.
def initialize
  super

  require 'active_record'
  require 'activerecord-import'
end
Instance Attribute Details
#tables ⇒ Object
Returns the value of attribute tables.
37 38 39 |
# File 'lib/fluent/plugin/out_sql.rb', line 37
# Reader for the @tables instance variable.
#
# @return the current value of @tables (nil until it has been assigned)
def tables
  @tables
end
Instance Method Details
#configure(conf) ⇒ Object
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/fluent/plugin/out_sql.rb', line 156
# Reads the plugin configuration and builds the table routing state.
#
# Every child element named 'table' is turned into a TableElement and
# configured. An element whose argument is empty becomes the default table
# (@default_table); every other one is appended to @tables. If more than one
# default table is defined, a warning is logged and the last one wins.
#
# Also sets:
# * @remove_tag_prefix - a Regexp matching the configured 'remove_tag_prefix'
#   at the start of a tag (only when that option is present)
# * @only_default      - true when no non-default table was defined
#
# @param conf the configuration element; read through conf['remove_tag_prefix']
#   and conf.elements
# @raise [ConfigError] when no default <table> element was defined
def configure(conf)
  super

  if (remove_tag_prefix = conf['remove_tag_prefix'])
    # \A anchors at the very start of the string; ^ would also match right
    # after any newline inside the tag.
    @remove_tag_prefix = /\A#{Regexp.escape(remove_tag_prefix)}/
  end

  @tables = []
  @default_table = nil

  conf.elements.select { |e| e.name == 'table' }.each do |e|
    te = TableElement.new(e.arg, log, @enable_fallback)
    te.configure(e)

    if e.arg.empty?
      # Use the same logger that is handed to TableElement instead of the
      # global $log, so all messages of this plugin go through one logger.
      log.warn "Detect duplicate default table definition" if @default_table
      @default_table = te
    else
      @tables << te
    end
  end

  @only_default = @tables.empty?

  if @default_table.nil?
    raise ConfigError, "There is no default table. <table> is required in sql output"
  end
end
#desc(description) ⇒ Object
13 14 |
# File 'lib/fluent/plugin/out_sql.rb', line 13
# No-op: accepts a description and ignores it.
# NOTE(review): presumably a compatibility shim so parameter descriptions can
# be declared on Fluentd versions that do not provide `desc` — confirm.
#
# @param description ignored
# @return [nil]
def desc(description)
end
#emit(tag, es, chain) ⇒ Object
215 216 217 218 219 220 221 |
# File 'lib/fluent/plugin/out_sql.rb', line 215
# Forwards an event stream to the superclass implementation.
#
# When only the default table is configured (@only_default), the call is
# forwarded unchanged. Otherwise format_tag(tag) is passed as an additional
# fourth argument.
# NOTE(review): the fourth argument is presumably the buffer chunk key that
# #write later matches against the table patterns — confirm.
#
# @param tag   the event tag
# @param es    the event stream
# @param chain the output chain passed through to super
# @return whatever the superclass #emit returns
def emit(tag, es, chain)
  return super(tag, es, chain) if @only_default

  super(tag, es, chain, format_tag(tag))
end
#format(tag, time, record) ⇒ Object
223 224 225 |
# File 'lib/fluent/plugin/out_sql.rb', line 223
# Serializes one event for buffering.
#
# @param tag    the event tag
# @param time   the event time
# @param record the event record
# @return the result of calling to_msgpack on the array [tag, time, record]
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
#shutdown ⇒ Object
211 212 213 |
# File 'lib/fluent/plugin/out_sql.rb', line 211
# Stops the plugin by delegating to the superclass; adds no behavior of its own.
#
# @return whatever the superclass #shutdown returns
def shutdown
  super
end
#start ⇒ Object
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/fluent/plugin/out_sql.rb', line 184
# Starts the plugin: opens the database connection and initializes the tables.
#
# Steps, in order:
# 1. run the superclass #start
# 2. build the connection settings from the plugin's instance variables
# 3. create an abstract ActiveRecord base class and register it as a constant
#    on SQLOutput under a randomized name
# 4. establish the connection on ActiveRecord::Base
# 5. run init_table for every configured table and for the default table
def start
  super

  config = {
    adapter: @adapter,
    host: @host,
    port: @port,
    database: @database,
    username: @username,
    password: @password,
    socket: @socket,
  }

  @base_model = Class.new(ActiveRecord::Base) { self.abstract_class = true }
  # NOTE(review): presumably the constant gives the anonymous class a name and
  # the random suffix avoids clashes between plugin instances — confirm.
  SQLOutput.const_set("BaseModel_#{rand(1 << 31)}", @base_model)
  ActiveRecord::Base.establish_connection(config)

  # ignore tables if TableElement#init failed
  # (a table is dropped from @tables when init_table returns a truthy value)
  @tables.reject! { |te| init_table(te, @base_model) }
  init_table(@default_table, @base_model)
end
#write(chunk) ⇒ Object
227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/fluent/plugin/out_sql.rb', line 227
# Writes one buffer chunk to the database.
#
# Inside a connection checked out from the ActiveRecord connection pool, the
# chunk is imported by the first table in @tables whose pattern matches
# chunk.key. If no table matches, the default table imports it.
#
# @param chunk the buffer chunk; its #key is matched against the table patterns
# @return the result of the selected table's #import
def write(chunk)
  ActiveRecord::Base.connection_pool.with_connection do
    matching_table = @tables.find { |table| table.pattern.match(chunk.key) }
    (matching_table || @default_table).import(chunk)
  end
end