Class: Wavefront::Write

Inherits:
Base
  • Object
Defined in:
lib/wavefront-sdk/write.rb

Overview

This class helps you send points to a Wavefront proxy in native format. The proxy usually listens for these on port 2878.

Instance Attribute Summary

Attributes inherited from Base

#api_base, #conn, #debug, #logger, #net, #noop, #opts, #update_keys, #verbose

Instance Method Summary

Methods inherited from Base

#api_delete, #api_get, #api_post, #api_put, #hash_for_update, #initialize, #log, #mk_conn, #respond, #time_to_ms

Methods included from Mixins

#parse_time

Methods included from Validators

#wf_alert_id?, #wf_alert_severity?, #wf_cloudintegration_id?, #wf_dashboard_id?, #wf_epoch?, #wf_event_id?, #wf_granularity?, #wf_link_id?, #wf_link_template?, #wf_maintenance_window_id?, #wf_message_id?, #wf_metric_name?, #wf_ms_ts?, #wf_name?, #wf_point?, #wf_point_tags?, #wf_proxy_id?, #wf_savedsearch_entity?, #wf_savedsearch_id?, #wf_source_id?, #wf_string?, #wf_tag?, #wf_ts?, #wf_user_id?, #wf_value?, #wf_version?, #wf_webhook_id?

Constructor Details

This class inherits a constructor from Wavefront::Base

Instance Attribute Details

#sock ⇒ Object (readonly)

Returns the value of attribute sock.



# File 'lib/wavefront-sdk/write.rb', line 12

def sock
  @sock
end

#summary ⇒ Object (readonly)

Returns the value of attribute summary.



# File 'lib/wavefront-sdk/write.rb', line 12

def summary
  @summary
end

Instance Method Details

#close ⇒ Object

Close the socket described by the @sock instance variable.



# File 'lib/wavefront-sdk/write.rb', line 198

def close
  return if opts[:noop]
  log('Closing connection to proxy.', :info)
  sock.close
end

#hash_to_wf(p) ⇒ Object

Convert a validated point hash to a string conforming to community.wavefront.com/docs/DOC-1031. No validation is done here.

Parameters:

  • p (Hash)

    a hash describing a point. See #write() for the format.



# File 'lib/wavefront-sdk/write.rb', line 136

def hash_to_wf(p)
  unless p.key?(:path) && p.key?(:value)
    raise Wavefront::Exception::InvalidPoint
  end

  p[:source] = HOSTNAME unless p.key?(:source)

  m = [p[:path], p[:value]]
  m << p[:ts] if p[:ts]
  m << "source=#{p[:source]}"
  m << p[:tags].to_wf_tag if p[:tags]
  m << opts[:tags].to_wf_tag if opts[:tags]
  m.join(' ')
end
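
As a rough illustration of the string #hash_to_wf builds, the standalone sketch below reproduces the same field order without the SDK. The names `point_to_wire` and `tags_to_wire` are hypothetical, and the `key="value"` tag rendering is an assumption standing in for the SDK's `to_wf_tag` helper, whose exact output is not shown on this page.

```ruby
require 'socket'

# Hypothetical stand-in for the SDK's Hash#to_wf_tag. Assumes tags are
# rendered as space-separated key="value" pairs.
def tags_to_wire(tags)
  tags.map { |k, v| "#{k}=\"#{v}\"" }.join(' ')
end

# Hypothetical helper mirroring the field order used by #hash_to_wf:
# path, value, optional timestamp, source, point tags, global tags.
def point_to_wire(point, global_tags = nil)
  unless point.key?(:path) && point.key?(:value)
    raise ArgumentError, 'point needs :path and :value'
  end

  fields = [point[:path], point[:value]]
  fields << point[:ts] if point[:ts]
  # Fall back to the local hostname when no source is given.
  fields << "source=#{point.fetch(:source, Socket.gethostname)}"
  fields << tags_to_wire(point[:tags]) if point[:tags]
  fields << tags_to_wire(global_tags) if global_tags
  fields.join(' ')
end

line = point_to_wire({ path: 'test.metric', value: 3.5, ts: 1_500_000_000,
                       source: 'box1', tags: { env: 'dev' } })
puts line
```

Unlike #hash_to_wf, this sketch does not write the default source back into the caller's hash.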

#open ⇒ Object

Open a socket to a Wavefront proxy, putting the descriptor in instance variable @sock.



# File 'lib/wavefront-sdk/write.rb', line 180

def open
  if opts[:noop]
    log('No-op requested. Not opening connection to proxy.')
    return true
  end

  log("Connecting to #{net[:proxy]}:#{net[:port]}.", :info)

  begin
    @sock = TCPSocket.new(net[:proxy], net[:port])
  rescue => e
    log(e, :error)
    raise Wavefront::Exception::InvalidEndpoint
  end
end

#post_initialize(_creds = {}, options = {}) ⇒ Object

Construct an object which allows us to write points to a Wavefront proxy.

Parameters:

  • _creds (Hash) (defaults to: {})

    dummy parameter for correct method signature.

  • options (Hash) (defaults to: {})

    can contain the following keys:

    proxy [String] the address of the Wavefront proxy. (‘wavefront’)
    port [Integer] the port of the Wavefront proxy. (2878)
    tags [Hash] point tags which will be applied to every point.
    noop [Bool] if true, no proxy connection will be made, and
      instead of sending the points, they will be printed in
      Wavefront wire format.
    novalidate [Bool] if true, points will not be validated.
      This might make things go marginally quicker if you have
      done point validation higher up in the chain.
    verbose [Bool]
    debug [Bool]



# File 'lib/wavefront-sdk/write.rb', line 32

def post_initialize(_creds = {}, options = {})
  defaults = { tags:       nil,
               noop:       false,
               novalidate: false,
               verbose:    false,
               debug:      false }

  @summary = { sent: 0, rejected: 0, unsent: 0 }
  @opts = setup_options(options, defaults)

  wf_point_tags?(opts[:tags]) if opts[:tags]
end

#raw(points, openclose = true) ⇒ Object

Send raw data to a Wavefront proxy, automatically opening and closing a socket.

Parameters:

  • points (Array[String])

    an array of points in native Wavefront wire format, as described in community.wavefront.com/docs/DOC-1031. No validation is performed.

  • openclose (Boolean) (defaults to: true)

    whether or not to automatically open a socket to the proxy before sending points, and afterwards, close it.



# File 'lib/wavefront-sdk/write.rb', line 60

def raw(points, openclose = true)
  open if openclose

  begin
    [points].flatten.each { |p| send_point(p) }
  ensure
    close if openclose
  end
end
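
The open, send, ensure-close pattern used by #raw can be tried without a proxy. The sketch below is only an illustration: `FakeConnection` and `send_all` are hypothetical names, and the fake connection records calls in memory instead of opening a socket.

```ruby
# Hypothetical in-memory stand-in for a proxy connection.
class FakeConnection
  attr_reader :lines, :events

  def initialize
    @lines = []
    @events = []
  end

  def open
    @events << :open
  end

  def close
    @events << :close
  end

  def send_point(point)
    @lines << point
  end
end

# Mirrors the shape of #raw: accept one point or an array of points,
# and close the connection even if sending raises.
def send_all(conn, points, openclose = true)
  conn.open if openclose
  begin
    [points].flatten.each { |p| conn.send_point(p) }
  ensure
    conn.close if openclose
  end
end

conn = FakeConnection.new
send_all(conn, 'a.b 1 source=box1')
send_all(conn, ['a.b 2 source=box1', 'a.b 3 source=box1'], false)
```

Wrapping the argument as `[points].flatten` is what lets a caller pass either a single string or an array. With `openclose` set to false, the caller is responsible for opening and closing the connection.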

#send_point(point) ⇒ Object

Send a point which is already in Wavefront wire format.

Parameters:

  • point (String)

    a point description, probably from #hash_to_wf()



# File 'lib/wavefront-sdk/write.rb', line 156

def send_point(point)
  if opts[:noop]
    log "Would send: #{point}"
    return
  end

  log("Sending: #{point}", :info)

  begin
    sock.puts(point)
  rescue => e
    summary[:unsent] += 1
    log('WARNING: failed to send point.')
    log(e.to_s, :debug)
    return false
  end

  summary[:sent] += 1
  true
end
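
To see the socket write and the sent/unsent bookkeeping in isolation, the following sketch sends one wire-format line to a local `TCPServer` that stands in for a proxy. The loopback server, the ephemeral port, and the variable names are all assumptions made for the example and are not part of the SDK.

```ruby
require 'socket'

# Local stand-in for a proxy: listen on an ephemeral loopback port.
server = TCPServer.new('127.0.0.1', 0)
port = server.addr[1]

# Accept a single connection and read a single line from it.
reader = Thread.new do
  client = server.accept
  received = client.gets
  client.close
  received
end

summary = { sent: 0, unsent: 0 }
sock = TCPSocket.new('127.0.0.1', port)

begin
  # One point per line, terminated by a newline, as in #send_point.
  sock.puts('test.metric 1 source=box1')
  summary[:sent] += 1
rescue StandardError
  summary[:unsent] += 1
ensure
  sock.close
end

received_line = reader.value # waits for the reader thread to finish
server.close
```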

#setup_options(user, defaults) ⇒ Object



# File 'lib/wavefront-sdk/write.rb', line 45

def setup_options(user, defaults)
  defaults.merge(user)
end
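
Because #setup_options is a plain `Hash#merge`, user-supplied keys override the defaults and every other default is kept. A minimal illustration, using the default keys from #post_initialize and made-up user options:

```ruby
# Defaults as listed in #post_initialize.
defaults = { tags: nil, noop: false, novalidate: false,
             verbose: false, debug: false }

# Example user options (values chosen only for illustration).
user = { noop: true, tags: { env: 'dev' } }

# merge returns a new hash; defaults itself is left unchanged.
opts = defaults.merge(user)
puts opts.inspect
```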

#valid_point?(p) ⇒ Boolean

Check a point with #wf_point?. Returns true if the point is valid, or if the novalidate option is set. Otherwise the failure is logged, the rejected count in #summary is incremented, and false is returned.

Returns:

  • (Boolean)


# File 'lib/wavefront-sdk/write.rb', line 111

def valid_point?(p)
  return true if opts[:novalidate]

  begin
    wf_point?(p)
    return true
  rescue Wavefront::Exception::InvalidMetricName,
         Wavefront::Exception::InvalidMetricValue,
         Wavefront::Exception::InvalidTimestamp,
         Wavefront::Exception::InvalidSourceId,
         Wavefront::Exception::InvalidTag => e
    log('Invalid point, skipping.', :info)
    log("Invalid point: #{p}. (#{e})", :debug)
    summary[:rejected] += 1
    return false
  end
end

#write(points = [], openclose = true) ⇒ Object

Send multiple points to a Wavefront proxy.

Parameters:

  • points (Array[Hash]) (defaults to: [])

    an array of points. Each point is defined as a hash with the following keys:

    path [String] metric path. (mandatory)
    value [Numeric] value of metric. (mandatory)
    ts [Time, Integer] timestamp for point. Defaults to
      current UTC time.
    source [String] originating source of metric. Defaults to
      the local hostname.
    tags [Hash] key: value point tags which will be applied in
      addition to any tags defined in the #initialize()
      method.
    
  • openclose (Bool) (defaults to: true)

    if this is false, you must have already opened a socket to the proxy. If it is true, a connection will be opened for you, used, and closed.

Returns:

  • (Wavefront::Response) a response whose status result is 'OK' if no points were rejected or left unsent, otherwise 'ERROR'

Raises:

  • any unhandled point validation error is passed through



# File 'lib/wavefront-sdk/write.rb', line 89

def write(points = [], openclose = true)
  open if openclose

  begin
    [points].flatten.each do |p|
      p[:ts] = p[:ts].to_i if p[:ts].is_a?(Time)
      next unless valid_point?(p) # skip points that fail validation
      send_point(hash_to_wf(p))
    end
  ensure
    close if openclose
  end

  s_str = summary[:unsent] == 0 && summary[:rejected] == 0 ? 'OK' :
                                                             'ERROR'

  resp = { status:   { result: s_str, message: nil, code: nil },
           response: summary }.to_json

  Wavefront::Response.new(resp, nil)
end
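
The status that #write reports follows directly from the summary counters. The sketch below isolates that step: `summary_to_status` is a hypothetical helper name, and it returns the JSON string rather than wrapping it in a `Wavefront::Response`.

```ruby
require 'json'

# Build the JSON body that #write passes to Wavefront::Response.new:
# 'OK' only when nothing was rejected and nothing was left unsent.
def summary_to_status(summary)
  ok = summary[:unsent].zero? && summary[:rejected].zero?
  { status: { result: ok ? 'OK' : 'ERROR', message: nil, code: nil },
    response: summary }.to_json
end

clean = JSON.parse(summary_to_status({ sent: 3, rejected: 0, unsent: 0 }))
dirty = JSON.parse(summary_to_status({ sent: 2, rejected: 1, unsent: 0 }))

puts clean['status']['result']
puts dirty['status']['result']
```

A caller that needs per-point detail should inspect the `response` counts, since the status collapses rejected and unsent points into a single 'ERROR'.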