Class: WavefrontCli::BaseWrite

Inherits:
Base
  • Object
Includes:
Wavefront::Mixins
Defined in:
lib/wavefront-cli/base_write.rb

Overview

Send points to a proxy.

Direct Known Subclasses

Report, Write

Instance Attribute Summary

Attributes inherited from Base

#klass, #klass_word, #options, #wf

Instance Method Summary

Methods inherited from Base

#check_status, #conds_to_query, #dispatch, #display, #display_api_error, #display_no_api_response, #do_delete, #do_describe, #do_import, #do_list, #do_search, #do_tag_add, #do_tag_clear, #do_tag_delete, #do_tag_set, #do_tags, #do_undelete, #do_update, #format_var, #handle_error, #handle_response, #hcl_fields, #import_to_create, #initialize, #load_display_class, #load_file, #load_from_stdin, #mk_creds, #mk_opts, #no_api_response, #options_and_exit, #parseable_output, #run, #validate_id, #validate_input, #validate_opts, #validate_tags, #validator_exception, #validator_method

Constructor Details

This class inherits a constructor from WavefrontCli::Base

Instance Attribute Details

#fmt ⇒ Object (readonly)

Returns the value of attribute fmt.



# File 'lib/wavefront-cli/base_write.rb', line 9

def fmt
  @fmt
end

Instance Method Details

#call_write(data) ⇒ Object

A wrapper that sends either normal points or deltas, depending on whether the delta option is set.



# File 'lib/wavefront-cli/base_write.rb', line 51

def call_write(data)
  options[:delta] ? wf.write_delta(data) : wf.write(data)
end

#do_file ⇒ Object



# File 'lib/wavefront-cli/base_write.rb', line 28

def do_file
  valid_format?(options[:infileformat])
  setup_fmt(options[:infileformat] || 'tmv')
  process_input(options[:'<file>'])
end

#do_point ⇒ Object



# File 'lib/wavefront-cli/base_write.rb', line 12

def do_point
  p = { path:  options[:'<metric>'],
        value: options[:'<value>'].delete('\\').to_f,
        tags:  tags_to_hash(options[:tag]) }

  p[:source] = options[:host] if options[:host]
  p[:ts] = parse_time(options[:time]) if options[:time]
  send_point(p)
end

#enough_fields?(l) ⇒ Boolean

Make sure we have the right number of columns, according to the format string. We want to take every precaution we can to stop users accidentally polluting their metric namespace with junk.

If the format string says we are expecting point tags, we may have more columns than the length of the format string.

Returns:

  • (Boolean)


# File 'lib/wavefront-cli/base_write.rb', line 204

def enough_fields?(l)
  ncols = l.split.length

  if fmt.include?('T')
    return false unless ncols >= fmt.length
  else
    return false unless ncols == fmt.length
  end

  true
end
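
The column-count rule described above can be tried outside the class with a small standalone sketch. The method name and the explicit `fmt` argument are assumptions made for illustration; the class itself reads the format from the `fmt` attribute.

```ruby
# Standalone sketch of the column-count rule. Unlike the class method, the
# format is passed in explicitly (an assumption made for this example).
def enough_fields_sketch?(line, fmt)
  ncols = line.split.length
  # With a 'T' in the format, extra columns are point tags, so allow more.
  fmt.include?('T') ? ncols >= fmt.length : ncols == fmt.length
end

puts enough_fields_sketch?('cpu.load 0.5', 'mv')                 # prints true
puts enough_fields_sketch?('cpu.load 0.5 extra', 'mv')           # prints false
puts enough_fields_sketch?('cpu.load 0.5 env=prod dc=eu', 'mvT') # prints true
```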

#extract_path(chunks) ⇒ Object

Find and return the metric path in a chunked line of input. The path can be in the data, or passed as an option, or both. If both are given, the option is treated as a prefix and the value from the data is appended to it, joined with a dot.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (String)

    the metric path

Raises:

  • (TypeError)

    if field does not exist



# File 'lib/wavefront-cli/base_write.rb', line 108

def extract_path(chunks)
  m = chunks[fmt.index('m')]
  return options[:metric] ? [options[:metric], m].join('.') : m
rescue TypeError
  return options[:metric] if options[:metric]
  raise
end
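
A minimal sketch of the prefix behaviour, assuming the format is an array of single-letter fields and the option value is passed in directly. The method name and signature are hypothetical. The `TypeError` arises because `fmt.index('m')` returns `nil` when the format has no `m`, and indexing an array with `nil` raises.

```ruby
# Sketch of the path lookup with the format and the metric option passed in
# explicitly (hypothetical signature; the class uses fmt and options).
def extract_path_sketch(chunks, fmt, metric_opt = nil)
  m = chunks[fmt.index('m')] # index is nil without 'm', so this raises TypeError
  metric_opt ? [metric_opt, m].join('.') : m
rescue TypeError
  return metric_opt if metric_opt

  raise
end

puts extract_path_sketch(%w[load 0.5], %w[m v])            # prints load
puts extract_path_sketch(%w[load 0.5], %w[m v], 'dev.cpu') # prints dev.cpu.load
puts extract_path_sketch(%w[0.5], %w[v], 'dev.cpu')        # prints dev.cpu
```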

#extract_source(chunks) ⇒ Object

Find and return the source in a chunked line of input.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (String)

    the source, if it is there, or if not, the value passed through by -H, or the local hostname


# File 'lib/wavefront-cli/base_write.rb', line 122

def extract_source(chunks)
  return chunks[fmt.index('s')]
rescue TypeError
  options[:source] || Socket.gethostname
end
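
The fallback order (field in the data, then the option, then the local hostname) can be sketched as follows. The method name and the explicit `fmt` and `source_opt` arguments are assumptions made so the example runs on its own.

```ruby
require 'socket'

# Sketch of the source lookup. The format and the source option are passed
# in explicitly here (hypothetical signature).
def extract_source_sketch(chunks, fmt, source_opt = nil)
  chunks[fmt.index('s')] # raises TypeError when the format has no 's'
rescue TypeError
  source_opt || Socket.gethostname
end

puts extract_source_sketch(%w[web01 0.5], %w[s v])  # prints web01
puts extract_source_sketch(%w[0.5], %w[v], 'box-7') # prints box-7
puts extract_source_sketch(%w[0.5], %w[v])          # prints the local hostname
```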

#extract_tags(chunks) ⇒ Object



# File 'lib/wavefront-cli/base_write.rb', line 95

def extract_tags(chunks)
  tags_to_hash(chunks.last.split(/\s(?=(?:[^"]|"[^"]*")*$)/))
end
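
The regular expression in this method splits on a whitespace character only when the rest of the string contains balanced double quotes, which keeps quoted tag values containing spaces in one piece. A small sketch of just the splitting step (the helper name is hypothetical):

```ruby
# Split a tag string on whitespace that is not inside double quotes.
# The lookahead only succeeds when the remainder has balanced quotes.
def split_tags_sketch(str)
  str.split(/\s(?=(?:[^"]|"[^"]*")*$)/)
end

p split_tags_sketch('env=prod owner="ops team" dc=eu')
# prints ["env=prod", "owner=\"ops team\"", "dc=eu"]
```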

#extract_ts(chunks) ⇒ Object

Find and return the timestamp in a chunked line of input.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (Float)

    the timestamp, if it is there, or the current UTC time if it is not

Raises:

  • (TypeError)

    if field does not exist



# File 'lib/wavefront-cli/base_write.rb', line 88

def extract_ts(chunks)
  ts = chunks[fmt.index('t')]
  return parse_time(ts) if valid_timestamp?(ts)
rescue TypeError
  Time.now.utc.to_i
end

#extract_value(chunks) ⇒ Object

Find and return the value in a chunked line of input.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (Float)

    the value

Raises:

  • (TypeError)

    if field does not exist

  • (Wavefront::Exception::InvalidValue)

    if it’s not a value



# File 'lib/wavefront-cli/base_write.rb', line 76

def extract_value(chunks)
  v = chunks[fmt.index('v')]
  v.to_f
end

#line_tags(chunks) ⇒ Object

We can get tags from the file, from the -T option, or both. Merge them, letting the -T option win if there is a collision.



# File 'lib/wavefront-cli/base_write.rb', line 156

def line_tags(chunks)
  file_tags = fmt.last == 'T' ? extract_tags(chunks) : {}
  opt_tags = tags_to_hash(options[:tag])
  file_tags.merge(opt_tags)
end
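
The collision rule follows from `Hash#merge`, where values from the argument replace values in the receiver. A brief illustration, with a hypothetical helper name and made-up tag values:

```ruby
# Hash#merge keeps the argument's value on a key collision, so tags given
# as options override tags read from the input line.
def merge_tags_sketch(file_tags, opt_tags)
  file_tags.merge(opt_tags)
end

merged = merge_tags_sketch({ 'env' => 'dev', 'dc' => 'eu' }, { 'env' => 'prod' })
puts merged == { 'env' => 'prod', 'dc' => 'eu' } # prints true
```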

#process_input(file) ⇒ Object

Read the input, from a file or from STDIN, and turn each line into Wavefront points.



# File 'lib/wavefront-cli/base_write.rb', line 37

def process_input(file)
  if file == '-'
    read_stdin
  else
    data = load_data(Pathname.new(file)).split("\n").map do |l|
      process_line(l)
    end

    call_write(data)
  end
end

#process_line(l) ⇒ Object

Process a line of input, as described by the format string held in @fmt. Produces a hash suitable for the SDK to send on.

We let the user define most of the fields, but anything beyond what they define is always assumed to be point tags. This is because you can have arbitrarily many of those for each point.



# File 'lib/wavefront-cli/base_write.rb', line 135

def process_line(l)
  return true if l.empty?
  chunks = l.split(/\s+/, fmt.length)
  raise 'wrong number of fields' unless enough_fields?(l)

  begin
    point = { path:  extract_path(chunks),
              value: extract_value(chunks) }
    point[:ts] = extract_ts(chunks) if fmt.include?('t')
    point[:source] = extract_source(chunks) if fmt.include?('s')
    point[:tags] = line_tags(chunks)
  rescue TypeError
    raise "could not process #{l}"
  end

  point
end
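
The "anything beyond the defined fields is point tags" behaviour comes from the limit argument to `String#split`: with a limit of `fmt.length`, the final chunk holds the whole remainder of the line. A sketch of the chunking step alone, assuming the format is an array of single-letter fields (the helper name is hypothetical):

```ruby
# Split a line into at most fmt.length chunks. Everything after the last
# defined field stays together in the final chunk.
def chunk_line_sketch(line, fmt)
  line.split(/\s+/, fmt.length)
end

p chunk_line_sketch('cpu.load 0.5 1500000000 env=prod dc=eu', %w[m v t T])
# prints ["cpu.load", "0.5", "1500000000", "env=prod dc=eu"]
```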

#read_stdin ⇒ Object

Read from standard input and stream points through an open socket. If the user hits ctrl-c, close the socket and exit politely.



# File 'lib/wavefront-cli/base_write.rb', line 59

def read_stdin
  open_connection
  STDIN.each_line { |l| call_write(process_line(l.strip)) }
  close_connection
rescue SystemExit, Interrupt
  puts 'ctrl-c. Exiting.'
  wf.close
  exit 0
end

#send_point(p) ⇒ Object



# File 'lib/wavefront-cli/base_write.rb', line 22

def send_point(p)
  call_write(p)
rescue Wavefront::Exception::InvalidEndpoint
  abort "could not speak to proxy #{options[:proxy]}:#{options[:port]}."
end

#tags_to_hash(tags) ⇒ Object

Takes an array of key=value tags (as produced by docopt) and turns it into a hash of key: value tags. Anything not of the form key=val is dropped. If key or value are quoted, we remove the quotes.

Parameters:

  • tags (Array)

    key=value tags

Returns:

  • (Hash)



# File 'lib/wavefront-cli/base_write.rb', line 170

def tags_to_hash(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2)
    k.gsub!(/^["']|["']$/, '')
    ret[k] = v.to_s.gsub(/^["']|["']$/, '') if v
  end
end
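
To see the conversion in action outside the class, here is a non-mutating sketch of the same logic. The method name is hypothetical and the input values are made up for illustration.

```ruby
# Sketch of the key=value conversion: split on the first '=', drop tags
# without a value, and strip one leading and trailing quote from each side.
def tags_to_hash_sketch(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2)
    next unless v # no '=' in the tag, so drop it

    ret[k.gsub(/^["']|["']$/, '')] = v.gsub(/^["']|["']$/, '')
  end
end

tags = tags_to_hash_sketch(['env=prod', '"owner"="ops team"', 'junk'])
puts tags == { 'env' => 'prod', 'owner' => 'ops team' } # prints true
```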

#valid_format?(fmt) ⇒ Boolean

The format string must contain a ‘v’. It must not contain anything other than ‘m’, ‘t’, ‘T’, ‘s’, or ‘v’, and the ‘T’, if there, must be at the end. No letter may appear more than once.

Parameters:

  • fmt (String)

    format of input file

Returns:

  • (Boolean)


# File 'lib/wavefront-cli/base_write.rb', line 187

def valid_format?(fmt)
  if fmt.include?('v') && fmt.match(/^[mstv]+T?$/) &&
     fmt == fmt.split('').uniq.join
    return true
  end

  raise 'Invalid format string.'
end
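
A few format strings checked against the three rules, using a sketch that returns a boolean instead of raising. The method name is hypothetical, and the non-raising behaviour is a deliberate difference from the method above.

```ruby
# Sketch of the format rules: needs a 'v', only m/s/t/v with an optional
# trailing 'T', and no repeated letters. Returns false rather than raising.
def format_ok_sketch?(fmt)
  fmt.include?('v') && fmt.match?(/^[mstv]+T?$/) && fmt == fmt.chars.uniq.join
end

puts format_ok_sketch?('tmv') # prints true
puts format_ok_sketch?('mvT') # prints true
puts format_ok_sketch?('mt')  # prints false (no 'v')
puts format_ok_sketch?('mTv') # prints false ('T' is not last)
puts format_ok_sketch?('mvv') # prints false (repeated letter)
```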

#valid_timestamp?(ts) ⇒ Boolean

Although the SDK does value checking, we’ll add another layer of input checking here. See if the time looks valid. We’ll assume anything before 2000/01/01 or after a year from now is wrong. Arbitrary, but there has to be a cut-off somewhere.

Returns:

  • (Boolean)


# File 'lib/wavefront-cli/base_write.rb', line 221

def valid_timestamp?(ts)
  (ts.is_a?(Integer) || ts.match(/^\d+$/)) &&
    ts.to_i > 946_684_800 && ts.to_i < (Time.now.to_i + 31_557_600)
end
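
The two constants are epoch seconds: 946_684_800 is 2000-01-01 00:00:00 UTC, and 31_557_600 is 365.25 days. The sketch below repeats the check with the current time injectable so the window can be exercised deterministically; the method name and the `now` parameter are assumptions added for illustration.

```ruby
# Sketch of the plausibility window: after 2000-01-01 UTC and less than
# roughly one year (365.25 days) past "now". The now argument is injectable.
def timestamp_plausible_sketch?(ts, now = Time.now.to_i)
  (ts.is_a?(Integer) || ts.match?(/^\d+$/)) &&
    ts.to_i > 946_684_800 && ts.to_i < (now + 31_557_600)
end

puts timestamp_plausible_sketch?('1500000000')                # prints true
puts timestamp_plausible_sketch?(100)                         # prints false (before 2000)
puts timestamp_plausible_sketch?('yesterday')                 # prints false (not numeric)
puts timestamp_plausible_sketch?(1_500_000_000, 1_400_000_000) # prints false (too far ahead)
```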