Class: WavefrontCli::Write

Inherits:
Base
  • Object
show all
Includes:
Wavefront::Mixins
Defined in:
lib/wavefront-cli/write.rb

Overview

Send points via a proxy. This inherits from the same base class as Report, but has to do a couple of things differently, as it speaks to a proxy rather than to the API.

Constant Summary collapse

SPLIT_PATTERN =
/\s(?=(?:[^"]|"[^"]*")*$)/

Constants included from Constants

Constants::ALL_PAGE_SIZE, Constants::DEFAULT_CONFIG, Constants::DEFAULT_OPTS, Constants::HUMAN_TIME_FORMAT, Constants::HUMAN_TIME_FORMAT_MS

Instance Attribute Summary collapse

Attributes inherited from Base

#klass, #klass_word, #options, #wf

Instance Method Summary collapse

Methods inherited from Base

#cannot_noop!, #check_status, #conds_to_query, #dispatch, #display, #display_api_error, #display_no_api_response, #do_delete, #do_describe, #do_import, #do_list, #do_search, #do_undelete, #do_update, #extract_values, #failed_validation_message, #format_var, #handle_error, #handle_response, #hcl_fields, #import_to_create, #initialize, #load_display_class, #load_file, #load_from_stdin, #mk_opts, #no_api_response, #ok_exit, #one_or_all, #options_and_exit, #parseable_output, #range_hash, #run, #search_key, #smart_delete, #smart_delete_message, #validate_id, #validate_input, #validate_tags, #validator_exception, #validator_method

Constructor Details

This class inherits a constructor from WavefrontCli::Base

Instance Attribute Details

#fmtObject (readonly)

Returns the value of attribute fmt.



11
12
13
# File 'lib/wavefront-cli/write.rb', line 11

def fmt
  @fmt
end

Instance Method Details

#_sdk_classObject

I chose to prioritise UI consistency over internal elegance here. The ‘write` command doesn’t follow the age-old assumption that each command maps 1:1 to a similarly named SDK class. Write can use ‘write` or `distribution`.



66
67
68
69
# File 'lib/wavefront-cli/write.rb', line 66

def _sdk_class
  return 'Wavefront::Distribution' if distribution?
  'Wavefront::Write'
end

#call_write(data, openclose = true) ⇒ Object

A wrapper which lets us send normal points, deltas, or distributions



151
152
153
154
155
156
157
# File 'lib/wavefront-cli/write.rb', line 151

def call_write(data, openclose = true)
  if options[:delta]
    wf.write_delta(data, openclose)
  else
    wf.write(data, openclose)
  end
end

#close_connectionObject



111
112
113
# File 'lib/wavefront-cli/write.rb', line 111

def close_connection
  wf.close
end

#default_portObject



84
85
86
# File 'lib/wavefront-cli/write.rb', line 84

def default_port
  distribution? ? 40000 : 2878
end

#distribution?Boolean



71
72
73
74
# File 'lib/wavefront-cli/write.rb', line 71

def distribution?
  return true if options[:distribution]
  options[:infileformat]&.include?('d')
end

#do_distributionObject

rubocop:disable Metrics/AbcSize



36
37
38
39
40
41
42
43
44
45
46
# File 'lib/wavefront-cli/write.rb', line 36

def do_distribution
  p = { path:     options[:'<metric>'],
        interval: options[:interval] || 'M',
        value:    mk_dist }

  tags = tags_to_hash(options[:tag])
  p[:tags] = tags unless tags.empty?
  p[:source] = options[:host] if options[:host]
  p[:ts] = parse_time(options[:time]) if options[:time]
  send_point(p)
end

#do_fileObject

rubocop:enable Metrics/AbcSize



29
30
31
32
33
# File 'lib/wavefront-cli/write.rb', line 29

def do_file
  valid_format?(options[:infileformat])
  setup_fmt(options[:infileformat] || 'tmv')
  process_input(options[:'<file>'])
end

#do_pointObject

rubocop:disable Metrics/AbcSize



16
17
18
19
20
21
22
23
24
25
26
# File 'lib/wavefront-cli/write.rb', line 16

def do_point
  p = { path:  options[:'<metric>'],
        value: options[:'<value>'].delete('\\').to_f }

  tags = tags_to_hash(options[:tag])

  p[:tags] = tags unless tags.empty?
  p[:source] = options[:host] if options[:host]
  p[:ts] = parse_time(options[:time]) if options[:time]
  send_point(p)
end

#enough_fields?(line) ⇒ True

Make sure we have the right number of columns, according to the format string. We want to take every precaution we can to stop users accidentally polluting their metric namespace with junk.

Raises:

  • WavefrontCli::Exception::UnparseableInput if there are not the right number of fields.



358
359
360
361
362
363
364
# File 'lib/wavefront-cli/write.rb', line 358

def enough_fields?(line)
  ncols = line.split(SPLIT_PATTERN).length
  return true if fmt.include?('T') && ncols >= fmt.length
  return true if ncols == fmt.length
  raise(WavefrontCli::Exception::UnparseableInput,
        format('Expected %s fields, got %s', fmt.length, ncols))
end

#expand_dist(dist) ⇒ Object

We will let users write a distribution as ‘1 1 1’ or ‘3x1’ or even a mix of the two



194
195
196
197
198
199
200
201
202
203
# File 'lib/wavefront-cli/write.rb', line 194

def expand_dist(dist)
  dist.map do |v|
    if v.is_a?(String) && v.include?('x')
      x, val = v.split('x', 2)
      Array.new(x.to_i, val.to_f)
    else
      v.to_f
    end
  end.flatten
end

#extra_optionsObject



57
58
59
# File 'lib/wavefront-cli/write.rb', line 57

def extra_options
  options[:using] ? { writer: options[:using] } : {}
end

#extract_path(chunks) ⇒ Object

Find and return the metric path in a chunked line of input. The path can be in the data, or passed as an option, or both. If the latter, then we assume the option is a prefix, and concatenate the value in the data.

param chunks [Array] a chunked line of input from #process_line return [String] the metric path raise TypeError if field does not exist



235
236
237
238
239
240
241
# File 'lib/wavefront-cli/write.rb', line 235

def extract_path(chunks)
  m = chunks[fmt.index('m')]
  options[:metric] ? [options[:metric], m].join('.') : m
rescue TypeError
  return options[:metric] if options[:metric]
  raise
end

#extract_source(chunks) ⇒ Object

Find and return the source in a chunked line of input.

param chunks [Array] a chunked line of input from #process_line return [String] the source, if it is there, or if not, the

value passed through by -H, or the local hostname.


249
250
251
252
253
# File 'lib/wavefront-cli/write.rb', line 249

def extract_source(chunks)
  chunks[fmt.index('s')]
rescue TypeError
  options[:source] || Socket.gethostname
end

#extract_tags(chunks) ⇒ Hash



222
223
224
# File 'lib/wavefront-cli/write.rb', line 222

def extract_tags(chunks)
  tags_to_hash(chunks.last.split(SPLIT_PATTERN))
end

#extract_ts(chunks) ⇒ Float

Find and return the source in a chunked line of input.



211
212
213
214
215
216
# File 'lib/wavefront-cli/write.rb', line 211

def extract_ts(chunks)
  ts = chunks[fmt.index('t')]
  return parse_time(ts) if valid_timestamp?(ts)
rescue TypeError
  Time.now.utc.to_i
end

#extract_value(chunks) ⇒ Object

Find and return the value in a chunked line of input

param chunks [Array] a chunked line of input from #process_line return [Float] the value raise TypeError if field does not exist raise Wavefront::Exception::InvalidValue if it’s not a value



180
181
182
183
184
185
186
187
188
189
# File 'lib/wavefront-cli/write.rb', line 180

def extract_value(chunks)
  if fmt.include?('v')
    v = chunks[fmt.index('v')]
    v.to_f
  else
    raw = chunks[fmt.index('d')].split(',')
    xpanded = expand_dist(raw)
    wf.mk_distribution(xpanded)
  end
end

#line_tags(chunks) ⇒ Object

We can get tags from the file, from the -T option, or both. Merge them, making the -T win if there is a collision.



292
293
294
295
296
# File 'lib/wavefront-cli/write.rb', line 292

def line_tags(chunks)
  file_tags = fmt.last == 'T' ? extract_tags(chunks) : {}
  opt_tags = tags_to_hash(options[:tag]) || {}
  file_tags.merge(opt_tags)
end

#load_data(file) ⇒ Object



384
385
386
387
388
# File 'lib/wavefront-cli/write.rb', line 384

def load_data(file)
  IO.read(file)
rescue StandardError
  raise WavefrontCli::Exception::FileNotFound
end

#mk_credsObject



76
77
78
79
80
81
82
# File 'lib/wavefront-cli/write.rb', line 76

def mk_creds
  { proxy:    options[:proxy],
    port:     options[:port] || default_port,
    socket:   options[:socket],
    endpoint: options[:endpoint],
    token:    options[:token] }
end

#mk_distObject

Turn our user’s representation of a distribution into one which suits Wavefront. The SDK can do this for us.



52
53
54
55
# File 'lib/wavefront-cli/write.rb', line 52

def mk_dist
  xpanded = expand_dist(options[:'<val>'])
  wf.mk_distribution(xpanded.map(&:to_f))
end

#open_connectionObject



107
108
109
# File 'lib/wavefront-cli/write.rb', line 107

def open_connection
  wf.open
end

#process_input(file) ⇒ Object

Read the input, from a file or from STDIN, and turn each line into Wavefront points.



125
126
127
128
129
130
131
132
133
# File 'lib/wavefront-cli/write.rb', line 125

def process_input(file)
  if file == '-'
    read_stdin
  else
    call_write(
      process_input_file(load_data(Pathname.new(file)).split("\n"))
    )
  end
end

#process_input_file(data) ⇒ Object



137
138
139
140
141
142
143
144
145
146
# File 'lib/wavefront-cli/write.rb', line 137

def process_input_file(data)
  data.each_with_object([]) do |l, a|
    begin
      a.<< process_line(l)
    rescue WavefrontCli::Exception::UnparseableInput => e
      puts "Bad input. #{e.message}."
      next
    end
  end
end

#process_line(line) ⇒ Hash

Process a line of input, as described by the format string held in @fmt. Produces a hash suitable for the SDK to send on.

We let the user define most of the fields, but anything beyond what they define is always assumed to be point tags. This is because you can have arbitrarily many of those for each point.

rubocop:disable Metrics/AbcSize rubocop:disable Metrics/CyclomaticComplexity

Raises:

  • WavefrontCli::Exception::UnparseableInput if the line doesn’t look right



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/wavefront-cli/write.rb', line 269

def process_line(line)
  return true if line.empty?
  chunks = line.split(SPLIT_PATTERN, fmt.length)
  enough_fields?(line) # can raise exception

  point = { path:  extract_path(chunks),
            value: extract_value(chunks) }

  tags = line_tags(chunks)

  point.tap do |p|
    p[:tags]     = tags unless tags.empty?
    p[:ts]       = extract_ts(chunks)        if fmt.include?('t')
    p[:source]   = extract_source(chunks)    if fmt.include?('s')
    p[:interval] = options[:interval] || 'm' if fmt.include?('d')
  end
end

#read_stdinObject

Read from standard in and stream points through an open socket. If the user hits ctrl-c, close the socket and exit politely.



163
164
165
166
167
168
169
170
171
# File 'lib/wavefront-cli/write.rb', line 163

def read_stdin
  open_connection
  STDIN.each_line { |l| call_write(process_line(l.strip), false) }
  close_connection
rescue SystemExit, Interrupt
  puts 'ctrl-c. Exiting.'
  wf.close
  exit 0
end

#send_point(point) ⇒ Object



115
116
117
118
119
120
# File 'lib/wavefront-cli/write.rb', line 115

def send_point(point)
  call_write(point)
rescue Wavefront::Exception::InvalidEndpoint
  abort format("Could not connect to proxy '%s:%s'.",
               options[:proxy], options[:port])
end

#setup_fmt(fmt) ⇒ Object



380
381
382
# File 'lib/wavefront-cli/write.rb', line 380

def setup_fmt(fmt)
  @fmt = fmt.split('')
end

#tags_to_hash(tags) ⇒ Hash

Takes an array of key=value tags (as produced by docopt) and turns it into a hash of key: value tags. Anything not of the form key=val is dropped. If key or value are quoted, we remove the quotes.



306
307
308
309
310
311
312
313
314
# File 'lib/wavefront-cli/write.rb', line 306

def tags_to_hash(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2)
    k.gsub!(/^["']|["']$/, '')
    ret[k.to_sym] = v.to_s.gsub(/^["']|["']$/, '') if v
  end
end

#valid_format?(fmt) ⇒ Boolean

The format string must contain values. They can be single values or distributions. So we must have ‘v’ xor ‘d’. It must not contain anything other than ‘m’, ‘t’, ‘T’, ‘s’, ‘d’, or ‘v’, and the ‘T’, if there, must be at the end. No letter must appear more than once.

rubocop:disable Metrics/PerceivedComplexity rubocop:disable Metrics/CyclomaticComplexity rubocop:disable Metrics/AbcSize



327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# File 'lib/wavefront-cli/write.rb', line 327

def valid_format?(fmt)
  err = if fmt.include?('v') && fmt.include?('d')
          "'v' and 'd' are mutually exclusive"
        elsif !fmt.include?('v') && !fmt.include?('d')
          "format string must include 'v' or 'd'"
        elsif !fmt.match(/^[dmstTv]+$/)
          'unsupported field in format string'
        elsif fmt != fmt.squeeze
          'repeated field in format string'
        elsif fmt.include?('T') && !fmt.end_with?('T')
          "if used, 'T' must come at end of format string"
        end

  return true if err.nil?

  raise(WavefrontCli::Exception::UnparseableInput, err)
end

#valid_timestamp?(timestamp) ⇒ Bool

Although the SDK does value checking, we’ll add another layer of input checking here. See if the time looks valid. We’ll assume anything before 2000/01/01 or after a year from now is wrong. Arbitrary, but there has to be a cut-off somewhere.



373
374
375
376
377
378
# File 'lib/wavefront-cli/write.rb', line 373

def valid_timestamp?(timestamp)
  (timestamp.is_a?(Integer) ||
    timestamp.is_a?(String) && timestamp.match(/^\d+$/)) &&
    timestamp.to_i > 946_684_800 &&
    timestamp.to_i < (Time.now.to_i + 31_557_600)
end

#validate_optsObject



88
89
90
91
92
93
94
95
96
97
98
# File 'lib/wavefront-cli/write.rb', line 88

def validate_opts
  validate_opts_file if options[:file]

  if options[:using] == 'unix'
    return true if options[:socket]
    raise(WavefrontCli::Exception::CredentialError, 'No socket path.')
  end

  return true if options[:proxy]
  raise(WavefrontCli::Exception::CredentialError, 'No proxy address.')
end

#validate_opts_fileObject



100
101
102
103
104
105
# File 'lib/wavefront-cli/write.rb', line 100

def validate_opts_file
  return true if options[:metric] || options[:infileformat]&.include?('m')

  raise(WavefrontCli::Exception::InsufficientData,
        "Supply a metric path in the file or with '-m'.")
end