Class: Sensu::Transport::SNSSQS
- Inherits: Base
- Ancestor chain: Object → Base → Sensu::Transport::SNSSQS
- Defined in:
- lib/sensu/transport/snssqs.rb
Constant Summary collapse
- STRING_STR = 'String'.freeze
- NUMBER_STR = 'Number'.freeze
- KEEPALIVES_STR = 'keepalives'.freeze
- PIPE_STR = 'pipe'.freeze
- TYPE_STR = 'type'.freeze
Instance Attribute Summary collapse
-
#logger ⇒ Object
Returns the value of attribute logger.
Instance Method Summary collapse
-
#acknowledge(info, &callback) ⇒ Object
acknowledge will delete the given message from the SQS queue.
- #connect(settings) ⇒ Object
- #connected? ⇒ Boolean
- #handleBuffer(raw_message) ⇒ Object
- #handleBufferCheckMessage(raw_message, json_message) ⇒ Object
- #handleBufferMetricMessage(raw_message, json_message) ⇒ Object
-
#initialize ⇒ SNSSQS
constructor
A new instance of SNSSQS.
-
#publish(type, pipe, message, options = {}, &callback) ⇒ Object
publish publishes a message to the SNS topic.
- #statsd_incr(stat) ⇒ Object
- #statsd_time(stat) ⇒ Object
-
#subscribe(type, pipe, funnel = nil, _options = {}, &callback) ⇒ Object
subscribe will begin “subscribing” to the consuming sqs queue.
Constructor Details
#initialize ⇒ SNSSQS
Returns a new instance of SNSSQS.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/sensu/transport/snssqs.rb', line 19
#
# Sets up a transport that is neither connected nor subscribed yet, with an
# empty check history and an empty metrics buffer.
def initialize
  # Metric buffering state: accumulated output and the epoch second of the
  # last flush.
  @metrics_last_flush = 0
  @metrics_buffer = ''

  # Check history, keyed first by client and then by check name.
  @history = {}

  @subscribing = false
  @connected = false

  # Sensu 0.23.0 and later expect the transport to call succeed once it has
  # connected. This transport keeps its SQS connection alive with its own
  # logic, so it can report success unconditionally.
  #
  # See:
  # https://github.com/sensu/sensu/blob/cdc25b29169ef2dcd2e056416eab0e83dbe000bb/CHANGELOG.md#0230---2016-04-04
  succeed
end
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
11 12 13 |
# File 'lib/sensu/transport/snssqs.rb', line 11
#
# Reader for the logger the transport writes its debug and info lines to.
#
# @return [Object] the current value of @logger (nil until one is assigned)
def logger
  @logger
end
Instance Method Details
#acknowledge(info, &callback) ⇒ Object
acknowledge will delete the given message from the SQS queue.
138 139 140 141 142 143 144 145 146 147 |
# File 'lib/sensu/transport/snssqs.rb', line 138
#
# Deletes a consumed message from the consuming SQS queue.
#
# The work is handed to EM.defer; the statsd counter is bumped and the
# optional callback is invoked only after the delete call has returned.
#
# @param info [#receipt_handle] the received message to delete
# @yieldparam info the same object that was passed in
def acknowledge(info, &callback)
  EM.defer do
    queue_url = @settings[:consuming_sqs_queue_url]
    @sqs.delete_message(
      queue_url: queue_url,
      receipt_handle: info.receipt_handle
    )
    statsd_incr("sqs.#{queue_url}.message.deleted")
    callback.call(info) if callback
  end
end
#connect(settings) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/sensu/transport/snssqs.rb', line 41
#
# Stores the settings, builds the SQS and SNS clients, optionally builds a
# statsd client, and fills in defaults for the buffering options.
#
# Note that the defaults are written back into the hash that was passed in.
#
# @param settings [Hash] transport settings; the keys read here are :region,
#   :access_key_id, :secret_access_key, :statsd_addr, :statsd_namespace,
#   :statsd_sample_rate and the buffering options listed at the bottom
def connect(settings)
  @settings = settings
  @connected = true
  @results_callback = proc {}
  @keepalives_callback = proc {}

  # Sensu Windows install does not include a valid cert bundle for AWS
  Aws.use_bundled_cert! if Gem.win_platform?

  aws_client_settings = { region: @settings[:region] }
  # Explicit credentials are only passed when an access key is configured.
  unless @settings[:access_key_id].nil?
    aws_client_settings[:access_key_id] = @settings[:access_key_id]
    aws_client_settings[:secret_access_key] = @settings[:secret_access_key]
  end
  @sqs = Aws::SQS::Client.new(aws_client_settings)
  @sns = Aws::SNS::Client.new(aws_client_settings)

  # connect to statsd, if necessary
  @statsd = nil
  statsd_addr = @settings[:statsd_addr]
  if !statsd_addr.nil? && statsd_addr != ''
    host, port = statsd_addr.split(':')
    # An address without a port used to produce port 0; fall back to 8125,
    # the conventional statsd port.
    port = port.to_i
    port = 8125 if port.zero?
    @statsd = Statsd.new(host, port).tap do |sd|
      sd.namespace = @settings[:statsd_namespace]
    end
    # An unset sample rate used to become 0.0; default to 1 (report
    # everything) instead. An explicitly configured rate is kept as is.
    @statsd_sample_rate = (@settings[:statsd_sample_rate] || 1).to_f
  end

  # setup custom buffer: keep configured values, fill in the rest
  {
    buffer_messages: true,
    check_min_ok: 10,
    check_max_delay: 1800,
    metrics_max_size: 102_400,
    metrics_max_delay: 900
  }.each do |key, default|
    @settings[key] = @settings.fetch(key, default)
  end
end
#connected? ⇒ Boolean
37 38 39 |
# File 'lib/sensu/transport/snssqs.rb', line 37
#
# Reports the connection flag maintained by this transport.
#
# @return [Boolean] the current value of @connected
def connected?
  @connected
end
#handleBuffer(raw_message) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/sensu/transport/snssqs.rb', line 149
#
# Parses an outgoing message and, when buffering is enabled and the message
# carries both a 'check' and a 'client', hands it to the matching buffer
# handler. Every other message is passed through untouched.
#
# @param raw_message [String] the JSON text about to be published
# @return [Hash] 'raw_message' (String), 'json_message' (parsed JSON) and
#   'drop' (true when the message should not be published)
# @raise [JSON::ParserError] if raw_message is not valid JSON
def handleBuffer(raw_message)
  json_message = ::JSON.parse(raw_message)

  if @settings[:buffer_messages] && json_message.key?('check') && json_message.key?('client')
    # Metric checks are accumulated; every other check type goes through the
    # ok-count / last-event history.
    if json_message['check']['type'] == 'metric'
      return handleBufferMetricMessage(raw_message, json_message)
    end

    return handleBufferCheckMessage(raw_message, json_message)
  end

  {
    'raw_message' => raw_message,
    'json_message' => json_message,
    'drop' => false
  }
end
#handleBufferCheckMessage(raw_message, json_message) ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/sensu/transport/snssqs.rb', line 168
#
# Decides whether a non-metric check result should be published or dropped,
# based on a per-client, per-check history of consecutive OK results.
#
# An OK result (status 0 without 'aggregate', 'ttl' or 'force_resolve') is
# published while the OK count is below :check_min_ok, and afterwards only
# when the last published event is older than the maximum delay. Any other
# result resets the history and is published.
#
# Side effects: sets check 'type' to 'standard' when it is missing, and
# updates @history.
#
# @param raw_message [String] the JSON text of the message
# @param json_message [Hash] the parsed message; reads 'client' and 'check'
# @return [Hash] 'raw_message', 'json_message' and 'drop'
def handleBufferCheckMessage(raw_message, json_message)
  drop = false
  client = json_message['client']
  check = json_message['check']
  check['type'] = 'standard' unless check.key?('type')
  check_name = check['name']

  # create initial client history
  unless @history.key?(client)
    logger.debug("[transport-snssqs] creating event history for client #{client}")
    @history[client] = {}
  end

  # create initial check history
  unless @history[client].key?(check_name)
    logger.debug("[transport-snssqs] creating event history for check #{check_name}")
    @history[client][check_name] = { 'ok_count' => 0, 'last_event' => 0 }
  end
  entry = @history[client][check_name]

  bufferable_ok = check['status'] == 0 &&
                  !check.key?('aggregate') &&
                  !check.key?('ttl') &&
                  !check.key?('force_resolve')

  if bufferable_ok
    # handle ok events
    entry['ok_count'] += 1
    # Read the clock once so the comparison and the logged age agree.
    now = Time.now.to_i

    if entry['ok_count'] < @settings[:check_min_ok]
      # history ok_count is too low
      logger.debug("[transport-snssqs] sending event because history ok_count #{entry['ok_count']} is too low for #{check_name}")
      entry['last_event'] = now
    else
      max_delay = @settings[:check_max_delay]
      # A keepalive's own warning threshold, when present, replaces the
      # configured maximum delay.
      if check_name == 'keepalive' && check.key?('thresholds')
        max_delay = check['thresholds']['warning'] if check['thresholds'].key?('warning')
      end

      age = now - entry['last_event']
      if entry['last_event'] < (now - max_delay)
        # history last_event is too old
        logger.debug("[transport-snssqs] sending event because last_event history #{age} is too old for #{check_name}")
        entry['last_event'] = now
      else
        # history last_event is recent
        logger.debug("[transport-snssqs] skipping event because last_event history #{age} is recent for #{check_name}")
        # ignore whole message
        drop = true
      end
    end
  else
    # handle error events: reset history
    logger.debug("[transport-snssqs] reseting event history for #{check_name}")
    entry['ok_count'] = 0
    entry['last_event'] = 0
  end

  {
    'raw_message' => raw_message,
    'json_message' => json_message,
    'drop' => drop
  }
end
#handleBufferMetricMessage(raw_message, json_message) ⇒ Object
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 |
# File 'lib/sensu/transport/snssqs.rb', line 222
#
# Accumulates the output of OK metric checks and publishes it as one combined
# message once the buffer exceeds :metrics_max_size or the last flush is older
# than :metrics_max_delay seconds.
#
# On a flush the check is rewritten in place (name, command, interval and
# output) and raw_message is re-serialized from the rewritten message.
# Metric results with a non-zero status are always dropped.
#
# @param raw_message [String] the JSON text of the message
# @param json_message [Hash] the parsed message; reads and rewrites 'check'
# @return [Hash] 'raw_message', 'json_message' and 'drop'
def handleBufferMetricMessage(raw_message, json_message)
  drop = false
  check = json_message['check']

  if check['status'] == 0 && !check.key?('force_resolve')
    @metrics_buffer += check['output']

    # Honour the configured size limit; 102_400 was previously hard-coded
    # here and remains the fallback when the setting is absent.
    max_size = @settings[:metrics_max_size] || 102_400
    flush_overdue = @metrics_last_flush < (Time.now.to_i - @settings[:metrics_max_delay])

    if @metrics_buffer.length > max_size || flush_overdue
      check['name'] = 'combined_metrics'
      check['command'] = 'combined metrics by snssqs'
      check['interval'] = @settings[:metrics_max_delay]
      check['output'] = @metrics_buffer
      raw_message = json_message.to_json
      logger.info("[transport-snssqs] flushing metrics buffer #{@metrics_buffer.length}")
      @metrics_buffer = ''
      @metrics_last_flush = Time.now.to_i
    else
      # ignore whole message
      logger.debug("[transport-snssqs] storing output in metrics buffer #{@metrics_buffer.length}")
      drop = true
    end
  end

  drop = true if check['status'] != 0

  {
    'raw_message' => raw_message,
    'json_message' => json_message,
    'drop' => drop
  }
end
#publish(type, pipe, message, options = {}, &callback) ⇒ Object
publish publishes a message to the SNS topic.
The type, pipe, and options are transformed into SNS message attributes and included with the message.
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 |
# File 'lib/sensu/transport/snssqs.rb', line 260
#
# Publishes a message, unless the buffering logic decides to drop it.
#
# The type, pipe and options, plus selected fields of the parsed message, are
# turned into message attributes and sent along with the message body.
#
# @param type [Object] transport type, sent as the 'type' attribute
# @param pipe [Object] pipe name, sent as the 'pipe' attribute
# @param message [String] JSON text to publish
# @param options [Hash] extra attributes; keys and values are stringified
# @return [nil] when the message is dropped; otherwise the result of EM.defer
def publish(type, pipe, message, options = {}, &callback)
  result = handleBuffer(message)
  return if result['drop']

  # The buffer may have rewritten the message (e.g. combined metrics).
  message = result['raw_message']
  json_message = result['json_message']

  attributes = {
    TYPE_STR => str_attr(type),
    PIPE_STR => str_attr(pipe)
  }
  attributes['client'] = str_attr(json_message['client']) if json_message.key?('client')

  if json_message.key?('check')
    check = json_message['check']
    attributes['check_name'] = str_attr(check['name']) if check.key?('name')
    attributes['check_type'] = str_attr(check['type']) if check.key?('type')
    attributes['check_status'] = int_attr(check['status']) if check.key?('status')
    # NOTE(review): this looks for a 'check_force_resolve' key and publishes
    # the check status as its value, while the buffering code looks for
    # 'force_resolve'. Left unchanged; confirm which key/value is intended.
    attributes['check_force_resolve'] = int_attr(check['status']) if check.key?('check_force_resolve')
  end

  options.each do |key, value|
    attributes[key.to_s] = str_attr(value.to_s)
  end

  # NOTE(review): the helper name was lost in this listing; send_message is
  # assumed. Confirm against lib/sensu/transport/snssqs.rb.
  EM.defer { send_message(message, attributes, &callback) }
end
#statsd_incr(stat) ⇒ Object
74 75 76 |
# File 'lib/sensu/transport/snssqs.rb', line 74
#
# Increments a statsd counter at the configured sample rate. Does nothing
# when no statsd client has been set up.
#
# @param stat [String] counter name
# @return [Object, nil] nil without a statsd client, otherwise whatever the
#   client's increment call returns
def statsd_incr(stat)
  return if @statsd.nil?

  @statsd.increment(stat, @statsd_sample_rate)
end
#statsd_time(stat) ⇒ Object
78 79 80 81 82 83 84 85 86 87 |
# File 'lib/sensu/transport/snssqs.rb', line 78
#
# Runs the given block and, when a statsd client is configured, reports how
# long it took in milliseconds.
#
# The block always runs and its value is always returned; only the reporting
# depends on @statsd. If the block raises, nothing is reported.
#
# @param stat [String] timer name
# @yield the work to measure
# @return [Object] the block's return value
def statsd_time(stat)
  # Use the monotonic clock so wall-clock adjustments cannot produce negative
  # or inflated durations.
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  unless @statsd.nil?
    elapsed_ms = ((Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) * 1000).round(5)
    @statsd.timing(stat, elapsed_ms, @statsd_sample_rate)
  end
  result
end
#subscribe(type, pipe, funnel = nil, _options = {}, &callback) ⇒ Object
subscribe will begin “subscribing” to the consuming sqs queue.
This method is intended for use by the Sensu server; fanout subscriptions initiated by the Sensu client process are treated as a no-op.
What this really means is that we will start polling for messages from the SQS queue, and, depending on the message type, it will call the appropriate callback.
This assumes that the SQS Queue is consuming “Raw” messages from SNS.
“subscribing” means that the “callback” parameter will be called when there is a message for you to consume.
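“funnel” is only logged, and “type” is used only to skip fanout subscriptions (and for logging); neither affects which messages are received. “_options” is ignored.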
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/sensu/transport/snssqs.rb', line 106
#
# Registers the callback for a pipe and, on the first call, starts the
# polling loop that dispatches received messages to the registered callbacks.
#
# Fanout subscriptions are logged and skipped. The callback is stored as the
# keepalives callback when pipe is KEEPALIVES_STR, otherwise as the results
# callback. Each message is routed by its PIPE_STR message attribute.
#
# @param type [Symbol] subscription type; :fanout is a no-op
# @param pipe [String] pipe name
# @param funnel [Object] only used in log output
# @param _options [Hash] ignored
# @yieldparam msg the received message
# @yieldparam body the message body
# @return [true, nil] true when the polling loop was started by this call
def subscribe(type, pipe, funnel = nil, _options = {}, &callback)
  if type == :fanout
    logger.debug("skipping unsupported fanout subscription type=#{type}, pipe=#{pipe}, funnel=#{funnel}")
    return
  end

  logger.info("subscribing to type=#{type}, pipe=#{pipe}, funnel=#{funnel}")

  if pipe == KEEPALIVES_STR
    @keepalives_callback = callback
  else
    @results_callback = callback
  end

  # The polling loop is started once; later calls only swap callbacks.
  return if @subscribing

  do_all_the_time do
    # NOTE(review): the name of the message source was lost in this listing;
    # receive_messages is assumed. Confirm against
    # lib/sensu/transport/snssqs.rb.
    EM::Iterator.new(receive_messages, 10).each do |msg, iter|
      statsd_time("sqs.#{@settings[:consuming_sqs_queue_url]}.process_timing") do
        if msg.message_attributes[PIPE_STR].string_value == KEEPALIVES_STR
          @keepalives_callback.call(msg, msg.body)
        else
          @results_callback.call(msg, msg.body)
        end
      end
      iter.next
    end
  end
  @subscribing = true
end