Class: Fluent::SQLOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::SQLOutput
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_sql.rb
Defined Under Namespace
Classes: TableElement
Instance Attribute Summary collapse
-
#tables ⇒ Object
Returns the value of attribute tables.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ SQLOutput
constructor
A new instance of SQLOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ SQLOutput
Returns a new instance of SQLOutput.
124 125 126 127 128 |
# File 'lib/fluent/plugin/out_sql.rb', line 124 def initialize super require 'active_record' require 'activerecord-import' end |
Instance Attribute Details
#tables ⇒ Object
Returns the value of attribute tables.
17 18 19 |
# File 'lib/fluent/plugin/out_sql.rb', line 17 def tables @tables end |
Instance Method Details
#configure(conf) ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/fluent/plugin/out_sql.rb', line 130 def configure(conf) super if remove_tag_prefix = conf['remove_tag_prefix'] @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix)) end @tables = [] @default_table = nil conf.elements.select { |e| e.name == 'table' }.each { |e| te = TableElement.new(e.arg, log) te.configure(e) if e.arg.empty? $log.warn "Detect duplicate default table definition" if @default_table @default_table = te else @tables << te end } @only_default = @tables.empty? if @default_table.nil? raise ConfigError, "There is no default table. <table> is required in sql output" end end |
#emit(tag, es, chain) ⇒ Object
189 190 191 192 193 194 195 |
# File 'lib/fluent/plugin/out_sql.rb', line 189 def emit(tag, es, chain) if @only_default super(tag, es, chain) else super(tag, es, chain, format_tag(tag)) end end |
#format(tag, time, record) ⇒ Object
197 198 199 |
# File 'lib/fluent/plugin/out_sql.rb', line 197 def format(tag, time, record) [tag, time, record].to_msgpack end |
#shutdown ⇒ Object
185 186 187 |
# File 'lib/fluent/plugin/out_sql.rb', line 185 def shutdown super end |
#start ⇒ Object
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/fluent/plugin/out_sql.rb', line 158 def start super config = { :adapter => @adapter, :host => @host, :port => @port, :database => @database, :username => @username, :password => @password, :socket => @socket, } @base_model = Class.new(ActiveRecord::Base) do self.abstract_class = true end SQLOutput.const_set("BaseModel_#{rand(1 << 31)}", @base_model) @base_model.establish_connection(config) # ignore tables if TableElement#init failed @tables.reject! do |te| init_table(te, @base_model) end init_table(@default_table, @base_model) end |
#write(chunk) ⇒ Object
201 202 203 204 205 206 207 208 |
# File 'lib/fluent/plugin/out_sql.rb', line 201 def write(chunk) @tables.each { |table| if table.pattern.match(chunk.key) return table.import(chunk) end } @default_table.import(chunk) end |