Class: WavefrontCli::BaseWrite
- Includes:
- Wavefront::Mixins
- Defined in:
- lib/wavefront-cli/base_write.rb
Overview
Send points to a proxy.
Instance Attribute Summary collapse
-
#fmt ⇒ Object
readonly
Returns the value of attribute fmt.
Attributes inherited from Base
#klass, #klass_word, #options, #wf
Instance Method Summary collapse
-
#call_write(data) ⇒ Object
A wrapper which lets us send normal points or deltas.
- #do_file ⇒ Object
- #do_point ⇒ Object
-
#enough_fields?(l) ⇒ Boolean
Make sure we have the right number of columns, according to the format string.
-
#extract_path(chunks) ⇒ Object
Find and return the metric path in a chunked line of input.
-
#extract_source(chunks) ⇒ Object
Find and return the source in a chunked line of input.
- #extract_tags(chunks) ⇒ Object
-
#extract_ts(chunks) ⇒ Object
Find and return the source in a chunked line of input.
-
#extract_value(chunks) ⇒ Object
Find and return the value in a chunked line of input.
-
#line_tags(chunks) ⇒ Object
We can get tags from the file, from the -T option, or both.
-
#process_input(file) ⇒ Object
Read the input, from a file or from STDIN, and turn each line into Wavefront points.
-
#process_line(l) ⇒ Object
Process a line of input, as described by the format string held in @fmt.
-
#read_stdin ⇒ Object
Read from standard in and stream points through an open socket.
- #send_point(p) ⇒ Object
-
#tags_to_hash(tags) ⇒ Object
Takes an array of key=value tags (as produced by docopt) and turns it into a hash of key: value tags.
-
#valid_format?(fmt) ⇒ Boolean
The format string must contain a ‘v’.
-
#valid_timestamp?(ts) ⇒ Boolean
Although the SDK does value checking, we’ll add another layer of input checing here.
Methods inherited from Base
#check_status, #conds_to_query, #dispatch, #display, #display_api_error, #display_no_api_response, #do_delete, #do_describe, #do_import, #do_list, #do_search, #do_tag_add, #do_tag_clear, #do_tag_delete, #do_tag_set, #do_tags, #do_undelete, #do_update, #format_var, #handle_error, #handle_response, #hcl_fields, #import_to_create, #initialize, #load_display_class, #load_file, #load_from_stdin, #mk_creds, #mk_opts, #no_api_response, #options_and_exit, #parseable_output, #run, #validate_id, #validate_input, #validate_opts, #validate_tags, #validator_exception, #validator_method
Constructor Details
This class inherits a constructor from WavefrontCli::Base
Instance Attribute Details
#fmt ⇒ Object (readonly)
Returns the value of attribute fmt.
9 10 11 |
# File 'lib/wavefront-cli/base_write.rb', line 9 def fmt @fmt end |
Instance Method Details
#call_write(data) ⇒ Object
A wrapper which lets us send normal points or deltas
51 52 53 |
# File 'lib/wavefront-cli/base_write.rb', line 51 def call_write(data) [:delta] ? wf.write_delta(data) : wf.write(data) end |
#do_file ⇒ Object
28 29 30 31 32 |
# File 'lib/wavefront-cli/base_write.rb', line 28 def do_file valid_format?([:infileformat]) setup_fmt([:infileformat] || 'tmv') process_input([:'<file>']) end |
#do_point ⇒ Object
12 13 14 15 16 17 18 19 20 |
# File 'lib/wavefront-cli/base_write.rb', line 12 def do_point p = { path: [:'<metric>'], value: [:'<value>'].delete('\\').to_f, tags: ([:tag]) } p[:source] = [:host] if [:host] p[:ts] = parse_time([:time]) if [:time] send_point(p) end |
#enough_fields?(l) ⇒ Boolean
Make sure we have the right number of columns, according to the format string. We want to take every precaution we can to stop users accidentally polluting their metric namespace with junk.
If the format string says we are expecting point tags, we may have more columns than the length of the format string.
204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/wavefront-cli/base_write.rb', line 204 def enough_fields?(l) ncols = l.split.length if fmt.include?('T') return false unless ncols >= fmt.length else return false unless ncols == fmt.length end true end |
#extract_path(chunks) ⇒ Object
Find and return the metric path in a chunked line of input. The path can be in the data, or passed as an option, or both. If the latter, then we assume the option is a prefix, and concatenate the value in the data.
param chunks [Array] a chunked line of input from #process_line return [String] the metric path raise TypeError if field does not exist
108 109 110 111 112 113 114 |
# File 'lib/wavefront-cli/base_write.rb', line 108 def extract_path(chunks) m = chunks[fmt.index('m')] return [:metric] ? [[:metric], m].join('.') : m rescue TypeError return [:metric] if [:metric] raise end |
#extract_source(chunks) ⇒ Object
Find and return the source in a chunked line of input.
param chunks [Array] a chunked line of input from #process_line return [String] the source, if it is there, or if not, the
value passed through by -H, or the local hostname.
122 123 124 125 126 |
# File 'lib/wavefront-cli/base_write.rb', line 122 def extract_source(chunks) return chunks[fmt.index('s')] rescue TypeError [:source] || Socket.gethostname end |
#extract_tags(chunks) ⇒ Object
95 96 97 |
# File 'lib/wavefront-cli/base_write.rb', line 95 def (chunks) (chunks.last.split(/\s(?=(?:[^"]|"[^"]*")*$)/)) end |
#extract_ts(chunks) ⇒ Object
Find and return the source in a chunked line of input.
param chunks [Array] a chunked line of input from #process_line return [Float] the timestamp, if it is there, or the current
UTC time if it is not.
raise TypeError if field does not exist
88 89 90 91 92 93 |
# File 'lib/wavefront-cli/base_write.rb', line 88 def extract_ts(chunks) ts = chunks[fmt.index('t')] return parse_time(ts) if (ts) rescue TypeError Time.now.utc.to_i end |
#extract_value(chunks) ⇒ Object
Find and return the value in a chunked line of input
param chunks [Array] a chunked line of input from #process_line return [Float] the value raise TypeError if field does not exist raise Wavefront::Exception::InvalidValue if it’s not a value
76 77 78 79 |
# File 'lib/wavefront-cli/base_write.rb', line 76 def extract_value(chunks) v = chunks[fmt.index('v')] v.to_f end |
#line_tags(chunks) ⇒ Object
We can get tags from the file, from the -T option, or both. Merge them, making the -T win if there is a collision.
156 157 158 159 160 |
# File 'lib/wavefront-cli/base_write.rb', line 156 def (chunks) = fmt.last == 'T' ? (chunks) : {} = ([:tag]) .merge() end |
#process_input(file) ⇒ Object
Read the input, from a file or from STDIN, and turn each line into Wavefront points.
37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/wavefront-cli/base_write.rb', line 37 def process_input(file) if file == '-' read_stdin else data = load_data(Pathname.new(file)).split("\n").map do |l| process_line(l) end call_write(data) end end |
#process_line(l) ⇒ Object
Process a line of input, as described by the format string held in @fmt. Produces a hash suitable for the SDK to send on.
We let the user define most of the fields, but anything beyond what they define is always assumed to be point tags. This is because you can have arbitrarily many of those for each point.
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/wavefront-cli/base_write.rb', line 135 def process_line(l) return true if l.empty? chunks = l.split(/\s+/, fmt.length) raise 'wrong number of fields' unless enough_fields?(l) begin point = { path: extract_path(chunks), value: extract_value(chunks) } point[:ts] = extract_ts(chunks) if fmt.include?('t') point[:source] = extract_source(chunks) if fmt.include?('s') point[:tags] = (chunks) rescue TypeError raise "could not process #{l}" end point end |
#read_stdin ⇒ Object
Read from standard in and stream points through an open socket. If the user hits ctrl-c, close the socket and exit politely.
59 60 61 62 63 64 65 66 67 |
# File 'lib/wavefront-cli/base_write.rb', line 59 def read_stdin open_connection STDIN.each_line { |l| call_write(process_line(l.strip), false) } close_connection rescue SystemExit, Interrupt puts 'ctrl-c. Exiting.' wf.close exit 0 end |
#send_point(p) ⇒ Object
22 23 24 25 26 |
# File 'lib/wavefront-cli/base_write.rb', line 22 def send_point(p) call_write(p) rescue Wavefront::Exception::InvalidEndpoint abort "could not speak to proxy #{[:proxy]}:#{[:port]}." end |
#tags_to_hash(tags) ⇒ Object
Takes an array of key=value tags (as produced by docopt) and turns it into a hash of key: value tags. Anything not of the form key=val is dropped. If key or value are quoted, we remove the quotes.
return Hash
170 171 172 173 174 175 176 177 178 |
# File 'lib/wavefront-cli/base_write.rb', line 170 def () return nil unless [].flatten.each_with_object({}) do |t, ret| k, v = t.split('=', 2) k.gsub!(/^["']|["']$/, '') ret[k] = v.to_s.gsub(/^["']|["']$/, '') if v end end |
#valid_format?(fmt) ⇒ Boolean
The format string must contain a ‘v’. It must not contain anything other than ‘m’, ‘t’, ‘T’, ‘s’, or ‘v’, and the ‘T’, if there, must be at the end. No letter must appear more than once.
187 188 189 190 191 192 193 194 |
# File 'lib/wavefront-cli/base_write.rb', line 187 def valid_format?(fmt) if fmt.include?('v') && fmt.match(/^[mstv]+T?$/) && fmt == fmt.split('').uniq.join return true end raise 'Invalid format string.' end |
#valid_timestamp?(ts) ⇒ Boolean
Although the SDK does value checking, we’ll add another layer of input checing here. See if the time looks valid. We’ll assume anything before 2000/01/01 or after a year from now is wrong. Arbitrary, but there has to be a cut-off somewhere.
221 222 223 224 |
# File 'lib/wavefront-cli/base_write.rb', line 221 def (ts) (ts.is_a?(Integer) || ts.match(/^\d+$/)) && ts.to_i > 946_684_800 && ts.to_i < (Time.now.to_i + 31_557_600) end |