Class: Fluent::MysqlPreparedStatementOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_mysql_prepared_statement.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeMysqlPreparedStatementOutput

Returns a new instance of MysqlPreparedStatementOutput.



18
19
20
21
# File 'lib/fluent/plugin/out_mysql_prepared_statement.rb', line 18

def initialize
  super
  require 'mysql2-cs-bind'
end

Instance Attribute Details

#handlerObject

Returns the value of attribute handler.



16
17
18
# File 'lib/fluent/plugin/out_mysql_prepared_statement.rb', line 16

def handler
  @handler
end

Instance Method Details

#clientObject



58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/out_mysql_prepared_statement.rb', line 58

def client
  Mysql2::Client.new({
      :host => @host,
      :port => @port,
      :username => @username,
      :password => @password,
      :database => @database,
      :flags => Mysql2::Client::MULTI_STATEMENTS
    })
end

#configure(conf) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/out_mysql_prepared_statement.rb', line 23

def configure(conf)
  super

  if @sql.nil?
    raise Fluent::ConfigError, "sql MUST be specified, but missing"
  end

  if @key_names.nil?
    raise Fluent::ConfigError, "key_names MUST be specified, but missing"
  end

  @key_names = @key_names.split(',')
  @format_proc = Proc.new{|tag, time, record| @key_names.map{|k| record[k]}}

  begin
    Mysql2::Client.pseudo_bind(@sql, @key_names.map{|n| nil})
  rescue ArgumentError => e
    raise Fluent::ConfigError, "mismatch between sql placeholders and key_names"
  end

  $log.info "sql ->[#{@sql}]"
end

#format(tag, time, record) ⇒ Object



54
55
56
# File 'lib/fluent/plugin/out_mysql_prepared_statement.rb', line 54

def format(tag, time, record)
  [tag, time, @format_proc.call(tag, time, record)].to_msgpack
end

#get_exec_result(data) ⇒ Object



81
82
83
84
85
86
87
88
89
# File 'lib/fluent/plugin/out_mysql_prepared_statement.rb', line 81

def get_exec_result(data)
  results = Array.new
  stmt = @handler.xquery(@sql, data)
  return results if stmt.nil?
  stmt.each do |row|
    results.push(row)
  end
  return results
end

#shutdownObject



50
51
52
# File 'lib/fluent/plugin/out_mysql_prepared_statement.rb', line 50

def shutdown
  super
end

#startObject



46
47
48
# File 'lib/fluent/plugin/out_mysql_prepared_statement.rb', line 46

def start
  super
end

#write(chunk) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/out_mysql_prepared_statement.rb', line 69

def write(chunk)
  @handler = client
  $log.info "adding mysql_query job: "
  chunk.msgpack_each { |tag, time, data|
    results = get_exec_result(data)
    results.each{|result|
      Fluent::Engine.emit(@output_tag, Fluent::Engine.now, result)
    }
  }
  @handler.close
end