Class: ActiveRecord::SqlAnalyzer::CLIProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/active_record/sql_analyzer/cli_processor.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(concurrency) ⇒ CLIProcessor

Returns a new instance of CLIProcessor.



6
7
8
9
# File 'lib/active_record/sql_analyzer/cli_processor.rb', line 6

def initialize(concurrency)
  @concurrency = concurrency
  @definitions = {}
end

Instance Attribute Details

#concurrencyObject (readonly)

Returns the value of attribute concurrency.



4
5
6
# File 'lib/active_record/sql_analyzer/cli_processor.rb', line 4

def concurrency
  @concurrency
end

#definitionsObject (readonly)

Returns the value of attribute definitions.



4
5
6
# File 'lib/active_record/sql_analyzer/cli_processor.rb', line 4

def definitions
  @definitions
end

Class Method Details

.process_queue(queue) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/active_record/sql_analyzer/cli_processor.rb', line 11

def self.process_queue(queue)
  local_data = {}

  while !queue.empty? do
    prefix, path = queue.pop
    local_data[prefix] ||= {}

    File.open(path, "r") do |io|
      while !io.eof? do
        yield local_data[prefix], io.readline.strip
      end
    end
  end

  local_data

rescue => ex
  puts "#{ex.class}: #{ex.message}"
  puts ex.backtrace
  raise
end

Instance Method Details

#dump(dest_dir) ⇒ Object



110
111
112
113
114
115
116
117
118
119
# File 'lib/active_record/sql_analyzer/cli_processor.rb', line 110

def dump(dest_dir)
  definitions.each do |prefix, data|
    path = "#{dest_dir}/#{prefix}_#{Time.now.strftime("%Y-%m-%d")}.log"
    puts "Writing logs to '#{path}' (#{data.length} queries)"

    File.open(path, "w+") do |io|
      io.write(data.to_json)
    end
  end
end

#run_definition(logs) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/active_record/sql_analyzer/cli_processor.rb', line 33

def run_definition(logs)
  queue = Queue.new
  logs.each { |l| queue << l }

  # Spin up threads to start processing the queue
  threads = concurrency.times.map do
    Thread.new(queue) do |t_queue|
      # Create a local copy of each definitions then merge them in
      CLIProcessor.process_queue(t_queue) do |local_definitions, line|
        line.strip!

        unless line == ""
          sha, event = line.split("|", 2)
          local_definitions[sha] = JSON.parse(event)
        end
      end
    end
  end

  # Merge everything
  threads.each do |thread|
    thread.value.each do |prefix, data|
      definitions[prefix] ||= {}
      definitions[prefix].merge!(data)
    end
  end
end

#run_usage(logs) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/active_record/sql_analyzer/cli_processor.rb', line 61

def run_usage(logs)
  queue = Queue.new
  logs.each { |l| queue << l }

  # Spin up threads to start processing the queue
  threads = concurrency.times.map do
    Thread.new(queue) do |t_queue|
      # Create a local copy of the usage for each SHA then merge it in at the end
      CLIProcessor.process_queue(t_queue) do |local_usage, line|
        line.strip!

        unless line == ""
          last_called, sha = line.split("|", 2)
          last_called = Time.at(last_called.to_i).utc

          local_usage[sha] ||= {"count" => 0}
          local_usage[sha]["count"] += 1

          if !local_usage[sha]["last_called"] || local_usage[sha]["last_called"] < last_called
            local_usage[sha]["last_called"] = last_called
          end
        end
      end
    end
  end

  # Merge everything
  threads.each do |thread|
    thread.value.each do |prefix, data|
      definitions[prefix] ||= {}

      data.each do |sha, usage|
        definition = definitions[prefix][sha]
        unless definition
          puts "Undefined event '#{sha}'"
          next
        end

        definition["count"] ||= 0
        definition["count"] += usage["count"]

        if !definition["last_called"] || definition["last_called"] < usage["last_called"]
          definition["last_called"] = usage["last_called"]
        end
      end
    end
  end
end