Class: MssqlOutput

Inherits:
Fluent::BufferedOutput
  • Object
show all
Includes:
Fluent::SetTagKeyMixin, Fluent::SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_mssql.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeMssqlOutput

Returns a new instance of MssqlOutput.



20
21
22
23
# File 'lib/fluent/plugin/out_mssql.rb', line 20

def initialize
  super
  require 'dbi'
end

Instance Attribute Details

#handlerObject

Returns the value of attribute handler.



18
19
20
# File 'lib/fluent/plugin/out_mssql.rb', line 18

def handler
  @handler
end

Instance Method Details

#clientObject



67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/fluent/plugin/out_mssql.rb', line 67

def client
  if @odbc_label.nil?
    raise Fluent::ConfigError, "odbc_label MUST be specified, but missing"
  else
    begin
      dbh = DBI.connect("dbi:ODBC:#{@odbc_label}", @username, @password)
    rescue
      raise Fluent::ConfigError, "Cannot open database, check user or password"
    end
  end
  dbh
end

#configure(conf) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/out_mssql.rb', line 25

def configure(conf)
  super

  if @format == 'json'
    @format_proc = Proc.new{|tag, time, record| record.to_json}
  else
    @key_names = @key_names.split(',')
    @format_proc = Proc.new{|tag, time, record| @key_names.map{|k| record[k]}}
  end

  if @columns.nil? and @sql.nil?
    raise Fluent::ConfigError, "columns or sql MUST be specified, but missing"
  end

  if @sql.nil?
    raise Fluent::ConfigError, "table missing" unless @table

    @columns = @columns.split(',')
    cols = @columns.join(',')
    placeholders = if @format == 'json'
                     '?'
                   else
                     @key_names.map{|k| '?'}.join(',')
                   end
    @sql = "INSERT INTO #{@table} (#{cols}) VALUES (#{placeholders})"
  end
end

#format(tag, time, record) ⇒ Object



61
62
63
64
65
# File 'lib/fluent/plugin/out_mssql.rb', line 61

def format(tag, time, record)
  tmp = [tag, time, @format_proc.call(tag, time, record)]
  mp = tmp.to_msgpack
  mp
end

#shutdownObject



57
58
59
# File 'lib/fluent/plugin/out_mssql.rb', line 57

def shutdown
  super
end

#startObject



53
54
55
# File 'lib/fluent/plugin/out_mssql.rb', line 53

def start
  super
end

#write(chunk) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/out_mssql.rb', line 80

def write(chunk)
  handler = self.client
  chunk.msgpack_each { |tag, time, data|
    begin
      query = handler.prepare(@sql)
      num = 1
      data.each { |d|
        query.bind_param(num, d)
        num += 1
      }
      query.execute
    rescue
      raise Fluent::ConfigError, "SQL Execute Error #{@sql} - #{data}"
    end
  }
  handler.disconnect
end