Class: Fluent::MySQLSlowQueryLogOutput

Inherits:
Output
  • Object
show all
Includes:
HandleTagNameMixin
Defined in:
lib/fluent/plugin/out_mysqlslowquerylog.rb

Constant Summary collapse

# Matches the "User@Host:" header line of a slow-log entry (the leading "#"
# is optional). Capture 1 is the non-whitespace token before the " @ "
# separator and capture 2 the token after it; parse_message stores them as
# record['user'] and record['host'].
REGEX1 =
/^#? User\@Host:\s+(\S+)\s+\@\s+(\S+).*/
# Matches the "# Query_time: ... Lock_time: ... Rows_sent: ... Rows_examined: ..."
# statistics line. Captures 1-4 are the four numeric fields in that order;
# parse_message converts the first two with to_f and the last two with to_i.
REGEX2 =
/^# Query_time: ([0-9.]+)\s+Lock_time: ([0-9.]+)\s+Rows_sent: ([0-9.]+)\s+Rows_examined: ([0-9.]+).*/

Instance Method Summary collapse

Instance Method Details

#concat_messages(tag, time, record) ⇒ Object



33
34
35
36
37
38
39
40
# File 'lib/fluent/plugin/out_mysqlslowquerylog.rb', line 33

# Appends every value of +record+ to the line buffer kept for +tag+ and
# triggers parsing once a line that terminates a statement has been buffered.
#
# A line terminates a statement when it ends with ';' and is neither a
# "USE ..." nor a "SET TIMESTAMP=..." line (compared case-insensitively);
# those two are buffered and parsed together with the statement that follows.
#
# @param tag    the event tag; selects the buffer in @slowlogs
# @param time   the event time, handed through to parse_message
# @param record the event record whose values are the raw log lines
# @return whatever record.each returns
def concat_messages(tag, time, record)
  record.each do |_field, line|
    @slowlogs[:"#{tag}"] << line

    next unless line.end_with?(';')
    next if line.upcase.start_with?('USE ', 'SET TIMESTAMP=')

    parse_message(tag, time)
  end
end

#configure(conf) ⇒ Object



5
6
7
8
9
10
11
12
# File 'lib/fluent/plugin/out_mysqlslowquerylog.rb', line 5

# Applies the configuration via the superclass, resets the per-tag line
# buffer and verifies that at least one tag-rewriting option is set.
#
# The check exists because flush_emit refuses to emit a record whose tag was
# not changed, so a configuration without any rewrite option could never
# emit anything.
#
# @param conf the plugin configuration, passed on to the superclass
# @raise [ConfigError] when none of remove_tag_prefix, remove_tag_suffix,
#   add_tag_prefix or add_tag_suffix is set
def configure(conf)
  super
  @slowlogs = {}

  return if @remove_tag_prefix || @remove_tag_suffix || @add_tag_prefix || @add_tag_suffix

  # The message names the plugin as the file does (out_mysqlslowquerylog).
  raise ConfigError, "out_mysqlslowquerylog: At least one of option, remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix is required to be set."
end

#emit(tag, es, chain) ⇒ Object



22
23
24
25
26
27
28
29
30
31
# File 'lib/fluent/plugin/out_mysqlslowquerylog.rb', line 22

# Feeds every event of the stream into the line buffer for +tag+ and then
# hands control to the next element of the output chain.
#
# The buffer for the tag is (re)created when the stored value is not exactly
# an Array, i.e. on the first event stream seen for that tag.
#
# @param tag   the tag of the incoming event stream
# @param es    the event stream; yields (time, record) pairs from #each
# @param chain the output chain; #next is called after all events are buffered
def emit(tag, es, chain)
  buffer_key = :"#{tag}"
  @slowlogs[buffer_key] = [] unless @slowlogs[buffer_key].instance_of?(Array)

  es.each { |time, record| concat_messages(tag, time, record) }

  chain.next
end

#flush_emit(tag, time, record) ⇒ Object



76
77
78
79
80
81
82
83
84
85
# File 'lib/fluent/plugin/out_mysqlslowquerylog.rb', line 76

# Empties the line buffer for +tag+ and emits +record+ under the rewritten tag.
#
# A copy of the tag is passed to filter_record, which is expected to modify
# it in place (presumably provided by HandleTagNameMixin - confirm). When the
# copy still equals the original tag the record is not emitted; a warning is
# logged instead.
#
# @param tag    the original event tag
# @param time   the event time to emit with
# @param record the parsed slow-query record
def flush_emit(tag, time, record)
  @slowlogs[:"#{tag}"].clear

  rewritten_tag = tag.clone
  filter_record(rewritten_tag, time, record)

  if rewritten_tag == tag
    $log.warn "Can not emit message because the tag has not changed. Dropped record #{record}"
  else
    router.emit(rewritten_tag, time, record)
  end
end

#parse_message(tag, time) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/fluent/plugin/out_mysqlslowquerylog.rb', line 44

# Turns the lines buffered for +tag+ into one slow-query record and emits it.
#
# Expected buffer layout, in order:
#   * any number of non-'#' lines (skipped),
#   * an optional "# Time: ..." line, whose timestamp replaces +time+,
#   * a "# User@Host: ..." line (REGEX1)       -> record['user'], record['host'],
#   * a "# Query_time: ..." line (REGEX2)      -> the four numeric fields,
#   * the remaining lines, stripped and joined with a single space -> record['sql'].
#
# Header lines that do not match leave 'user'/'host' as nil and the numeric
# fields as 0 / 0.0.
#
# When the buffer holds no line starting with '#' (for example because
# tailing began in the middle of an entry) there is nothing to parse: the
# buffered lines are discarded, a warning is logged and nothing is emitted.
#
# @param tag  the event tag; selects the buffer in @slowlogs
# @param time the fallback event time, used when there is no "# Time:" line
# @return the result of flush_emit, or nil when nothing was emitted
def parse_message(tag, time)
  lines  = @slowlogs[:"#{tag}"]
  record = {}
  date   = nil

  # Skip the banner that mysqld writes after flush-logs or a restart.
  # e.g.) /usr/sbin/mysqld, Version: 5.5.28-0ubuntu0.12.04.2-log ((Ubuntu)). started with:
  # Stop as soon as the buffer is exhausted instead of calling start_with? on nil.
  message = lines.shift
  message = lines.shift until message.nil? || message.start_with?('#')

  if message.nil?
    $log.warn "out_mysqlslowquerylog: Dropped buffered lines for tag #{tag} because no line starting with '#' was found."
    return
  end

  if message.start_with?('# Time: ')
    # Time.parse comes from the 'time' standard library, which is assumed to
    # be loaded elsewhere - confirm.
    date    = Time.parse(message[8..-1].strip)
    message = lines.shift
  end

  # to_s keeps a missing line (nil) a simple non-match.
  user_host = REGEX1.match(message.to_s)
  record['user'] = user_host && user_host[1]
  record['host'] = user_host && user_host[2]

  stats = REGEX2.match(lines.shift.to_s)
  query_time, lock_time, rows_sent, rows_examined = stats ? stats.captures : []
  record['query_time']    = query_time.to_f
  record['lock_time']     = lock_time.to_f
  record['rows_sent']     = rows_sent.to_i
  record['rows_examined'] = rows_examined.to_i

  # Everything left in the buffer is the statement itself.
  record['sql'] = lines.map { |line| line.strip }.join(' ')

  time = date.to_i if date
  flush_emit(tag, time, record)
end

#shutdown ⇒ Object



18
19
20
# File 'lib/fluent/plugin/out_mysqlslowquerylog.rb', line 18

# Shuts the plugin down. Adds no plugin-specific behaviour; it only
# delegates to the superclass implementation.
def shutdown
  super
end

#start ⇒ Object



14
15
16
# File 'lib/fluent/plugin/out_mysqlslowquerylog.rb', line 14

# Starts the plugin. Adds no plugin-specific behaviour; it only delegates
# to the superclass implementation.
def start
  super
end