Class: Wavefront::Write

Inherits:
Object
  • Object
show all
Includes:
Validators
Defined in:
lib/wavefront-sdk/write.rb

Overview

This class helps you send points to Wavefront. It is extended by the Write and Report classes, which respectively handle point ingestion by a proxy and directly to the API.

Direct Known Subclasses

Distribution, Report

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Validators

#uuid?, #wf_account_id?, #wf_alert_id?, #wf_alert_severity?, #wf_apitoken_id?, #wf_aws_external_id?, #wf_cloudintegration_id?, #wf_dashboard_id?, #wf_derivedmetric_id?, #wf_distribution?, #wf_distribution_count?, #wf_distribution_interval?, #wf_distribution_values?, #wf_epoch?, #wf_event_id?, #wf_granularity?, #wf_ingestionpolicy_id?, #wf_integration_id?, #wf_link_id?, #wf_link_template?, #wf_maintenance_window_id?, #wf_message_id?, #wf_metric_name?, #wf_metricspolicy_id?, #wf_monitoredapplication_id?, #wf_monitoredcluster_id?, #wf_ms_ts?, #wf_name?, #wf_notificant_id?, #wf_permission?, #wf_point?, #wf_point_tag?, #wf_point_tags?, #wf_proxy_id?, #wf_role_id?, #wf_sampling_value?, #wf_savedsearch_entity?, #wf_savedsearch_id?, #wf_serviceaccount_id?, #wf_source_id?, #wf_spansamplingpolicy_id?, #wf_string?, #wf_tag?, #wf_trace?, #wf_ts?, #wf_user_id?, #wf_usergroup_id?, #wf_value?, #wf_version?, #wf_webhook_id?

Constructor Details

#initialize(creds = {}, opts = {}) ⇒ Write

Construct an object which gives the user an interface for writing points to Wavefront. The actual writing is handled by

a Wavefront::Writer

subclass.

Parameters:

  • creds (Hash) (defaults to: {})

    credentials: can contain keys: proxy [String] the address of the Wavefront proxy. (‘wavefront’) port [Integer] the port of the Wavefront proxy

  • options (Hash)

    can contain the following keys: tags [Hash] point tags which will be applied to every point noop [Bool] if true, no proxy connection will be made, and

    instead of sending the points, they will be printed in
    Wavefront wire format.
    

    novalidate [Bool] if true, points will not be validated.

    This might make things go marginally quicker if you have
    done point validation higher up in the chain. Invalid
    points are dropped, logged, and reported in the summary.
    

    verbose [Bool] debug [Bool] writer [Symbol, String] the name of the writer class to use.

    Defaults to :proxy
    

    noauto [Bool] if this is false, #write will automatically

    open a connection to Wavefront on each invocation. Set
    this to true to manually manage the connection.
    

    chunk_size [Integer] So as not to create unusable metric

    payloads, large batches of points will be broken into
    chunks. This property controls the number of metrics in a
    chunk. Defaults to 1,000.
    

    chunk_pause [Integer] pause this many seconds between

    writing chunks. Defaults to zero.
    


53
54
55
56
57
58
59
# File 'lib/wavefront-sdk/write.rb', line 53

def initialize(creds = {}, opts = {})
  @opts = setup_options(opts, defaults)
  @creds = creds
  wf_point_tags?(opts[:tags]) if opts[:tags]
  @logger = Wavefront::Logger.new(opts)
  @writer = setup_writer
end

Instance Attribute Details

#credsObject (readonly)

Returns the value of attribute creds.



19
20
21
# File 'lib/wavefront-sdk/write.rb', line 19

def creds
  @creds
end

#loggerObject (readonly)

Returns the value of attribute logger.



19
20
21
# File 'lib/wavefront-sdk/write.rb', line 19

def logger
  @logger
end

#optsObject (readonly)

Returns the value of attribute opts.



19
20
21
# File 'lib/wavefront-sdk/write.rb', line 19

def opts
  @opts
end

#writerObject (readonly)

Returns the value of attribute writer.



19
20
21
# File 'lib/wavefront-sdk/write.rb', line 19

def writer
  @writer
end

Instance Method Details

#chunk_sizeObject



208
209
210
# File 'lib/wavefront-sdk/write.rb', line 208

def chunk_size
  opts[:chunk_size] || writer.chunk_size
end

#closeObject

Wrapper to the writer class’s #close method.



87
88
89
# File 'lib/wavefront-sdk/write.rb', line 87

def close
  writer.close
end

#composite_response(responses) ⇒ Object

Compound the responses of all chunked writes into one. It will be ‘ok’ only if everything passed. Returns 400 as the HTTP code on error, regardless of what actual errors occurred.

Parameters:

Returns:

  • Wavefront::Response



116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/wavefront-sdk/write.rb', line 116

def composite_response(responses)
  result, code = response_results(responses)

  summary = { sent: 0, rejected: 0, unsent: 0 }

  %i[sent rejected unsent].each do |k|
    summary[k] = responses.sum { |r| r.response[k] }
  end

  Wavefront::Response.new(
    { status: { result: result, message: nil, code: code },
      response: summary.to_h }.to_json, nil
  )
end

#data_formatObject



242
243
244
# File 'lib/wavefront-sdk/write.rb', line 242

def data_format
  :wavefront
end

#defaultsObject

Chunk size gets overriden



63
64
65
66
67
68
69
70
71
72
# File 'lib/wavefront-sdk/write.rb', line 63

def defaults
  { tags: nil,
    writer: :proxy,
    noop: false,
    novalidate: false,
    noauto: false,
    verbose: false,
    debug: false,
    chunk_pause: 0 }
end

#hash_to_wf(point) ⇒ Object

Convert a validated point to a string conforming to community.wavefront.com/docs/DOC-1031. No validation is done here.

Parameters:

  • point (Hash)

    a hash describing a point. See #write() for the format.

Raises:



219
220
221
222
223
224
225
# File 'lib/wavefront-sdk/write.rb', line 219

def hash_to_wf(point)
  raise Wavefront::Exception::InvalidMetricName unless point[:path]
  raise Wavefront::Exception::InvalidMetricValue unless point[:value]

  format('%<path>s %<value>s %<ts>s source=%<source>s %<tags>s %<opttags>s',
         point_hash(point)).squeeze(' ').strip
end

#manage_connObject



139
140
141
# File 'lib/wavefront-sdk/write.rb', line 139

def manage_conn
  opts[:noauto] ? false : true
end

#openObject

Wrapper to the writer class’s #open method. Using this you can manually open a connection and re-use it.



81
82
83
# File 'lib/wavefront-sdk/write.rb', line 81

def open
  writer.open
end

#paths_to_deltas(points) ⇒ Array[Hash]

Prefix all paths in a points array (as passed to #write_delta() with a delta symbol

Parameters:

Returns:



162
163
164
# File 'lib/wavefront-sdk/write.rb', line 162

def paths_to_deltas(points)
  [points].flatten.map { |p| p.tap { p[:path] = DELTA + p[:path] } }
end

#point_hash(point) ⇒ Object



227
228
229
230
231
232
233
234
# File 'lib/wavefront-sdk/write.rb', line 227

def point_hash(point)
  point.dup.tap do |p|
    p[:ts] ||= nil
    p[:source] ||= HOSTNAME
    p[:tags] = tags_or_nothing(p.fetch(:tags, nil))
    p[:opttags] = tags_or_nothing(opts.fetch(:tags, nil))
  end
end

#raw(points, openclose = manage_conn) ⇒ Object

Send raw data to a Wavefront proxy, optionally automatically opening and closing the connection. (Or not, if that does not make sense in the context of the writer.)

Parameters:

  • points (Array[String])

    an array of points in native Wavefront wire format, as described in community.wavefront.com/docs/DOC-1031. No validation is performed.

  • openclose (Boolean) (defaults to: manage_conn)

    whether or not to automatically open a socket to the proxy before sending points, and afterwards, close it.



192
193
194
195
196
197
198
199
200
# File 'lib/wavefront-sdk/write.rb', line 192

def raw(points, openclose = manage_conn)
  writer.open if openclose && writer.respond_to?(:open)

  begin
    [points].flatten.each { |p| writer.send_point(p) }
  ensure
    writer.close if openclose && writer.respond_to?(:close)
  end
end

#response_results(responses) ⇒ Object



131
132
133
134
135
136
137
# File 'lib/wavefront-sdk/write.rb', line 131

def response_results(responses)
  if responses.all?(&:ok?)
    ['OK', 200]
  else
    ['ERROR', 400]
  end
end

#send_point(point) ⇒ Object

Wrapper for the writer class’s #send_point method

Parameters:

  • point (String)

    a point description, probably from #hash_to_wf()



170
171
172
173
174
175
176
177
178
# File 'lib/wavefront-sdk/write.rb', line 170

def send_point(point)
  if opts[:noop]
    logger.log "Would send: #{point}"
    return
  end

  logger.log("Sending: #{point}", :debug)
  writer.send_point(point)
end

#setup_options(user, defaults) ⇒ Object



74
75
76
# File 'lib/wavefront-sdk/write.rb', line 74

def setup_options(user, defaults)
  defaults.merge(user)
end

#tags_or_nothing(tags) ⇒ Object



236
237
238
239
240
# File 'lib/wavefront-sdk/write.rb', line 236

def tags_or_nothing(tags)
  return nil unless tags

  tags.to_wf_tag
end

#validationObject

The method used to validate the data we wish to write.



204
205
206
# File 'lib/wavefront-sdk/write.rb', line 204

def validation
  :wf_point?
end

#write(points = [], openclose = manage_conn, prefix = nil) ⇒ Boolean

POST /report A wrapper to the writer class’s #write method. Writers implement this method differently, Check the appropriate class documentation for @return information etc. The signature is always the same.

Returns:

  • (Boolean)

    false should any chunk fail

Raises:

  • any exceptions raised by the writer classes are passed through



100
101
102
103
104
105
106
107
108
# File 'lib/wavefront-sdk/write.rb', line 100

def write(points = [], openclose = manage_conn, prefix = nil)
  resps = [points].flatten.each_slice(chunk_size).map do |chunk|
    resp = writer.write(chunk, openclose, prefix)
    sleep(opts[:chunk_pause])
    resp
  end

  composite_response(resps)
end

#write_delta(points, openclose = manage_conn) ⇒ Object

A wrapper method around #write() which guarantees all points will be sent as deltas. You can still manually prefix any metric with a delta symbol and use #write(), but depending on your use-case, this method may be safer. It’s easy to forget the delta.

Parameters:

  • points (Array[Hash])

    see #write()

  • openclose (Bool) (defaults to: manage_conn)

    see #write()



152
153
154
# File 'lib/wavefront-sdk/write.rb', line 152

def write_delta(points, openclose = manage_conn)
  write(paths_to_deltas(points), openclose)
end