Class: Fluent::MySQLSlowQueryExInput
- Inherits:
-
NewTailInput
- Object
- NewTailInput
- Fluent::MySQLSlowQueryExInput
- Defined in:
- lib/fluent/plugin/in_mysqlslowquery_ex.rb
Instance Method Summary collapse
- #apply_dbname_to_record(parsed_query) ⇒ Object
- #configure(conf) ⇒ Object
- #get_last_dbname ⇒ Object
-
#initialize ⇒ MySQLSlowQueryExInput
constructor
A new instance of MySQLSlowQueryExInput.
- #parser ⇒ Object
- #prepare_lines_to_parse(lines, slow_queries = []) ⇒ Object
- #receive_lines(lines, tail_watcher) ⇒ Object
- #save_last_dbname ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ MySQLSlowQueryExInput
Returns a new instance of MySQLSlowQueryExInput.
7 8 9 10 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 7 def initialize super require 'mysql-slowquery-parser' end |
Instance Method Details
#apply_dbname_to_record(parsed_query) ⇒ Object
101 102 103 104 105 106 107 108 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 101 def apply_dbname_to_record(parsed_query) database_name = parsed_query[:db] || parsed_query[:schema] || @last_dbname_of[@path.to_sym] || @dbname_if_missing_dbname_in_log @last_dbname_of[@path.to_sym] = database_name parsed_query[:database] = database_name parsed_query.delete(:db) parsed_query.delete(:schema) parsed_query end |
#configure(conf) ⇒ Object
12 13 14 15 16 17 18 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 12 def configure(conf) conf['format'] = 'none' super if conf['pos_file'] == @last_dbname_file raise Fluet::ConfigError, '' end end |
#get_last_dbname ⇒ Object
37 38 39 40 41 42 43 44 45 46 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 37 def get_last_dbname return unless @last_dbname_file_handle @last_dbname_file_handle.pos = 0 last_db = @last_dbname_file_handle.read.chomp begin JSON.parse(last_db, symbolize_names: true) rescue JSON::ParserError {} end end |
#parser ⇒ Object
58 59 60 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 58 def parser MySQLSlowQueryParser end |
#prepare_lines_to_parse(lines, slow_queries = []) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 86 def prepare_lines_to_parse(lines, slow_queries = []) @query_unit = [] unless @query_unit while !lines.empty? line = lines.shift @query_unit << line if line.end_with?(';', ";\n") && !line.start_with?('use ', 'SET timestamp=') slow_queries << @query_unit @query_unit = nil prepare_lines_to_parse(lines, slow_queries) break # For when refactoring. Just in case. end end slow_queries end |
#receive_lines(lines, tail_watcher) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 62 def receive_lines(lines, tail_watcher) es = Fluent::MultiEventStream.new prepare_lines_to_parse(lines).each do |query_unit| begin parsed_query_unit = parser.parse_slow_log(query_unit) rescue log.warn %Q{in_mysqlslowquery_ex: parse error: #{$!.}, (#{query_unit.to_s})} next end parsed_query = apply_dbname_to_record(parsed_query_unit) es.add(Time.now.to_i, parsed_query) save_last_dbname() end if !es.empty? begin Fluent::Engine.emit_stream(@tag, es) rescue # ignore errors. Engine shows logs and backtraces. end end end |
#save_last_dbname ⇒ Object
48 49 50 51 52 53 54 55 56 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 48 def save_last_dbname return unless @last_dbname_file_handle current = get_last_dbname() unless current == @last_dbname_of @last_dbname_file_handle.pos = 0 @last_dbname_file_handle.truncate(0) @last_dbname_file_handle.write(JSON.generate(current.merge(@last_dbname_of))) end end |
#shutdown ⇒ Object
31 32 33 34 35 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 31 def shutdown save_last_dbname() @last_dbname_file_handle.close if @last_dbname_file_handle super end |
#start ⇒ Object
20 21 22 23 24 25 26 27 28 29 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 20 def start @last_dbname_of = if @last_dbname_file @last_dbname_file_handle = File.open(@last_dbname_file, File::RDWR|File::CREAT, Fluent::DEFAULT_FILE_PERMISSION) @last_dbname_file_handle.sync = true get_last_dbname() else {} end super end |