Class: WavefrontCli::Write

Inherits:
Base
  • Object
Includes:
Wavefront::Mixins
Defined in:
lib/wavefront-cli/write.rb

Overview

Send points via any method supported by the SDK

Constant Summary

SPLIT_PATTERN =
/\s(?=(?:[^"]|"[^"]*")*$)/.freeze
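As an illustration of what SPLIT_PATTERN does, here is a minimal sketch. The sample line and the names SAMPLE and FIELDS are made up for the example. The pattern matches a whitespace character only when the rest of the line contains balanced double quotes, so a quoted value containing spaces stays in one piece.

```ruby
# Same pattern as the constant above, copied so the snippet runs on its own.
SPLIT_PATTERN = /\s(?=(?:[^"]|"[^"]*")*$)/.freeze

# Hypothetical input line: a metric path, a value, and two point tags.
SAMPLE = 'cpu.load 0.5 env=prod dc="us west"'

# The space inside the quoted tag value is not treated as a separator.
FIELDS = SAMPLE.split(SPLIT_PATTERN)
p FIELDS
```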

Constants included from Constants

Constants::ALL_PAGE_SIZE, Constants::DEFAULT_CONFIG, Constants::DEFAULT_OPTS, Constants::EVENT_STATE_DIR, Constants::HUMAN_TIME_FORMAT, Constants::HUMAN_TIME_FORMAT_MS, Constants::SEARCH_SPLIT

Instance Attribute Summary

Attributes inherited from Base

#klass, #klass_word, #options, #wf

Instance Method Summary

Methods inherited from Base

#cannot_noop!, #check_response_blocks, #check_status, #cli_output_class, #conds_to_query, #descriptive_name, #dispatch, #display, #display_api_error, #display_class, #display_no_api_response, #do_delete, #do_describe, #do_dump, #do_import, #do_list, #do_search, #do_set, #do_undelete, #dump_json, #dump_yaml, #extract_values, #failed_validation_message, #format_var, #handle_error, #handle_response, #hcl_fields, #import_to_create, #initialize, #item_dump_call, #load_display_class, #matching_method, #method_word_list, #mk_opts, #name_of_do_method, #no_api_response, #ok_exit, #one_or_all, #options_and_exit, #parseable_output, #range_hash, #require_sdk_class, #run, #search_key, #smart_delete, #smart_delete_message, #status_error_handler, #unsupported_format_message, #validate_id, #validate_input, #validate_tags, #validator_exception, #validator_method, #warning_message

Constructor Details

This class inherits a constructor from WavefrontCli::Base

Instance Attribute Details

#fmt ⇒ Object (readonly)

Returns the value of attribute fmt.



# File 'lib/wavefront-cli/write.rb', line 11

def fmt
  @fmt
end

Instance Method Details

#_sdk_class ⇒ Object

I chose to prioritise UI consistency over internal elegance here. The `write` command doesn’t follow the age-old assumption that each command maps 1:1 to a similarly named SDK class. Write can use `write` or `distribution`.



# File 'lib/wavefront-cli/write.rb', line 94

def _sdk_class
  return 'Wavefront::Distribution' if distribution?

  'Wavefront::Write'
end

#call_write(data, openclose = true) ⇒ Object

A wrapper which lets us send normal points, deltas, or distributions



# File 'lib/wavefront-cli/write.rb', line 180

def call_write(data, openclose = true)
  if options[:delta]
    wf.write_delta(data, openclose)
  else
    wf.write(data, openclose)
  end
end

#close_connection ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 143

def close_connection
  wf.close
end

#default_port ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 114

def default_port
  distribution? ? 40_000 : 2878
end

#distribution? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wavefront-cli/write.rb', line 100

def distribution?
  return true if options[:distribution]

  options[:infileformat]&.include?('d')
end

#do_distribution ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 36

def do_distribution
  send_point(make_distribution_point(tags_to_hash(options[:tag])))
end

#do_file ⇒ Object




# File 'lib/wavefront-cli/write.rb', line 30

def do_file
  valid_format?(options[:infileformat])
  setup_fmt(options[:infileformat] || 'tmv')
  process_input(options[:'<file>'])
end

#do_noise ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 40

def do_noise
  loop do
    do_point(random_value(options[:min] || -10, options[:max] || 10))
    sleep(sleep_time)
  end
end

#do_point(value = options[:'<value>']) ⇒ Object




# File 'lib/wavefront-cli/write.rb', line 17

def do_point(value = options[:'<value>'])
  tags = tags_to_hash(options[:tag])

  p = { path: options[:'<metric>'],
        value: sane_value(value) }

  p[:tags] = tags unless tags.empty?
  p[:source] = options[:host] if options[:host]
  p[:ts] = parse_time(options[:time]) if options[:time]
  send_point(p)
end

#enough_fields?(line) ⇒ True

Make sure we have the right number of columns, according to the format string. We want to take every precaution we can to stop users accidentally polluting their metric namespace with junk.

Parameters:

  • line (String)

    input line

Returns:

  • (True)

    if the number of fields is correct

Raises:

  • WavefrontCli::Exception::UnparseableInput if there are not the right number of fields.



# File 'lib/wavefront-cli/write.rb', line 409

def enough_fields?(line)
  ncols = line.split(SPLIT_PATTERN).length
  return true if fmt.include?('T') && ncols >= fmt.length
  return true if ncols == fmt.length

  raise(WavefrontCli::Exception::UnparseableInput,
        format('Expected %<expected>s fields, got %<got>s',
               expected: fmt.length,
               got: ncols))
end
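The column check can be sketched outside the class as follows. This is a simplified, hypothetical variant: the format is passed in as an argument rather than read from @fmt, and it returns false instead of raising UnparseableInput. When the format ends in 'T', any number of extra columns is accepted, because they are treated as point tags.

```ruby
SPLIT_PATTERN = /\s(?=(?:[^"]|"[^"]*")*$)/.freeze

# Hypothetical standalone variant of the check: returns true or false
# rather than raising, and takes the format as an array of letters.
def enough_fields?(line, fmt)
  ncols = line.split(SPLIT_PATTERN).length
  # With a trailing 'T', extra columns are allowed (they are tags).
  return ncols >= fmt.length if fmt.include?('T')

  ncols == fmt.length
end

p enough_fields?('1700000000 cpu.load 0.5', %w[t m v])
p enough_fields?('cpu.load 0.5', %w[t m v])
p enough_fields?('cpu.load 0.5 env=prod dc=eu', %w[m v T])
```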

#expand_dist(dist) ⇒ Object

We will let users write a distribution as ‘1 1 1’ or ‘3x1’ or even a mix of the two



# File 'lib/wavefront-cli/write.rb', line 223

def expand_dist(dist)
  dist.map do |v|
    if v.is_a?(String) && v.include?('x')
      x, val = v.split('x', 2)
      Array.new(x.to_i, val.to_f)
    else
      v.to_f
    end
  end.flatten
end
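To make the shorthand concrete, the method can be tried on its own. The body below is copied from the listing above; the input values and the constant name EXPANDED are invented for the example.

```ruby
# Copied from the listing above so the snippet is self-contained.
def expand_dist(dist)
  dist.map do |v|
    if v.is_a?(String) && v.include?('x')
      x, val = v.split('x', 2)
      Array.new(x.to_i, val.to_f)
    else
      v.to_f
    end
  end.flatten
end

# Two plain values followed by "three lots of 2.5".
EXPANDED = expand_dist(%w[1 1 3x2.5])
p EXPANDED
```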

#extra_options ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 85

def extra_options
  options[:using] ? { writer: options[:using] } : {}
end

#extract_path(chunks) ⇒ Object

Find and return the metric path in a chunked line of input. The path can be in the data, or passed as an option, or both. If the latter, then we assume the option is a prefix, and concatenate the value in the data.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (String)

    the metric path

Raises:

  • (TypeError)

    if the field does not exist and no metric option was given



# File 'lib/wavefront-cli/write.rb', line 264

def extract_path(chunks)
  m = chunks[fmt.index('m')]
  options[:metric] ? [options[:metric], m].join('.') : m
rescue TypeError
  return options[:metric] if options[:metric]

  raise
end
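The three cases described above (path in the data, path as an option, or both) can be sketched with a standalone variant. The extra parameters fmt and metric_opt are assumptions made for the example; the real method reads @fmt and options[:metric] instead.

```ruby
# Hypothetical standalone variant: format and metric option are arguments.
def extract_path(chunks, fmt, metric_opt = nil)
  # chunks[nil] raises TypeError when the format has no 'm' field.
  m = chunks[fmt.index('m')]
  metric_opt ? [metric_opt, m].join('.') : m
rescue TypeError
  return metric_opt if metric_opt

  raise
end

p extract_path(%w[cpu.load 0.5], %w[m v])        # path from the data
p extract_path(%w[load 0.5], %w[m v], 'dev.cpu') # option used as a prefix
p extract_path(%w[0.5], %w[v], 'dev.cpu.load')   # path from the option only
```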

#extract_source(chunks) ⇒ Object

Find and return the source in a chunked line of input.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (String)

    the source, if it is there, or if not, the value passed through by -H, or the local hostname.


# File 'lib/wavefront-cli/write.rb', line 279

def extract_source(chunks)
  chunks[fmt.index('s')]
rescue TypeError
  options[:source] || Socket.gethostname
end

#extract_tags(chunks) ⇒ Hash

Returns a hash of k = v tags.

Parameters:

  • chunks (Array)

    an input line broken into tokens. The final token will be a space-separated list of point tags.

Returns:

  • (Hash)

    of k = v tags.



# File 'lib/wavefront-cli/write.rb', line 251

def extract_tags(chunks)
  tags_to_hash(chunks.last.split(SPLIT_PATTERN))
end

#extract_ts(chunks) ⇒ Float

Find and return the timestamp in a chunked line of input.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (Float)

    the timestamp, if it is there, or the current UTC time if the format string has no timestamp field. A timestamp which fails #valid_timestamp? yields nil.



# File 'lib/wavefront-cli/write.rb', line 240

def extract_ts(chunks)
  ts = chunks[fmt.index('t')]
  return parse_time(ts) if valid_timestamp?(ts)
rescue TypeError
  Time.now.utc.to_i
end

#extract_value(chunks) ⇒ Object

Find and return the value in a chunked line of input

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (Float)

    the value

Raises:

  • (TypeError)

    if field does not exist

  • (Wavefront::Exception::InvalidValue)

    if it’s not a value



# File 'lib/wavefront-cli/write.rb', line 209

def extract_value(chunks)
  if fmt.include?('v')
    v = chunks[fmt.index('v')]
    v.to_f
  else
    raw = chunks[fmt.index('d')].split(',')
    xpanded = expand_dist(raw)
    wf.mk_distribution(xpanded)
  end
end

#format_string_does_not_have_v_and_d?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • (WavefrontCli::Exception::UnparseableInput)



# File 'lib/wavefront-cli/write.rb', line 370

def format_string_does_not_have_v_and_d?(fmt)
  return true unless fmt.include?('v') && fmt.include?('d')

  raise(WavefrontCli::Exception::UnparseableInput,
        "'v' and 'd' are mutually exclusive")
end

#format_string_has_big_t_only_at_the_end?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • (WavefrontCli::Exception::UnparseableInput)



# File 'lib/wavefront-cli/write.rb', line 391

def format_string_has_big_t_only_at_the_end?(fmt)
  return true unless fmt.include?('T')
  return true if fmt.end_with?('T')

  raise(WavefrontCli::Exception::UnparseableInput,
        "if used, 'T' must come at end of format string")
end

#format_string_has_unique_chars?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • (WavefrontCli::Exception::UnparseableInput)



# File 'lib/wavefront-cli/write.rb', line 384

def format_string_has_unique_chars?(fmt)
  return true if fmt.chars.sort == fmt.chars.uniq.sort

  raise(WavefrontCli::Exception::UnparseableInput,
        'repeated field in format string')
end

#format_string_has_v_or_d?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • (WavefrontCli::Exception::UnparseableInput)



# File 'lib/wavefront-cli/write.rb', line 363

def format_string_has_v_or_d?(fmt)
  return true if fmt.include?('v') || fmt.include?('d')

  raise(WavefrontCli::Exception::UnparseableInput,
        "format string must include 'v' or 'd'")
end

#format_string_is_all_valid_chars?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • (WavefrontCli::Exception::UnparseableInput)



# File 'lib/wavefront-cli/write.rb', line 377

def format_string_is_all_valid_chars?(fmt)
  return true if fmt =~ /^[dmstTv]+$/

  raise(WavefrontCli::Exception::UnparseableInput,
        'unsupported field in format string')
end

#line_tags(chunks) ⇒ Object

We can get tags from the file, from the -T option, or both. Merge them, making the -T win if there is a collision.



# File 'lib/wavefront-cli/write.rb', line 323

def line_tags(chunks)
  file_tags = fmt.last == 'T' ? extract_tags(chunks) : {}
  opt_tags = tags_to_hash(options[:tag]) || {}
  file_tags.merge(opt_tags)
end
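The precedence rule comes straight from Hash#merge, where values from the argument replace values in the receiver. A small sketch, with invented tag values and constant names:

```ruby
# Tags parsed from the input line (hypothetical values).
FILE_TAGS = { env: 'dev', dc: 'eu' }.freeze
# Tags given with -T on the command line (hypothetical values).
OPT_TAGS = { env: 'prod' }.freeze

# On a key collision the argument to merge wins, so -T overrides the file.
MERGED = FILE_TAGS.merge(OPT_TAGS)
p MERGED
```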

#load_data(file) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 438

def load_data(file)
  IO.read(file)
rescue StandardError
  raise WavefrontCli::Exception::FileNotFound
end

#make_distribution_point(tags) ⇒ Object




# File 'lib/wavefront-cli/write.rb', line 66

def make_distribution_point(tags)
  { path: options[:'<metric>'],
    interval: options[:interval] || 'M',
    tags: tags,
    value: mk_dist }.tap do |p|
    p[:source] = options[:host] if options[:host]
    p[:ts] = parse_time(options[:time]) if options[:time]
  end
end

#mk_creds ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 106

def mk_creds
  { proxy: options[:proxy],
    port: options[:port] || default_port,
    socket: options[:socket],
    endpoint: options[:endpoint],
    token: options[:token] }
end

#mk_dist ⇒ Object

Turn our user’s representation of a distribution into one which suits Wavefront. The SDK can do this for us.



# File 'lib/wavefront-cli/write.rb', line 80

def mk_dist
  xpanded = expand_dist(options[:'<val>'])
  wf.mk_distribution(xpanded.map(&:to_f))
end

#open_connection ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 139

def open_connection
  wf.open
end

#process_input(file) ⇒ Object

Read the input, from a file or from STDIN, and turn each line into Wavefront points.



# File 'lib/wavefront-cli/write.rb', line 156

def process_input(file)
  if file == '-'
    read_stdin
  else
    call_write(
      process_input_file(load_data(Pathname.new(file)).split("\n"))
    )
  end
end

#process_input_file(data) ⇒ Object

Parameters:

  • data (Array)

    lines of input, as split by #process_input



# File 'lib/wavefront-cli/write.rb', line 168

def process_input_file(data)
  data.each_with_object([]) do |l, a|
    a.<< process_line(l)
  rescue WavefrontCli::Exception::UnparseableInput => e
    puts "Bad input. #{e.message}."
    next
  end
end

#process_line(line) ⇒ Hash

Process a line of input, as described by the format string held in @fmt. Produces a hash suitable for the SDK to send on.

We let the user define most of the fields, but anything beyond what they define is always assumed to be point tags. This is because you can have arbitrarily many of those for each point.


Parameters:

  • line (String)

    a line of an input file

Returns:

  • (Hash)

Raises:

  • WavefrontCli::Exception::UnparseableInput if the line doesn’t look right



# File 'lib/wavefront-cli/write.rb', line 299

def process_line(line)
  return true if line.empty?

  chunks = line.split(SPLIT_PATTERN, fmt.length)
  enough_fields?(line) # can raise exception

  point = { path: extract_path(chunks),
            value: extract_value(chunks) }

  tags = line_tags(chunks)

  point.tap do |p|
    p[:tags]     = tags unless tags.empty?
    p[:ts]       = extract_ts(chunks)        if fmt.include?('t')
    p[:source]   = extract_source(chunks)    if fmt.include?('s')
    p[:interval] = options[:interval] || 'm' if fmt.include?('d')
  end
end
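The chunking step is the part most worth seeing in isolation. The sketch below assumes a format of 'tmvT' and an invented input line; FMT, LINE, CHUNKS, and TAG_TOKENS are names made up for the example. Passing the format length as the limit to String#split keeps everything after the defined fields together in the final chunk, which is then split again to get the individual tags.

```ruby
SPLIT_PATTERN = /\s(?=(?:[^"]|"[^"]*")*$)/.freeze

# Assumed format: timestamp, metric, value, then any number of tags.
FMT = %w[t m v T].freeze
LINE = '1700000000 cpu.load 0.5 env=prod dc="us west"'

# The limit stops splitting after the third field, leaving the tags whole.
CHUNKS = LINE.split(SPLIT_PATTERN, FMT.length)
p CHUNKS

# The last chunk is split a second time to recover each tag token.
TAG_TOKENS = CHUNKS.last.split(SPLIT_PATTERN)
p TAG_TOKENS
```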

#random_value(min, max) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 47

def random_value(min, max)
  return min if min == max

  rand(max.to_f - min.to_f) + min.to_f
end

#read_stdin ⇒ Object

Read from standard in and stream points through an open socket. If the user hits ctrl-c, close the socket and exit politely.



# File 'lib/wavefront-cli/write.rb', line 192

def read_stdin
  open_connection
  $stdin.each_line { |l| call_write(process_line(l.strip), false) }
  close_connection
rescue SystemExit, Interrupt
  puts 'ctrl-c. Exiting.'
  wf.close
  exit 0
end

#sane_value(value) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 53

def sane_value(value)
  return value if value.is_a?(Numeric)

  raise WavefrontCli::Exception::InvalidValue unless value.is_a?(String)

  value.delete('\\').to_f
end

#send_point(point) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 147

def send_point(point)
  call_write(point)
rescue Wavefront::Exception::InvalidEndpoint
  abort format("Could not connect to proxy '%<proxy>s:%<port>s'.", wf.creds)
end

#setup_fmt(fmt) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 434

def setup_fmt(fmt)
  @fmt = fmt.split('')
end

#sleep_time ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 61

def sleep_time
  options[:interval] ? options[:interval].to_f : 1
end

#tags_to_hash(tags) ⇒ Hash

Takes an array of key=value tags (as produced by docopt) and turns it into a hash of key: value tags. Anything not of the form key=val is dropped. If key or value are quoted, we remove the quotes.

Parameters:

  • tags (Array)

    key=value tags, as produced by docopt

Returns:

  • (Hash)

    of k: v tags



# File 'lib/wavefront-cli/write.rb', line 337

def tags_to_hash(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2)
    k.gsub!(/^["']|["']$/, '')
    ret[k.to_sym] = v.to_s.gsub(/^["']|["']$/, '') if v
  end
end
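A short usage sketch follows. The method body is copied from the listing above; the tag strings and the constant name TAGS are invented. It shows quotes being stripped and a token without an equals sign being dropped.

```ruby
# Copied from the listing above so the snippet is self-contained.
def tags_to_hash(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2)
    k.gsub!(/^["']|["']$/, '')
    ret[k.to_sym] = v.to_s.gsub(/^["']|["']$/, '') if v
  end
end

# 'junk' has no '=' and is dropped; the quotes around "us west" are removed.
TAGS = tags_to_hash(['env=prod', 'dc="us west"', 'junk'])
p TAGS
```

A single string is also accepted, because the argument is wrapped in an array and flattened before processing.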

#valid_format?(fmt) ⇒ Boolean

The format string must contain values. They can be single values or distributions. So we must have ‘v’ xor ‘d’. It must not contain anything other than ‘m’, ‘t’, ‘T’, ‘s’, ‘d’, or ‘v’, and the ‘T’, if there, must be at the end. No letter must appear more than once.

Parameters:

  • fmt (String)

    format of input file

Returns:

  • (Boolean)


# File 'lib/wavefront-cli/write.rb', line 355

def valid_format?(fmt)
  format_string_has_v_or_d?(fmt)
  format_string_does_not_have_v_and_d?(fmt)
  format_string_is_all_valid_chars?(fmt)
  format_string_has_unique_chars?(fmt)
  format_string_has_big_t_only_at_the_end?(fmt)
end
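The five rules can be summarised in one place with a sketch like the one below. It is not how the class does it: the real methods raise UnparseableInput on the first failure, whereas this hypothetical helper, format_problems, collects every message so the rules are easy to compare side by side.

```ruby
# Hypothetical helper: returns a list of problems instead of raising.
def format_problems(fmt)
  problems = []
  has_v = fmt.include?('v')
  has_d = fmt.include?('d')

  problems << "format string must include 'v' or 'd'" unless has_v || has_d
  problems << "'v' and 'd' are mutually exclusive" if has_v && has_d
  problems << 'unsupported field in format string' unless fmt.match?(/^[dmstTv]+$/)
  problems << 'repeated field in format string' unless fmt.chars == fmt.chars.uniq
  if fmt.include?('T') && !fmt.end_with?('T')
    problems << "if used, 'T' must come at end of format string"
  end
  problems
end

p format_problems('tmvT') # a valid format string gives an empty list
p format_problems('tmvd')
p format_problems('Ttmv')
```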

#valid_timestamp?(timestamp) ⇒ Bool

Although the SDK does value checking, we’ll add another layer of input checking here. See if the time looks valid. We’ll assume anything before 2000/01/01 or after a year from now is wrong. Arbitrary, but there has to be a cut-off somewhere.

Parameters:

  • timestamp (String, Integer)

    epoch timestamp

Returns:

  • (Bool)


# File 'lib/wavefront-cli/write.rb', line 427

def valid_timestamp?(timestamp)
  (timestamp.is_a?(Integer) ||
    timestamp.is_a?(String) && timestamp.match(/^\d+$/)) &&
    timestamp.to_i > 946_684_800 &&
    timestamp.to_i < (Time.now.to_i + 31_557_600)
end
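The two bounds are 946_684_800, which is 2000-01-01 00:00:00 UTC as an epoch time, and the current time plus 31_557_600 seconds, which is 365.25 days. A quick way to see the behaviour is to run the method standalone. The body is copied from the listing above and the sample inputs are invented.

```ruby
# Copied from the listing above so the snippet is self-contained.
def valid_timestamp?(timestamp)
  (timestamp.is_a?(Integer) ||
    timestamp.is_a?(String) && timestamp.match(/^\d+$/)) &&
    timestamp.to_i > 946_684_800 &&
    timestamp.to_i < (Time.now.to_i + 31_557_600)
end

now = Time.now.to_i

p valid_timestamp?(now)                  # an Integer inside the window
p valid_timestamp?(now.to_s)             # a numeric String inside the window
p valid_timestamp?(946_684_800)          # exactly the lower bound is rejected
p valid_timestamp?(now + 2 * 31_557_600) # two years ahead is rejected
p valid_timestamp?('yesterday')          # non-numeric strings are rejected
```

Note that the result for a rejected String can be nil rather than false, so callers should rely on truthiness rather than compare against false.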

#validate_opts ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 118

def validate_opts
  validate_opts_file if options[:file]

  if options[:using] == 'unix'
    return true if options[:socket]

    raise(WavefrontCli::Exception::CredentialError, 'No socket path.')
  end

  return true if options[:proxy]

  raise(WavefrontCli::Exception::CredentialError, 'No proxy address.')
end

#validate_opts_file ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 132

def validate_opts_file
  return true if options[:metric] || options[:infileformat]&.include?('m')

  raise(WavefrontCli::Exception::InsufficientData,
        "Supply a metric path in the file or with '-m'.")
end