Class: Fluent::SpectrumInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_spectrum.rb

Defined Under Namespace

Classes: MemoryStateStore, StateStore, TimerWatcher

Constant Summary collapse

INTERVAL_MIN =

Configurations

10

Instance Method Summary collapse

Constructor Details

#initializeSpectrumInput

Returns a new instance of SpectrumInput.



85
86
87
88
89
# File 'lib/fluent/plugin/in_spectrum.rb', line 85

def initialize
  require 'rest-client'
  require 'json'
  super
end

Instance Method Details

#configure(conf) ⇒ Object

def initialize



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/fluent/plugin/in_spectrum.rb', line 91

def configure(conf)
  super 
  @conf = conf
  # Verify configs
  # Stop if required fields are not set
  unless @endpoint && @username && @password
    raise ConfigError, "Spectrum :: ConfigError 'endpoint' and 'username' and 'password' must be all specified."
  end
  # Enforce min interval
  if @interval.to_i < INTERVAL_MIN
    raise ConfigError, "Spectrum :: ConfigError 'interval' must be #{INTERVAL_MIN} or over."
  end
  # Warn about optional state file
  unless @state_file
    $log.warn "Spectrum :: 'state_file PATH' parameter is not set to a valid source."
    $log.warn "Spectrum :: this parameter is highly recommended to save the last known good timestamp to resume event consuming"
  end
  # map of Spectrum attribute codes to names
  @spectrum_access_code={
    "0x11f9c" => "ALARM_ID",
    "0x11f4e" => "CREATION_DATE",
    "0x11f56" => "SEVERITY",
    "0x12b4c" => "ALARM_TITLE",
    "0x1006e" => "HOSTNAME",
    "0x12d7f" => "IP_ADDRESS",
    "0x1296e" => "ORIGINATING_EVENT_ATTR",
    "0x10000" => "MODEL_STRING",  
    "0x11f4d" => "ACKNOWLEDGED",
    "0x11f4f" => "ALARM_STATUS",
    "0x11fc5" => "OCCURRENCES",
    "0x11f57" => "TROUBLE_SHOOTER",
    "0x11f9b" => "USER_CLEARABLE",
    "0x12022" => "TROUBLE_TICKET_ID",
    "0x12942" => "PERSISTENT",
    "0x12adb" => "GC_NAME",
    "0x57f0105" => "Custom_Project",
    #{}"0x11f51" => "CLEARED_BY_USER_NAME",
    #{}"0x11f52" => "EVENT_ID_LIST",
    #{}"0x11f53" => "MODEL_HANDLE",
    #{}"0x11f54" => "PRIMARY_ALARM",
    #{}"0x11fc4" => "ALARM_SOURCE",
    #{}"0x11fc6" => "TROUBLE_SHOOTER_MH",
    #{}"0x12a6c" => "TROUBLE_SHOOTER_EMAIL",
    #{}"0x1290d" => "IMPACT_SEVERITY",
    #{}"0x1290e" => "IMPACT_SCOPE",
    #{}"0x1298a" => "IMPACT_TYPE_LIST",
    #{}"0x12948" => "DIAGNOSIS_LOG",
    #{}"0x129aa" => "MODEL_ID",
    #{}"0x129ab" => "MODEL_TYPE_ID",
    #{}"0x129af" => "CLEAR_DATE",
    #{}"0x12a04" => "SYMPTOM_LIST_ATTR",
    #{}"0x12a6f" => "EVENT_SYMPTOM_LIST_ATTR",
    #{}"0x12a05" => "CAUSE_LIST_ATTR",
    #{}"0x12a06" => "SYMPTOM_COUNT_ATTR",
    #{}"0x12a70" => "EVENT_SYMPTOM_COUNT_ATTR",
    #{}"0x12a07" => "CAUSE_COUNT_ATTR",
    #{}"0x12a63" => "WEB_CONTEXT_URL",
    #{}"0x12a6b" => "COMBINED_IMPACT_TYPE_LIST",
    #{}"0x11f50" => "CAUSE_CODE",
    #{}"0x10009" => "SECURITY_STRING"
  }
  # Create XML chunk for attributes we care about
  @attr_of_interest=""
  @spectrum_access_code.each do |key, array|
    @attr_of_interest += " <rs:requested-attribute id=\"#{key}\"/>"
  end
  # URL Resource
  def resource
    @url = 'http://' + @endpoint.to_s + '/spectrum/restful/alarms'
    RestClient::Resource.new(@url, :user => @username, :password => @password, :open_timeout => 5, :timeout => (@interval * 3))
  end
end

#on_timerObject

def run



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'lib/fluent/plugin/in_spectrum.rb', line 186

def on_timer
  if not @stop_flag
    pollingStart = Engine.now.to_i
    if @state_store.last_records.has_key?("spectrum") 
      alertStartTime = @state_store.last_records['spectrum']
    else
      alertStartTime = (pollingStart.to_i - @interval.to_i)
    end
    pollingEnd = ''
    pollingDuration = ''
    # Format XML for spectrum post
    @xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>
    <rs:alarm-request throttlesize=\"#{select_limit}\"
    xmlns:rs=\"http://www.ca.com/spectrum/restful/schema/request\"
    xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"
    xsi:schemaLocation=\"http://www.ca.com/spectrum/restful/schema/request ../../../xsd/Request.xsd \">
    <rs:attribute-filter>
      <search-criteria xmlns=\"http://www.ca.com/spectrum/restful/schema/filter\">
      <filtered-models>
        <greater-than>
          <attribute id=\"0x11f4e\">
            <value> #{alertStartTime} </value>
          </attribute>
        </greater-than>
      </filtered-models>
      </search-criteria>
    </rs:attribute-filter>
    #{@attr_of_interest}
    </rs:alarm-request>"

    # Post to Spectrum and parse results
    begin
      res=resource.post @xml,:content_type => 'application/xml',:accept => 'application/json'
      body = JSON.parse(res.body)
      pollingEnd = Time.parse(res.headers[:date]).to_i
      @state_store.last_records['spectrum'] = pollingEnd
      pollingDuration = Engine.now.to_i - pollingStart
    end  

    # Processing for multiple alerts returned
    if body['ns1.alarm-response-list']['@total-alarms'].to_i > 1
      $log.info "Spectrum :: returned #{body['ns1.alarm-response-list']['@total-alarms'].to_i} alarms for period < #{alertStartTime.to_i} took #{pollingDuration.to_i} seconds, ended at #{pollingEnd}"
      # iterate through each alarm
      body['ns1.alarm-response-list']['ns1.alarm-responses']['ns1.alarm'].each do |alarm|
        # Create initial structure
        record_hash = Hash.new # temp hash to hold attributes of alarm
        raw_array = Array.new # temp hash to hold attributes of alarm for raw
        record_hash['event_type'] = @tag.to_s
        record_hash['intermediary_source'] = @endpoint.to_s
        record_hash['receive_time_input'] = pollingEnd.to_s
        # iterate though alarm attributes
        alarm['ns1.attribute'].each do |attribute|
          key,value = parseAttributes(attribute)
          record_hash[key] = value
          if @include_raw.to_s == "true"
            raw_array << { "#{key}" => "#{value}" }
          end
        end
        # append raw object
        if @include_raw.to_s == "true"  
          record_hash[:raw] = raw_array
        end
        Engine.emit(@tag, record_hash['CREATION_DATE'].to_i,record_hash)
      end
    # Processing for single alarm returned  
    elsif body['ns1.alarm-response-list']['@total-alarms'].to_i == 1
      $log.info "Spectrum :: returned #{body['ns1.alarm-response-list']['@total-alarms'].to_i} alarms for period < #{alertStartTime.to_i} took #{pollingDuration.to_i} seconds, ended at #{pollingEnd}"
      # Create initial structure
      record_hash = Hash.new # temp hash to hold attributes of alarm
      raw_array = Array.new # temp hash to hold attributes of alarm for raw
      record_hash['event_type'] = @tag.to_s
      record_hash['intermediary_source'] = @endpoint.to_s
      record_hash['receive_time_input'] = pollingEnd.to_s
      # iterate though alarm attributes and add to temp hash  
      body['ns1.alarm-response-list']['ns1.alarm-responses']['ns1.alarm']['ns1.attribute'].each do |attribute|
        key,value = parseAttributes(attribute)
        record_hash[key] = value
        if @include_raw.to_s == "true"
          raw_array << { "#{key}" => "#{value}" }
        end
      end
      # append raw object
      if @include_raw.to_s == "true"  
        record_hash[:raw] = raw_array
      end
      Engine.emit(@tag, record_hash['CREATION_DATE'].to_i,record_hash)
    # No alarms returned
    else
      $log.info "Spectrum :: returned #{body['ns1.alarm-response-list']['@total-alarms'].to_i} alarms for period < #{alertStartTime.to_i} took #{pollingDuration.to_i} seconds, ended at #{pollingEnd}"
    end
    @state_store.update!  
  end
end

#parseAttributes(alarmAttribute) ⇒ Object



79
80
81
82
83
# File 'lib/fluent/plugin/in_spectrum.rb', line 79

def parseAttributes(alarmAttribute)
  key = @spectrum_access_code[alarmAttribute['@id'].to_s].to_s
  value = ((to_utf8(alarmAttribute['$'].to_s)).strip).gsub(/\r?\n/, " ")
  return key,value
end

#resourceObject

URL Resource



158
159
160
161
# File 'lib/fluent/plugin/in_spectrum.rb', line 158

def resource
  @url = 'http://' + @endpoint.to_s + '/spectrum/restful/alarms'
  RestClient::Resource.new(@url, :user => @username, :password => @password, :open_timeout => 5, :timeout => (@interval * 3))
end

#runObject

def shutdown



179
180
181
182
183
184
# File 'lib/fluent/plugin/in_spectrum.rb', line 179

def run
  @loop.run
rescue
  $log.error "unexpected error", :error=>$!.to_s
  $log.error_backtrace
end

#shutdownObject

def start



172
173
174
175
176
177
# File 'lib/fluent/plugin/in_spectrum.rb', line 172

def shutdown
  #@loop.watchers.each {|w| w.detach}
  @stop_flag = true
  @loop.stop
  @thread.join
end

#startObject

def configure



164
165
166
167
168
169
170
# File 'lib/fluent/plugin/in_spectrum.rb', line 164

def start
  @stop_flag = false
  @state_store = @state_file.nil? ? MemoryStateStore.new : StateStore.new(@state_file)
  @loop = Coolio::Loop.new
  @loop.attach(TimerWatcher.new(@interval, true, &method(:on_timer)))
  @thread = Thread.new(&method(:run))
end

#to_utf8(str) ⇒ Object

function to UTF8 encode



73
74
75
76
77
# File 'lib/fluent/plugin/in_spectrum.rb', line 73

def to_utf8(str)
  str = str.force_encoding('UTF-8')
  return str if str.valid_encoding?
  str.encode("UTF-8", 'binary', invalid: :replace, undef: :replace, replace: '')
end