Class: Fluent::SpectrumInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_spectrum.rb

Defined Under Namespace

Classes: TimerWatcher

Constant Summary collapse

INTERVAL_MIN =

Configurations

10

Instance Method Summary collapse

Constructor Details

#initializeSpectrumInput

Returns a new instance of SpectrumInput.



53
54
55
56
57
58
59
# File 'lib/fluent/plugin/in_spectrum.rb', line 53

def initialize
  require 'rest-client'
  require 'json'
  require 'highwatermark'
  require 'yaml'
  super
end

Instance Method Details

#configure(conf) ⇒ Object

def initialize



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/fluent/plugin/in_spectrum.rb', line 61

def configure(conf)
  super 
  @conf = conf
  # Verify configs
  # Stop if required fields are not set
  unless @endpoint && @username && @password
    raise ConfigError, "Spectrum :: ConfigError 'endpoint' and 'username' and 'password' must be all specified."
  end
  # Enforce min interval
  if @interval.to_i < INTERVAL_MIN
    raise ConfigError, "Spectrum :: ConfigError 'interval' must be #{INTERVAL_MIN} or over."
  end
  # Warn about optional state file
  unless @state_type == "file" || @state_type =="redis"
    $log.warn "Spectrum :: 'state_type' is not set to file or redis"
    $log.warn "Spectrum :: state file or redis are recommended to save the last known good timestamp to resume event consuming"
  end

  @highwatermark_parameters={
    "state_tag" => @state_tag,     
    "state_type" => @state_type,
    "state_file" => @state_file,
    "redis_host" => @redis_host,
    "redis_port" => @redis_port      
  }
  $log.info "highwatermark_parameters: #{@highwatermark_parameters}"

  # default setting for @spectrum_access_code
  @spectrum_access_code={
    "0x11f9c" => "ALARM_ID",
    "0x11f4e" => "CREATION_DATE",
    "0x11f56" => "SEVERITY",
    "0x12b4c" => "ALARM_TITLE",
    "0x1006e" => "HOSTNAME",
    "0x12d7f" => "IP_ADDRESS",
    "0x1296e" => "ORIGINATING_EVENT_ATTR",
    "0x10000" => "MODEL_STRING",  
    "0x11f4d" => "ACKNOWLEDGED",
    "0x11f4f" => "ALARM_STATUS",
    "0x11fc5" => "OCCURRENCES",
    "0x11f57" => "TROUBLE_SHOOTER",
    "0x11f9b" => "USER_CLEARABLE",
    "0x12022" => "TROUBLE_TICKET_ID",
    "0x12942" => "PERSISTENT",
    "0x12adb" => "GC_NAME",
    "0x57f0105" => "CUSTOM_PROJECT",
    "0x11f51" => "CLEARED_BY_USER_NAME",
    "0x11f52" => "EVENT_ID_LIST",
    "0x11f53" => "MODEL_HANDLE",
    "0x11f54" => "PRIMARY_ALARM",
    "0x11fc4" => "ALARM_SOURCE",
    "0x11fc6" => "TROUBLE_SHOOTER_MH",
    "0x12a6c" => "TROUBLE_SHOOTER_EMAIL",
    "0x1290d" => "IMPACT_SEVERITY",
    "0x1290e" => "IMPACT_SCOPE",
    "0x1298a" => "IMPACT_TYPE_LIST",
    "0x12948" => "DIAGNOSIS_LOG",
    "0x129aa" => "MODEL_ID",
    "0x129ab" => "MODEL_TYPE_ID",
    "0x129af" => "CLEAR_DATE",
    "0x12a04" => "SYMPTOM_LIST_ATTR",
    "0x12a6f" => "EVENT_SYMPTOM_LIST_ATTR",
    "0x12a05" => "CAUSE_LIST_ATTR",
    "0x12a06" => "SYMPTOM_COUNT_ATTR",
    "0x12a70" => "EVENT_SYMPTOM_COUNT_ATTR",
    "0x12a07" => "CAUSE_COUNT_ATTR",
    "0x12a63" => "WEB_CONTEXT_URL",
    "0x12a6b" => "COMBINED_IMPACT_TYPE_LIST",
    "0x11f50" => "CAUSE_CODE",
    "0x10009" => "SECURITY_STRING"
  }

  # Create XML chunk for attributes we care about
  @attr_of_interest=""
  if(@attributes.upcase === "__ALL__")
    $log.info "all attributes"
    @spectrum_access_code.each do |key, value|
      $log.info "key: #{key},  value: #{value}"
      @attr_of_interest += " <rs:requested-attribute id=\"#{key}\"/>"
    end
  else
    $log.info "selected attributes"
    @attributes.split(",").each {|attr|         
      key=""
      value=""
      # if it's hex code
      if @spectrum_access_code.has_key?(attr.strip)
        key = attr.strip
        value = @spectrum_access_code.fetch(key)
      # if it's the name
      elsif @spectrum_access_code.has_value?(attr.strip.upcase)
        value = attr.strip.upcase
        key = @spectrum_access_code.key(value)
      # if it's invalid input, not the hex code or name in the map
      else 
        raise ConfigError, "Spectrum :: ConfigError attribute '#{attr}' is not in the hash map"
      end
      $log.info "key: #{key},  value: #{value}"
      @attr_of_interest += " <rs:requested-attribute id=\"#{key}\"/>"
    }
  end      

  # URL Resource
  def resource
    @url = 'http://' + @endpoint.to_s + '/spectrum/restful/alarms'
    RestClient::Resource.new(@url, :user => @username, :password => @password, :open_timeout => 5, :timeout => (@interval * 3))
  end
end

#on_timerObject

def run



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/fluent/plugin/in_spectrum.rb', line 192

def on_timer
  if not @stop_flag
    pollingStart = Engine.now.to_i
    if @highwatermark.last_records(@state_tag)
      alertStartTime = @highwatermark.last_records(@state_tag)
      $log.info "got hwm form state file: #{alertStartTime.to_i}"
    else
      alertStartTime = (pollingStart.to_i - @interval.to_i)
      $log.info "no hwm, got new alert start time: #{alertStartTime.to_i}"
    end
    pollingEnd = ''
    pollingDuration = ''
    # Format XML for spectrum post
    @xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>
    <rs:alarm-request throttlesize=\"#{select_limit}\"
    xmlns:rs=\"http://www.ca.com/spectrum/restful/schema/request\"
    xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"
    xsi:schemaLocation=\"http://www.ca.com/spectrum/restful/schema/request ../../../xsd/Request.xsd \">
    <rs:attribute-filter>
      <search-criteria xmlns=\"http://www.ca.com/spectrum/restful/schema/filter\">
      <filtered-models>
        <greater-than>
          <attribute id=\"0x11f4e\">
            <value> #{alertStartTime} </value>
          </attribute>
        </greater-than>
      </filtered-models>
      </search-criteria>
    </rs:attribute-filter>
    #{@attr_of_interest}
    </rs:alarm-request>"

    # Post to Spectrum and parse results
    begin
      res=resource.post @xml,:content_type => 'application/xml',:accept => 'application/json'
      body = JSON.parse(res.body)
      pollingEnd = Time.parse(res.headers[:date]).to_i
      pollingDuration = Engine.now.to_i - pollingStart
    end  

    # Processing for multiple alerts returned
    if body['ns1.alarm-response-list']['@total-alarms'].to_i > 1
      $log.info "Spectrum :: returned #{body['ns1.alarm-response-list']['@total-alarms'].to_i} alarms for period < #{alertStartTime.to_i} took #{pollingDuration.to_i} seconds, ended at #{pollingEnd}"
      # iterate through each alarm
      body['ns1.alarm-response-list']['ns1.alarm-responses']['ns1.alarm'].each do |alarm|
        # Create initial structure
        record_hash = Hash.new # temp hash to hold attributes of alarm
        raw_array = Array.new # temp hash to hold attributes of alarm for raw
        record_hash['event_type'] = @tag.to_s
        record_hash['intermediary_source'] = @endpoint.to_s
        record_hash['receive_time_input'] = pollingEnd.to_s
        # iterate though alarm attributes
        alarm['ns1.attribute'].each do |attribute|
          key,value = parseAttributes(attribute)
          record_hash[key] = value
          if @include_raw.to_s == "true"
            raw_array << { "#{key}" => "#{value}" }
          end
        end
        # append raw object
        if @include_raw.to_s == "true"  
          record_hash[:raw] = raw_array
        end
        Engine.emit(@tag, record_hash['CREATION_DATE'].to_i,record_hash)
      end
    # Processing for single alarm returned  
    elsif body['ns1.alarm-response-list']['@total-alarms'].to_i == 1
      $log.info "Spectrum :: returned #{body['ns1.alarm-response-list']['@total-alarms'].to_i} alarms for period < #{alertStartTime.to_i} took #{pollingDuration.to_i} seconds, ended at #{pollingEnd}"
      # Create initial structure
      record_hash = Hash.new # temp hash to hold attributes of alarm
      raw_array = Array.new # temp hash to hold attributes of alarm for raw
      record_hash['event_type'] = @tag.to_s
      record_hash['intermediary_source'] = @endpoint.to_s
      record_hash['receive_time_input'] = pollingEnd.to_s
      # iterate though alarm attributes and add to temp hash  
      body['ns1.alarm-response-list']['ns1.alarm-responses']['ns1.alarm']['ns1.attribute'].each do |attribute|
        key,value = parseAttributes(attribute)
        record_hash[key] = value
        if @include_raw.to_s == "true"
          raw_array << { "#{key}" => "#{value}" }
        end
      end
      # append raw object
      if @include_raw.to_s == "true"  
        record_hash[:raw] = raw_array
      end
      Engine.emit(@tag, record_hash['CREATION_DATE'].to_i,record_hash)
    # No alarms returned
    else
      $log.info "Spectrum :: returned #{body['ns1.alarm-response-list']['@total-alarms'].to_i} alarms for period < #{alertStartTime.to_i} took #{pollingDuration.to_i} seconds, ended at #{pollingEnd}"
    end
    @highwatermark.update_records(pollingEnd,@state_tag)
  end
end

#parseAttributes(alarmAttribute) ⇒ Object



47
48
49
50
51
# File 'lib/fluent/plugin/in_spectrum.rb', line 47

def parseAttributes(alarmAttribute)
  key = @spectrum_access_code[alarmAttribute['@id'].to_s].to_s
  value = ((to_utf8(alarmAttribute['$'].to_s)).strip).gsub(/\r?\n/, " ")
  return key,value
end

#resourceObject

URL Resource



164
165
166
167
# File 'lib/fluent/plugin/in_spectrum.rb', line 164

def resource
  @url = 'http://' + @endpoint.to_s + '/spectrum/restful/alarms'
  RestClient::Resource.new(@url, :user => @username, :password => @password, :open_timeout => 5, :timeout => (@interval * 3))
end

#runObject

def shutdown



185
186
187
188
189
190
# File 'lib/fluent/plugin/in_spectrum.rb', line 185

def run
  @loop.run
rescue
  $log.error "unexpected error", :error=>$!.to_s
  $log.error_backtrace
end

#shutdownObject

def start



178
179
180
181
182
183
# File 'lib/fluent/plugin/in_spectrum.rb', line 178

def shutdown
  #@loop.watchers.each {|w| w.detach}
  @stop_flag = true
  @loop.stop
  @thread.join
end

#startObject

def configure



170
171
172
173
174
175
176
# File 'lib/fluent/plugin/in_spectrum.rb', line 170

def start
  @stop_flag = false
  @highwatermark = Highwatermark::HighWaterMark.new(@highwatermark_parameters)
  @loop = Coolio::Loop.new
  @loop.attach(TimerWatcher.new(@interval, true, &method(:on_timer)))
  @thread = Thread.new(&method(:run))
end

#to_utf8(str) ⇒ Object

function to UTF8 encode



41
42
43
44
45
# File 'lib/fluent/plugin/in_spectrum.rb', line 41

def to_utf8(str)
  str = str.force_encoding('UTF-8')
  return str if str.valid_encoding?
  str.encode("UTF-8", 'binary', invalid: :replace, undef: :replace, replace: '')
end