Class: Fluent::Plugin::SQLOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::SQLOutput
- Defined in:
- lib/fluent/plugin/out_sql.rb
Defined Under Namespace
Classes: TableElement
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
"memory"
Instance Attribute Summary collapse
-
#tables ⇒ Object
Returns the value of attribute tables.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary ⇒ Object
-
#initialize ⇒ SQLOutput
constructor
A new instance of SQLOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ SQLOutput
145 146 147 148 149 |
# File 'lib/fluent/plugin/out_sql.rb', line 145 def initialize super require 'active_record' require 'activerecord-import' end |
Instance Attribute Details
#tables ⇒ Object
Returns the value of attribute tables.
36 37 38 |
# File 'lib/fluent/plugin/out_sql.rb', line 36 def tables @tables end |
Instance Method Details
#configure(conf) ⇒ Object
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/fluent/plugin/out_sql.rb', line 151

# Reads the plugin configuration and builds the table definitions.
#
# Each <table> element in +conf+ becomes a TableElement. An element whose
# argument is empty is the default table; every other one is appended to
# @tables. When several default tables are declared, a warning is logged
# and the last one wins.
#
# @param conf the configuration element; must respond to #[] and #elements
# @raise [Fluent::ConfigError] when no default <table> is declared
def configure(conf)
  compat_parameters_convert(conf, :inject, :buffer)

  super

  prefix = conf['remove_tag_prefix']
  @remove_tag_prefix = Regexp.new('^' + Regexp.escape(prefix)) if prefix

  @tables = []
  @default_table = nil
  conf.elements.each do |element|
    next unless element.name == 'table'

    table = TableElement.new(element.arg, log, @enable_fallback)
    table.configure(element)
    if element.arg.empty?
      # Warn through the plugin logger (the same one handed to TableElement)
      # instead of the process-global $log.
      log.warn "Detect duplicate default table definition" if @default_table
      @default_table = table
    else
      @tables << table
    end
  end
  @only_default = @tables.empty?

  if @default_table.nil?
    raise Fluent::ConfigError, "There is no default table. <table> is required in sql output"
  end
end
#emit(tag, es, chain) ⇒ Object
213 214 215 216 217 218 219 |
# File 'lib/fluent/plugin/out_sql.rb', line 213

# Forwards the event stream to the parent implementation.
#
# When only the default table is configured the call is passed through
# unchanged; otherwise the result of format_tag(tag) is supplied as an
# additional fourth argument.
def emit(tag, es, chain)
  return super(tag, es, chain) if @only_default

  super(tag, es, chain, format_tag(tag))
end
#format(tag, time, record) ⇒ Object
221 222 223 224 |
# File 'lib/fluent/plugin/out_sql.rb', line 221

# Serializes one event as a msgpack-encoded [tag, time, record] triple.
#
# The record is first passed through inject_values_to_record.
def format(tag, time, record)
  injected = inject_values_to_record(tag, time, record)
  [tag, time, injected].to_msgpack
end
#formatted_to_msgpack_binary ⇒ Object
226 227 228 |
# File 'lib/fluent/plugin/out_sql.rb', line 226 def formatted_to_msgpack_binary true end |
#shutdown ⇒ Object
209 210 211 |
# File 'lib/fluent/plugin/out_sql.rb', line 209 def shutdown super end |
#start ⇒ Object
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/fluent/plugin/out_sql.rb', line 181

# Starts the plugin: creates the abstract base model, establishes the
# ActiveRecord connection from the configured parameters, and initializes
# every table element.
def start
  super

  connection_settings = {
    adapter: @adapter,
    host: @host,
    port: @port,
    database: @database,
    username: @username,
    password: @password,
    socket: @socket,
    schema_search_path: @schema_search_path,
  }

  @base_model = Class.new(ActiveRecord::Base) { self.abstract_class = true }

  # Register the anonymous class under a randomly suffixed constant name.
  SQLOutput.const_set("BaseModel_#{rand(1 << 31)}", @base_model)
  ActiveRecord::Base.establish_connection(connection_settings)

  # ignore tables if TableElement#init failed
  # (tables for which init_table returns a truthy value are dropped)
  @tables.reject! { |table| init_table(table, @base_model) }
  init_table(@default_table, @base_model)
end
#write(chunk) ⇒ Object
230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/fluent/plugin/out_sql.rb', line 230

# Imports a buffer chunk into the matching table.
#
# The first table in @tables whose pattern matches chunk.key receives the
# chunk; when none matches, the default table is used. The import runs
# inside a connection checked out from the ActiveRecord connection pool.
#
# @param chunk the buffer chunk; must respond to #key
# @return whatever the selected table's #import returns
def write(chunk)
  ActiveRecord::Base.connection_pool.with_connection do
    target = @tables.find { |candidate| candidate.pattern.match(chunk.key) } || @default_table
    target.import(chunk)
  end
end