Class: Sensu::Transport::SNSSQS

Inherits:
Base
  • Object
show all
Defined in:
lib/sensu/transport/snssqs.rb

Constant Summary collapse

STRING_STR =
'String'.freeze
NUMBER_STR =
'Number'.freeze
KEEPALIVES_STR =
'keepalives'.freeze
PIPE_STR =
'pipe'.freeze
TYPE_STR =
'type'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ SNSSQS

Returns a new instance of SNSSQS.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/sensu/transport/snssqs.rb', line 19

# Sets up a transport that is neither connected nor polling yet, with an
# empty per-client check history and an empty metrics buffer.
def initialize
  # connection / polling flags
  @connected = @subscribing = false

  # state used when buffering check results and metric output
  @history = {}
  @metrics_buffer = ''
  @metrics_last_flush = 0

  # Since sensu 0.23.0 the transport is expected to call succeed once it
  # has successfully connected to SQS.
  #
  # This transport has its own logic for maintaining the SQS connection,
  # so it can report success unconditionally.
  #
  # See:
  # https://github.com/sensu/sensu/blob/cdc25b29169ef2dcd2e056416eab0e83dbe000bb/CHANGELOG.md#0230---2016-04-04
  succeed
end

Instance Attribute Details

#logger ⇒ Object

Returns the value of attribute logger.



11
12
13
# File 'lib/sensu/transport/snssqs.rb', line 11

# Reader for the logger used by this transport for its debug/info output.
#
# @return [Object, nil] whatever is currently stored in @logger
def logger
  @logger
end

Instance Method Details

#acknowledge(info, &callback) ⇒ Object

acknowledge will delete the given message from the SQS queue.



138
139
140
141
142
143
144
145
146
147
# File 'lib/sensu/transport/snssqs.rb', line 138

# Deletes the given message from the consuming SQS queue.
#
# The work is handed to EM.defer. Once the delete call has returned, a
# statsd counter is bumped and the optional callback is invoked with info.
#
# @param info [#receipt_handle] the received message to acknowledge
# @yield [info] called after the message has been deleted
def acknowledge(info, &callback)
  EM.defer do
    queue_url = @settings[:consuming_sqs_queue_url]
    @sqs.delete_message(queue_url: queue_url, receipt_handle: info.receipt_handle)
    statsd_incr("sqs.#{queue_url}.message.deleted")
    callback.call(info) if callback
  end
end

#connect(settings) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/sensu/transport/snssqs.rb', line 41

# Stores the settings, marks the transport as connected, creates the SQS
# and SNS clients, optionally creates a statsd client, and fills in the
# defaults for the message-buffering settings.
#
# @param settings [Hash] transport settings; the buffering defaults are
#   written back into this same hash
def connect(settings)
  @settings = settings
  @connected = true
  @results_callback = proc {}
  @keepalives_callback = proc {}

  # Sensu Windows install does not include a valid cert bundle for AWS
  Aws.use_bundled_cert! if Gem.win_platform?

  # explicit credentials are only passed on when an access key id is set
  client_options = { region: @settings[:region] }
  unless @settings[:access_key_id].nil?
    client_options[:access_key_id] = @settings[:access_key_id]
    client_options[:secret_access_key] = @settings[:secret_access_key]
  end
  @sqs = Aws::SQS::Client.new(client_options)
  @sns = Aws::SNS::Client.new(client_options)

  # connect to statsd, if necessary ("host:port" in :statsd_addr)
  @statsd = nil
  statsd_addr = @settings[:statsd_addr]
  unless statsd_addr.nil? || statsd_addr == ''
    host, port = statsd_addr.split(':')
    statsd_client = Statsd.new(host, port.to_i)
    statsd_client.namespace = @settings[:statsd_namespace]
    @statsd = statsd_client
    @statsd_sample_rate = @settings[:statsd_sample_rate].to_f
  end

  # setup custom buffer: keep configured values, default the rest
  buffer_defaults = {
    buffer_messages: true,
    check_min_ok: 10,
    check_max_delay: 1800,
    metrics_max_size: 102_400,
    metrics_max_delay: 900
  }
  buffer_defaults.each do |key, default|
    @settings[key] = @settings.fetch(key, default)
  end

  # the method's value is the effective :metrics_max_delay
  @settings[:metrics_max_delay]
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


37
38
39
# File 'lib/sensu/transport/snssqs.rb', line 37

# Reports the @connected flag, which the constructor initialises to false
# and #connect sets to true.
#
# @return [Boolean]
def connected?
  @connected
end

#handleBuffer(raw_message) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/sensu/transport/snssqs.rb', line 149

# Parses an outgoing message and, when buffering is enabled and the message
# carries both a 'check' and a 'client', routes it to the metric or check
# buffering handler.
#
# @param raw_message [String] JSON-encoded message
# @return [Hash] with 'raw_message', 'json_message' and 'drop' keys; a
#   message that is not buffered is passed through with 'drop' => false
def handleBuffer(raw_message)
  json_message = ::JSON.parse(raw_message)

  bufferable = @settings[:buffer_messages] &&
               json_message.key?('check') &&
               json_message.key?('client')
  unless bufferable
    return {
      'raw_message' => raw_message,
      'json_message' => json_message,
      'drop' => false
    }
  end

  if json_message['check']['type'] == 'metric'
    handleBufferMetricMessage(raw_message, json_message)
  else
    handleBufferCheckMessage(raw_message, json_message)
  end
end

#handleBufferCheckMessage(raw_message, json_message) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/sensu/transport/snssqs.rb', line 168

# Decides whether a non-metric check result should be published or dropped,
# based on a per-client / per-check history kept in @history.
#
# An OK result (status 0 without 'aggregate', 'ttl' or 'force_resolve') is
# published while fewer than :check_min_ok consecutive OKs have been seen,
# and afterwards only when the last published event is older than the
# maximum delay. Any other result resets the history and is published.
#
# @param raw_message [String] the message as received
# @param json_message [Hash] the parsed message; a missing check 'type' is
#   filled in as 'standard'
# @return [Hash] with 'raw_message', 'json_message' and 'drop' keys
def handleBufferCheckMessage(raw_message, json_message)
  check = json_message['check']
  client = json_message['client']
  check['type'] = 'standard' unless check.key?('type')
  check_name = check['name']

  # create initial client history
  unless @history.key?(client)
    logger.debug("[transport-snssqs] creating event history for client #{client}")
    @history[client] = {}
  end
  client_history = @history[client]

  # create initial check history
  unless client_history.key?(check_name)
    logger.debug("[transport-snssqs] creating event history for check #{check_name}")
    client_history[check_name] = { 'ok_count' => 0, 'last_event' => 0 }
  end
  record = client_history[check_name]

  plain_ok = check['status'] == 0 &&
             !check.key?('aggregate') &&
             !check.key?('ttl') &&
             !check.key?('force_resolve')

  drop = false
  if !plain_ok
    # error (or special) event: reset history
    logger.debug("[transport-snssqs] reseting event history for #{check_name}")
    record['ok_count'] = 0
    record['last_event'] = 0
  else
    record['ok_count'] += 1

    if record['ok_count'] < @settings[:check_min_ok]
      # not enough consecutive OKs yet
      logger.debug("[transport-snssqs] sending event because history ok_count #{record['ok_count']} is too low for #{check_name}")
      record['last_event'] = Time.now.to_i
    else
      # keepalives may carry their own warning threshold as the delay
      max_delay = @settings[:check_max_delay]
      if check_name == 'keepalive' && check.key?('thresholds') && check['thresholds'].key?('warning')
        max_delay = check['thresholds']['warning']
      end

      if record['last_event'] < (Time.now.to_i - max_delay)
        # last published event is too old
        logger.debug("[transport-snssqs] sending event because last_event history #{Time.now.to_i - record['last_event']} is too old for #{check_name}")
        record['last_event'] = Time.now.to_i
      else
        # last published event is recent: ignore whole message
        logger.debug("[transport-snssqs] skipping event because last_event history #{Time.now.to_i - record['last_event']} is recent for #{check_name}")
        drop = true
      end
    end
  end

  {
    'raw_message' => raw_message,
    'json_message' => json_message,
    'drop' => drop
  }
end

#handleBufferMetricMessage(raw_message, json_message) ⇒ Object



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/sensu/transport/snssqs.rb', line 222

# Accumulates metric check output in @metrics_buffer and only lets a message
# through when the buffer is flushed.
#
# For a status-0 metric result without 'force_resolve', the check output is
# appended to the buffer. The buffer is flushed when it grows beyond the
# :metrics_max_size setting (102_400 when unset) or when the last flush is
# older than :metrics_max_delay seconds. On flush the current message is
# rewritten into a 'combined_metrics' check carrying the whole buffer;
# otherwise the message is dropped. Results with a non-zero status are
# always dropped.
#
# @param raw_message [String] the message as received
# @param json_message [Hash] the parsed message; rewritten in place on flush
# @return [Hash] with 'raw_message' (re-serialised on flush), 'json_message'
#   and 'drop' keys
def handleBufferMetricMessage(raw_message, json_message)
  check = json_message['check']
  drop = false

  if check['status'] == 0 && !check.key?('force_resolve')
    # to_s: a result without output contributes nothing instead of raising
    @metrics_buffer += check['output'].to_s

    # honour the configured size limit rather than a fixed constant
    max_size = @settings.fetch(:metrics_max_size, 102_400)
    if @metrics_buffer.length > max_size || @metrics_last_flush < (Time.now.to_i - @settings[:metrics_max_delay])
      check['name'] = 'combined_metrics'
      check['command'] = 'combined metrics by snssqs'
      check['interval'] = @settings[:metrics_max_delay]
      check['output'] = @metrics_buffer

      raw_message = json_message.to_json
      logger.info("[transport-snssqs] flushing metrics buffer #{@metrics_buffer.length}")
      @metrics_buffer = ''
      @metrics_last_flush = Time.now.to_i
    else
      # ignore whole message
      logger.debug("[transport-snssqs] storing output in metrics buffer #{@metrics_buffer.length}")
      drop = true
    end
  end

  drop = true if check['status'] != 0

  {
    'raw_message' => raw_message,
    'json_message' => json_message,
    'drop' => drop
  }
end

#publish(type, pipe, message, options = {}, &callback) ⇒ Object

publish publishes a message to the SNS topic.

The type, pipe, and options are transformed into SNS message attributes and included with the message.



260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/sensu/transport/snssqs.rb', line 260

# Publishes a message to the SNS topic.
#
# The message is first run through handleBuffer; when that marks it as
# dropped, nothing is sent and the callback is not invoked. Otherwise the
# type, pipe, selected client/check fields and all options are turned into
# message attributes, and the send is handed to EM.defer.
#
# Attributes derived from the check follow the pattern
# check_<field> <- check[<field>]. 'check_force_resolve' is emitted as the
# number 1 or 0 whenever the check carries a 'force_resolve' key.
#
# @param type [Object] transport type, sent as the 'type' attribute
# @param pipe [Object] pipe name, sent as the 'pipe' attribute
# @param message [String] JSON-encoded message
# @param options [Hash] extra attributes; keys and values are stringified
# @yield passed through to send_message
def publish(type, pipe, message, options = {}, &callback)
  result = handleBuffer(message)
  return if result['drop']

  # the buffer handler may have rewritten the message (e.g. combined metrics)
  message = result['raw_message']
  json_message = result['json_message']

  attributes = {
    TYPE_STR => str_attr(type),
    PIPE_STR => str_attr(pipe)
  }

  attributes['client'] = str_attr(json_message['client']) if json_message.key?('client')
  if json_message.key?('check')
    check = json_message['check']
    attributes['check_name'] = str_attr(check['name']) if check.key?('name')
    attributes['check_type'] = str_attr(check['type']) if check.key?('type')
    attributes['check_status'] = int_attr(check['status']) if check.key?('status')
    # keyed on 'force_resolve', the same key the buffer handlers test
    attributes['check_force_resolve'] = int_attr(check['force_resolve'] ? 1 : 0) if check.key?('force_resolve')
  end

  options.each do |key, value|
    attributes[key.to_s] = str_attr(value.to_s)
  end

  EM.defer { send_message(message, attributes, &callback) }
end

#statsd_incr(stat) ⇒ Object



74
75
76
# File 'lib/sensu/transport/snssqs.rb', line 74

# Increments the given statsd counter at the configured sample rate.
# Does nothing when no statsd client is configured.
#
# @param stat [String] counter name
def statsd_incr(stat)
  return if @statsd.nil?

  @statsd.increment(stat, @statsd_sample_rate)
end

#statsd_time(stat) ⇒ Object



78
79
80
81
82
83
84
85
86
87
# File 'lib/sensu/transport/snssqs.rb', line 78

# Runs the given block and, when a statsd client is configured, reports how
# long it took (in milliseconds) under the given stat name.
#
# The block always runs and is always measured; only the reporting depends
# on @statsd. The duration is taken from the monotonic clock so that
# wall-clock adjustments during the block cannot produce negative or
# inflated timings.
#
# @param stat [String] timer name
# @yield the work to measure
# @return [Object] the block's return value
def statsd_time(stat)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  unless @statsd.nil?
    elapsed_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at) * 1000
    @statsd.timing(stat, elapsed_ms.round(5), @statsd_sample_rate)
  end
  result
end

#subscribe(type, pipe, funnel = nil, _options = {}, &callback) ⇒ Object

subscribe will begin “subscribing” to the consuming sqs queue.

This method is intended for use by the Sensu server; fanout subscriptions initiated by the Sensu client process are treated as a no-op.

What this really means is that we will start polling for messages from the SQS queue, and, depending on the message type, it will call the appropriate callback.

This assumes that the SQS Queue is consuming “Raw” messages from SNS.

“subscribing” means that the “callback” parameter will be called when there is a message for you to consume.

The “funnel” parameter is ignored (it is only logged); “type” is only checked in order to skip fanout subscriptions.



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/sensu/transport/snssqs.rb', line 106

# Registers the callback for a pipe and, on the first non-fanout call,
# starts polling the consuming SQS queue.
#
# Fanout subscriptions are logged and skipped. The callback is stored as the
# keepalives callback when pipe is the keepalives pipe, and as the results
# callback otherwise. Each received message is dispatched to one of the two
# callbacks according to its 'pipe' message attribute, timed via statsd_time.
#
# @param type [Symbol] subscription type; :fanout is a no-op
# @param pipe [String] pipe name
# @param funnel [Object] only used in log output
# @param _options [Hash] unused
# @yield [message, body] called for every message received for the pipe
def subscribe(type, pipe, funnel = nil, _options = {}, &callback)
  if type == :fanout
    logger.debug("skipping unsupported fanout subscription type=#{type}, pipe=#{pipe}, funnel=#{funnel}")
    return
  end

  logger.info("subscribing to type=#{type}, pipe=#{pipe}, funnel=#{funnel}")

  if pipe == KEEPALIVES_STR
    @keepalives_callback = callback
  else
    @results_callback = callback
  end

  # a single polling loop serves both callbacks
  return if @subscribing

  do_all_the_time do
    EM::Iterator.new(receive_messages, 10).each do |message, iterator|
      statsd_time("sqs.#{@settings[:consuming_sqs_queue_url]}.process_timing") do
        for_keepalives = message.message_attributes[PIPE_STR].string_value == KEEPALIVES_STR
        handler = for_keepalives ? @keepalives_callback : @results_callback
        handler.call(message, message.body)
      end
      iterator.next
    end
  end
  @subscribing = true
end