Class: GRI::AppCollector

Inherits:
Object show all
Defined in:
lib/gri/pcollector.rb,
lib/gri/app_collector.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ AppCollector

Returns a new instance of AppCollector.



12
13
14
15
16
# File 'lib/gri/app_collector.rb', line 12

# Creates a collector bound to +config+.
#
# The collector starts with an empty writer list and a metrics table whose
# missing keys read as 0.
def initialize(config)
  @metrics = Hash.new(0)
  @writers = []
  @config = config
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



10
11
12
# File 'lib/gri/app_collector.rb', line 10

# Reader for the configuration object stored by #initialize.
def config; @config; end

#metrics ⇒ Object (readonly)

Returns the value of attribute metrics.



10
11
12
# File 'lib/gri/app_collector.rb', line 10

# Reader for the metrics table stored in @metrics.
def metrics; @metrics; end

#writers ⇒ Object (readonly)

Returns the value of attribute writers.



10
11
12
# File 'lib/gri/app_collector.rb', line 10

# Reader for the writer list stored in @writers.
def writers; @writers; end

Instance Method Details

#fillup_queue(n, sock_path, targets, scheduler) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/gri/pcollector.rb', line 128

# Pulls target assignments from the UNIX socket at +sock_path+ until the
# scheduler's queue holds get_max_queue_size entries or nothing more can be
# fetched.
#
# Every round opens a fresh UNIXSocket, reads a single line and closes the
# socket. The first whitespace-separated token of that line is converted with
# to_i and used as an index into +targets+; the selected target is pushed onto
# scheduler.queue and scheduler.process_queue is called.
#
# n         - not referenced inside this method.
# sock_path - filesystem path of the UNIX socket to connect to.
# targets   - indexable collection looked up by the number read from the socket.
# scheduler - object responding to #queue and #process_queue.
#
# Returns true when fetching stopped because the socket file is gone, a
# SystemCallError occurred, or the read failed / returned nil; returns false
# when the loop ended only because the queue was full.
def fillup_queue n, sock_path, targets, scheduler
  e = false
  mqs = get_max_queue_size
  while scheduler.queue.size < mqs
    begin
      # No socket file at sock_path: stop asking for work.
      unless File.socket? sock_path
        e = true
        break
      end
      sock = UNIXSocket.new sock_path
    rescue Errno::ECONNREFUSED
      # Connection refused: wait 0.1s plus rand (a float below 1) and retry the
      # begin block. There is no retry limit. `sock` can be nil or a socket
      # closed in an earlier pass here, hence the `rescue nil`.
      sock.close rescue nil
      sleep(0.1 + rand)
      retry
    rescue SystemCallError
      # Any other system-call failure ends fetching.
      sock.close rescue nil
      e = true
      break
    end
    begin
      # gets returning nil (nothing to read) also ends fetching.
      unless (line = sock.gets)
        e = true
        break
      end
    rescue
      e = true
      break
    ensure
      # The connection is used for exactly one line, whatever happened above.
      sock.close
    end
    # Only the leading number is used; `host` is assigned but never read.
    num, host = line.split
    scheduler.queue.push targets[num.to_i]
    scheduler.process_queue
  end
  e
end

#fork_child(server_sock, sock_path, nproc, targets, log_dir, scheduler_class, fdh) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/gri/pcollector.rb', line 165

# Forks +nproc+ worker processes and returns their pids.
#
# Each child closes its copy of +server_sock+, builds its own Loop and
# scheduler, then alternates between fetching work with fillup_queue and
# running the loop until fillup_queue has reported that no more work is
# available and the loop has no active watchers. Before the fork block ends
# the child dumps @metrics to "<log_dir>/res.<parent pid>.<child pid>.dump".
#
# server_sock     - listening socket owned by the parent; closed in the child.
# sock_path       - socket path handed to fillup_queue.
# nproc           - number of children to fork.
# targets         - target list handed to fillup_queue.
# log_dir         - directory receiving the per-child metrics dump.
# scheduler_class - class instantiated as scheduler_class.new(loop, @metrics).
# fdh             - value assigned to scheduler.fake_descr_hash.
#
# Returns the Array of values returned by fork in the parent.
def fork_child server_sock, sock_path, nproc, targets, log_dir,
    scheduler_class, fdh
  pids = []
  # Captured before forking so children can embed the parent's pid in the
  # dump file name.
  ppid = $$
  for n in 1..nproc
    pid = fork {
      server_sock.close
      start_time = Time.now
      # Child n waits 0.05 * n seconds before starting.
      sleep 0.05 * n
      Log.debug "child ##{n}"
      # NOTE: this local named `loop` hides Kernel#loop inside the block.
      loop = Loop.new
      @writers.each {|writer| writer.loop = loop}
      scheduler = scheduler_class.new loop, @metrics
      scheduler.queue = []
      scheduler.writers = @writers
      scheduler.fake_descr_hash = fdh

      # e is true once fillup_queue reports that fetching stopped for a
      # reason other than a full queue.
      e = fillup_queue n, sock_path, targets, scheduler
      scheduler.process_queue
      if !e or loop.has_active_watchers?
        while true
          loop.run_once
          # Done only when no more work can be fetched AND nothing is in flight.
          break if e and !loop.has_active_watchers?
          e = fillup_queue n, sock_path, targets, scheduler
          scheduler.process_queue
        end
      end
      scheduler.finalize
      rc = @metrics[:run_count]
      elapsed = Time.now - start_time
      Log.debug "end ##{n} #{rc} #{rc/elapsed}"
      #@metrics["run_count#{n}".intern] = rc
      begin
        # Marshal.dump_to_file is not part of core Ruby — presumably a project
        # extension; verify where it is defined.
        path = "#{log_dir}/res.#{ppid}.#{$$}.dump"
        Marshal.dump_to_file @metrics, path
      rescue SystemCallError
        Log.error "#{$!}"
      end
    }
    pids.push pid
  end
  pids
end

#get_max_queue_size ⇒ Object



124
125
126
# File 'lib/gri/pcollector.rb', line 124

# Queue length at which fillup_queue stops fetching further targets.
def get_max_queue_size; 4; end

#get_ptargets(targets, basetime, duration, offset = 0, default_interval = 300) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/gri/pcollector.rb', line 96

# Builds the dispatch schedule: a Hash mapping a start time to the list of
# indices into +targets+ that are due at that time.
#
# targets          - enumerable of [host, options] pairs; options['interval']
#                    overrides +default_interval+ for that target.
# basetime         - start of the scheduling window.
# duration         - window length; when zero every target index is scheduled
#                    once at +basetime+ (offset is not applied in that case).
# offset           - added to every computed slot time (default 0).
# default_interval - interval used for targets without an 'interval' option.
#
# Targets whose interval evaluates to 0 are left out of the schedule. Indices
# always refer to the target's real position in +targets+, so skipping one
# target does not shift the indices of the targets after it.
#
# Returns the schedule Hash.
def get_ptargets targets, basetime, duration, offset=0, default_interval=300
  return {basetime=>(0..targets.size-1).to_a} if duration.zero?

  # Group target positions by polling interval.
  intervals = {}
  targets.each_with_index {|(_host, options), idx|
    interval = (options['interval'] || default_interval).to_i
    next if interval.zero?
    (intervals[interval] ||= []).push idx
  }

  ptargets = {}
  et = basetime + duration
  intervals.each {|interval, indices|
    # Slots are aligned to multiples of the interval; only those falling in
    # [basetime, basetime + duration) are kept.
    st = basetime - basetime % interval
    (0..duration/interval).each {|k|
      t = st + k * interval
      next unless t >= basetime and t < et
      (ptargets[t+offset] ||= []).concat indices
    }
  }
  ptargets
end

#get_targets_from_lines(lines, config) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/gri/app_collector.rb', line 121

# Turns gritab +lines+ into the target list and applies option layering.
#
# Global options come from the 'option' and 'O' config variables. When
# 'host-pat' is set, only hosts matching at least one of its patterns are
# kept. For each remaining target the final options are, in increasing
# priority: global options, Config.option_if_match(host, 'option-if-host',
# config), then the target's own options. Each target's options hash is
# updated in place.
#
# Returns the (possibly filtered) targets.
def get_targets_from_lines lines, config
  targets = Config.get_targets_from_lines lines
  goptions = Config.parse_options(*(config.getvar 'option'))
  goptions.merge!(Config.parse_options(*config.getvar('O')))
  if config['host-pat']
    patterns = config.getvar('host-pat').map {|pat| Regexp.new pat}
    targets = targets.select {|host, _| patterns.any? {|re| re === host}}
  end
  targets.each {|host, options|
    layered = goptions.clone
    layered.merge! Config.option_if_match(host, 'option-if-host', config)
    layered.merge! options
    options.replace layered
  }
  targets
end

#get_tra_uri(tra_str) ⇒ Object



94
95
96
97
98
99
100
101
# File 'lib/gri/app_collector.rb', line 94

# Converts a TRA location string into a URI.
#
# "host:port" becomes "http://host:port/", a bare "host" becomes
# "http://host:7080/", and anything else is parsed as given.
#
# Returns the parsed URI, or nil when parsing raises a StandardError.
def get_tra_uri tra_str
  location =
    if tra_str =~ /\A[-\w\.]+(:\d+)\z/
      "http://#{tra_str}/"
    elsif tra_str =~ /\A[-\w\.]+\z/
      "http://#{tra_str}:7080/"
    else
      tra_str
    end
  begin
    URI.parse location
  rescue StandardError
    nil
  end
end

#load_fake_descr_files(files) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/gri/app_collector.rb', line 76

# Reads "fake description" files into a nested Hash.
#
# Each useful line has the form "<host>_<key> <description>": the host part
# may contain letters, digits, '-' and '.', the key is everything between the
# first underscore and the first whitespace, and the description is the rest
# of the line. Lines that do not match are ignored, as are paths that do not
# exist.
#
# files - enumerable of file paths.
#
# Returns {host => {key => description}}.
def load_fake_descr_files files
  h = {}
  files.each {|path|
    next unless File.exist? path
    # File.open (not Kernel#open) so that a path beginning with "|" is read
    # as a file and never run as a command.
    File.open(path) {|f|
      f.each_line {|line|
        m = /\A([-\.\dA-Za-z]+_\S+)\s+(.*)/.match(line)
        next unless m
        host, key = m[1].split('_', 2)
        (h[host] ||= {})[key] = m[2]
      }
    }
  }
  h
end

#load_target_lines(config) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/gri/app_collector.rb', line 103

# Fetches the raw gritab lines.
#
# When 'updater' and 'tra' are both set and no 'gritab-path' is configured,
# the lines come from RemoteLDB.get_gritab_lines using the URI derived from
# 'tra'. Otherwise they are read from 'gritab-path', falling back to
# "<root-dir>/gritab"; 'root-dir' is set to Config::ROOT_PATH in +config+
# when it has no value yet.
#
# Returns the lines (line terminators preserved when read from a file).
def load_target_lines config
  remote = config['updater'] && (tra_str = config['tra']) &&
    !config['gritab-path']
  if remote
    RemoteLDB.get_gritab_lines(get_tra_uri(tra_str))
  else
    root_dir = (config['root-dir'] ||= Config::ROOT_PATH)
    gritab_path = config['gritab-path'] || root_dir + '/gritab'
    File.open(gritab_path) {|f| f.readlines}
  end
end

#parse_host_key(s) ⇒ Object



72
73
74
# File 'lib/gri/app_collector.rb', line 72

# Splits a "<host>_<name><rest>" record key into its three parts.
#
# host is the leading run of letters, digits, '-' and '.'; name is the
# following run of characters that are neither '_' nor digits; rest is what
# remains after an optional separating '_'.
#
# Returns [host, name, rest], or nil when +s+ (converted with to_s) does not
# start with "<host>_".
def parse_host_key s
  parts = /\A([-\.A-Za-z0-9]+)_([^_\d]*)(?:_?(.*))/.match(s.to_s)
  parts && parts.captures
end

#run ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/gri/app_collector.rb', line 18

# Runs one collection pass from start to finish.
#
# Loads and prepares the targets, selects the scheduler class (UScheduler in
# 'updater' mode, Scheduler otherwise), collects either in parallel
# ('para' set) or in-process, lets every writer merge and purge its logs when
# it supports those calls, and finally records the 'targets' and
# 'collector_elapsed' metrics.
#
# Returns the value of the last assignment (the elapsed time).
def run
  start_time = Time.now
  # Also stores the default back into config when 'root-dir' was unset.
  root_dir = config['root-dir'] ||= Config::ROOT_PATH

  lines = load_target_lines config
  targets = get_targets_from_lines lines, config

  # NOTE(review): this reads through the Config constant while the rest of the
  # method uses the `config` instance — confirm that is intended.
  files = Config.getvar 'fake-descr-file'
  # fdh stays nil when no fake description files are configured.
  fdh = load_fake_descr_files files if files

  if config['updater']
    # Updater mode: point TraCollector at a remote ('tra') or local
    # ('tra-dir', default "<root-dir>/tra") database.
    if (tra_str = config['tra'])
      tra_uri = get_tra_uri tra_str
      TraCollector.tra_uri = tra_uri
      TraCollector.db_class = RemoteLDB
    else
      tra_dir = config['tra-dir'] || root_dir + '/tra'
      TraCollector.tra_dir = tra_dir
      TraCollector.db_class = LocalLDB
    end

    gra_dir = config['gra-dir'] || root_dir + '/gra'
    Dir.mkdir gra_dir unless File.directory? gra_dir
    TraCollector.gra_dir = gra_dir
    scheduler_class = UScheduler
    h = {}
    # Replace each target's name with its 'hostname' (or 'alias') option when
    # one is present.
    targets.each {|ary|
      (hostname = ary[1]['hostname'] || ary[1]['alias']) and
        (ary[0] = hostname)
    }
    # Keep only the first target for every (possibly renamed) host.
    targets = targets.select {|host, | f = h[host]; h[host] = true; !f}
  else
    scheduler_class = Scheduler
  end

  Log.info "START: pid #{$$}"
  if config['para']
    run_para targets, scheduler_class, start_time.to_i, fdh
  else
    run_single targets, scheduler_class, start_time.to_i, fdh
  end
  # merge and purge_logs are optional writer capabilities.
  for writer in @writers
    if writer.respond_to? :merge
      writer.merge
    end
    if writer.respond_to? :purge_logs
      writer.purge_logs
    end
  end
  Log.info "END: pid #{$$}"
  # These two metrics use String keys.
  @metrics['targets'] = targets.size
  @metrics['collector_elapsed'] = Time.now - start_time
end

#run_para(targets, scheduler_class, start_time, fdh) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/gri/pcollector.rb', line 7

# Collects +targets+ using several forked worker processes.
#
# A UNIX server socket ('sock-path', default '/tmp/.gcollectsock') is opened,
# workers are forked with fork_child, and server_loop hands target indices to
# them according to the schedule from get_ptargets. After the workers exit,
# the per-child metric dumps "<log-dir>/res.<pid>.*.dump" are added into
# @metrics and deleted.
#
# Config keys read: 'sock-path', 'duration', 'interval', 'log-dir',
# 'root-dir', 'max-processes' / 'max-fork-process'.
#
# targets         - list of targets to collect.
# scheduler_class - scheduler class passed on to fork_child.
# start_time      - start time (the duration arithmetic expects a number).
# fdh             - value passed on to fork_child.
#
# Returns nil. When the server socket cannot be created the error is logged
# and the method returns without collecting.
def run_para targets, scheduler_class, start_time, fdh
  sock_path = config['sock-path'] || '/tmp/.gcollectsock'
  begin
    server_sock = UNIXServer.new sock_path
  rescue SystemCallError
    puts "#{$!}: server_sock error" if $debug
    Log.fatal "#{$!}: server_sock error"
    return
  end

  duration = (config['duration'] || 0).to_i
  if duration.zero?
    basetime = start_time
    offset = 0
  else
    # Align the window to a multiple of duration and keep the remainder as
    # the offset applied to every slot.
    basetime = start_time - start_time % duration
    offset = start_time - basetime
  end
  interval = (config['interval'] || 300).to_i
  ptargets = get_ptargets targets, basetime, duration, offset, interval
  log_dir = config['log-dir'] || (config['root-dir'] + '/log')
  # Best-effort removal of dump files left in log_dir.
  Dir.glob("#{log_dir}/res.*.dump") {|path| File.unlink path} rescue nil
  max_processes = (config['max-processes'] ||
                   config['max-fork-process']).to_i
  max_processes = 30 if max_processes < 1
  nproc = [targets.size * 2 / 3 + 1, max_processes].min
  waittime = [20, duration].min
  # Stays empty when fork_child itself fails, so the wait below is safe.
  pids = []
  begin
    pids = fork_child server_sock, sock_path, nproc, targets, log_dir,
      scheduler_class, fdh
    server_loop targets, ptargets, server_sock, waittime
  rescue Timeout::Error, SystemCallError
    Log.error $!.inspect
  ensure
    server_sock.close
    Log.info "server_sock.close"
    File.unlink sock_path
  end
  pids.each {|pid| Process.waitpid pid}
  Dir.glob("#{log_dir}/res.#{$$}.*.dump") {|path|
    begin
      res = Marshal.load_from_file path
      res.each {|k, v| @metrics[k] += v}
      File.unlink path
    rescue SystemCallError
      Log.error "#{$!}"
    end
  }
end

#run_single(targets, scheduler_class, start_time, fdh) ⇒ Object



143
144
145
146
147
148
149
150
151
152
# File 'lib/gri/app_collector.rb', line 143

# Collects every target inside the current process.
#
# One Loop is created and shared with all writers; the scheduler is given the
# writers and the whole target list as its queue, the queue is processed, the
# loop is run, and the scheduler is finalized.
#
# start_time and fdh are accepted but not used in this method.
# NOTE(review): fork_child assigns fdh to scheduler.fake_descr_hash while this
# path does not — confirm whether that difference is intentional.
#
# Returns the result of scheduler.finalize.
def run_single targets, scheduler_class, start_time, fdh
  event_loop = Loop.new
  @writers.each do |w|
    w.loop = event_loop
  end
  sched = scheduler_class.new(event_loop, @metrics)
  sched.writers = @writers
  sched.queue = targets
  sched.process_queue
  event_loop.run
  sched.finalize
end

#server_loop(targets, ptargets, server_sock, waittime) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/gri/pcollector.rb', line 57

# Hands out scheduled target indices to connecting workers.
#
# Schedule times from +ptargets+ are processed in ascending order. Once a
# time is due, each of its indices is served to one accepted connection as
# the line "<index> <first element of targets[index]>", after which the
# connection is closed. When the next time is not yet due the method sleeps
# until it is.
#
# targets     - indexable list; targets[n].first is sent along with n.
# ptargets    - Hash of time => Array of indices into +targets+.
# server_sock - listening socket to accept workers on.
# waittime    - seconds to wait for a worker to connect.
#
# Raises Timeout::Error when no worker connects within +waittime+ or the
# accepted socket does not become writable within 20 seconds. The accepted
# socket is closed on every path, including when writing to it raises.
def server_loop targets, ptargets, server_sock, waittime
  pkeys = ptargets.keys.sort
  pts = []
  while true
    break if pkeys.empty?
    now = Time.now.to_f
    t = pkeys.first
    if t <= now
      pkeys.shift
      pts += ptargets[t]
      while (n = pts.shift)
        sock = Timeout.timeout(waittime) {server_sock.accept}
        begin
          unless IO.select(nil, [sock], nil, 20)
            raise Timeout::Error, 'select timeout'
          end
          thost = targets[n].first
          sock.puts "#{n} #{thost}"
        ensure
          sock.close
        end
        # Batch exhausted: if the next scheduled time has already arrived,
        # continue with its indices right away instead of going back to the
        # outer loop.
        if pts.empty? and (t = pkeys.first)
          now = Time.now.to_f
          if t <= now
            pkeys.shift
            # pts is empty here, so after zip/flatten!/compact! it simply
            # holds the elements of ptargets[t].
            small, big = [pts, ptargets[t]].sort_by {|e| e.size}
            pts.replace big.zip(small)
            pts.flatten!
            pts.compact!
            #pts += ptargets[t]
          end
        end
      end
    else
      sleep(t - now)
    end
  end
end