Class: Fluent::SpectrumOut

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_spectrum.rb

Instance Method Summary collapse

Constructor Details

#initializeSpectrumOut

Returns a new instance of SpectrumOut.



18
19
20
21
22
23
# File 'lib/fluent/plugin/out_spectrum.rb', line 18

def initialize
  require 'rest-client'
  require 'json'
  require 'cgi' # verify we need --yes, we need it, to_utf8 could not used to create valid url and xml
  super
end

Instance Method Details

#alarms_resourceObject



70
71
72
# File 'lib/fluent/plugin/out_spectrum.rb', line 70

def alarms_resource
  RestClient::Resource.new(@alarms_url, :user => @user, :password => @pass, :open_timeout => 5, :timeout => (@interval * 3))
end

#configure(conf) ⇒ Object

This method is called before starting.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/out_spectrum.rb', line 39

def configure(conf)
  super
  # Read configuration for event_rename_rules and create a hash
  @event_rename_rules = []
  conf.elements.select { |element| element.name == 'event_rename_rules' }.each { |element|
    element.each_pair { |key_varbind, origin_event_keyname|
      element.has_key?(key_varbind) # to suppress unread configuration warning
      @event_rename_rules << { key_varbind: key_varbind, origin_event_keyname: origin_event_keyname }
      $log.info "Added event_rename_rules: #{@event_rename_rules.last}"
    }
  }


  # Read configuration for alarm_rename_rules and create a hash
  @alarm_rename_rules = []
  conf.elements.select { |element| element.name == 'alarm_rename_rules' }.each { |element|
    element.each_pair { |key_spectrum_alarm, origin_event_keyname|
      element.has_key?(key_spectrum_alarm) # to suppress unread configuration warning
      @alarm_rename_rules << { key_spectrum_alarm: key_spectrum_alarm, origin_event_keyname: origin_event_keyname }
      $log.info "Added alarm_rename_rules: #{@alarm_rename_rules.last}"
    }
  }

  
  # Setup URL Resource
  @alarms_url = 'http://' + @endpoint.to_s + '/spectrum/restful/alarms/'
  @events_url = 'http://' + @endpoint.to_s + '/spectrum/restful/events'
  def events_resource
    RestClient::Resource.new(@events_url, :user => @user, :password => @pass, :open_timeout => 5, :timeout => (@interval * 3))
  end

  def alarms_resource
    RestClient::Resource.new(@alarms_url, :user => @user, :password => @pass, :open_timeout => 5, :timeout => (@interval * 3))
  end


end

#emit(tag, es, chain) ⇒ Object

This method is called when an event reaches Fluentd. ‘es’ is a Fluent::EventStream object that includes multiple events. You can use ‘es.each {|time,record| … }’ to retrieve events. ‘chain’ is an object that manages transactions. Call ‘chain.next’ at appropriate points and rollback if it raises an exception.

NOTE! This method is called by Fluentd’s main thread so you should not write slow routine here. It causes Fluentd’s performance degression.



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/fluent/plugin/out_spectrum.rb', line 91

def emit(tag, es, chain)
  chain.next
  es.each {|time,record|
    
    if (record["event"].has_key?(@alarm_ID_key) && record["event"].has_key?(@spectrum_key) ) 
      ######native spectrum alert ########################
      ######PUT alarm to update enriched fields
      if (record["event"][@spectrum_key] == @spectrum_value) 
        $log.info "The alert is from spectrum"
        # Create an empty hash
        alertUpdateHash=Hash.new
        # Parse thro the array hash that contains name value pairs for hash mapping and add new records to a new hash
        @alarm_rename_rules.each { |rule| 
          puts rule[:origin_event_keyname]
          alertUpdateHash[rule[:key_spectrum_alarm]]=record["event"][rule[:origin_event_keyname]]
        }
        # construct the alarms PUT uri for update triggerd alarm withe enriched fields
        @alarms_urlrest = @alarms_url + record["event"][@alarm_ID_key]
        @attr_count = 0
        alertUpdateHash.each do |attr, val| 
          if (val.nil? || val.empty?)
            next
          else
            if (@attr_count == 0)
              @alarms_urlrest = @alarms_urlrest + "?attr=" + attr + "&val=" + CGI.escape(val.to_s)
              # @alarms_urlrest = @alarms_urlrest + "?attr=" + attr + "&val=" + to_utf8(val.to_s)
              @attr_count +=1
            else
              @alarms_urlrest = @alarms_urlrest + "&attr=" + attr + "&val=" + CGI.escape(val.to_s)
              # @alarms_urlrest = @alarms_urlrest + "&attr=" + attr + "&val=" + to_utf8(val.to_s)
              @attr_count +=1
            end
          end
        end
        $log.info "Rest url for PUT alarms: " + @alarms_urlrest            
        
        begin 
          # alarmPutRes = alarms_resource.put @alarms_urlrest,:content_type => 'application/json'
          alarmPutRes = RestClient::Resource.new(@alarms_urlrest,@user,@pass).put(@alarms_urlrest,:content_type => 'application/json')
          $log.info alarmPutRes 
        end

      ######3rd party alert #######################
      ######Post an event and then trigger an alarm ######   
      else
        $log.info "The alert is from 3rd party"           
        # Create an empty hash
        alertNewHash=Hash.new
        # Parse thro the array hash that contains name value pairs for hash mapping and add new records to a new hash
        @event_rename_rules.each { |rule| 
          if(debug)
            $log.info rule[:key_varbind]+": "+ rule[:origin_event_keyname]
          end
          alertNewHash[rule[:key_varbind]]=record["event"][rule[:origin_event_keyname]]
        }
        # construct the xml
        @post_event_xml ="<?xml version=\"1.0\" encoding=\"UTF-8\"?>
        <rs:event-request throttlesize=\"10\"
          xmlns:rs=\"http://www.ca.com/spectrum/restful/schema/request\"
          xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"
          xsi:schemaLocation=\"http://www.ca.com/spectrum/restful/schema/request ../../../xsd/Request.xsd\">
          <rs:event>
            <rs:target-models>
             <rs:model mh= \"#{model_mh}\" />
            </rs:target-models>
         
           <!-- event ID -->
            <rs:event-type id=\"#{event_type_id}\"/>
         
            <!-- attributes/varbinds -->"
        alertNewHash.each do |attr, val| 
          if (val.nil? || val.empty?)
            @post_event_xml += "\n <rs:varbind id=\""+ attr + "\"></rs:varbind>"
          else
            @post_event_xml += "\n <rs:varbind id=\""+ attr + "\">"+ CGI.escapeHTML(val) +"</rs:varbind>"
          end
        end
        @post_event_xml += "
                </rs:event>
              </rs:event-request>"
        @triggered_event_id = ''
        if(debug)
          $log.info "Rest url for post events: " + @events_url               
          $log.info "xml: " +@post_event_xml 
        end   
        begin        
          # eventPostRes = RestClient::Resource.new(@events_url,@user,@pass).post(@post_event_xml,:content_type => 'application/xml')
          eventPostRes = events_resource.post @post_event_xml,:content_type => 'application/xml',:accept => 'application/json'
          $log.info eventPostRes
          eventPostResBody = JSON.parse(eventPostRes.body)
          @triggered_event_id = eventPostResBody['ns1.event-response-list']['ns1.event-response']['@id']
          # $log.info "event id is: " + @triggered_event_id
        end

      end #end of checking alerts is from 3rd party or spectrum
      
    else # if don't have @alarm_ID_key and @spectrum_key
      $log.info "The alert don't have '#{@alarm_ID_key}' and '#{@spectrum_key}' "
      $log.info record["event"]
    end 

  } # end of loop for each record
end

#events_resourceObject



66
67
68
# File 'lib/fluent/plugin/out_spectrum.rb', line 66

def events_resource
  RestClient::Resource.new(@events_url, :user => @user, :password => @pass, :open_timeout => 5, :timeout => (@interval * 3))
end

#parse_rename_rule(rule) ⇒ Object



32
33
34
35
36
# File 'lib/fluent/plugin/out_spectrum.rb', line 32

def parse_rename_rule rule
  if rule.match /^([^\s]+)\s+(.+)$/
    return $~.captures
  end
end

#shutdownObject

This method is called when shutting down.



81
82
83
# File 'lib/fluent/plugin/out_spectrum.rb', line 81

def shutdown
  super
end

#startObject

This method is called when starting.



77
78
79
# File 'lib/fluent/plugin/out_spectrum.rb', line 77

def start
  super
end

#to_utf8(str) ⇒ Object

function to UTF8 encode



26
27
28
29
30
# File 'lib/fluent/plugin/out_spectrum.rb', line 26

def to_utf8(str)
  str = str.force_encoding('UTF-8')
  return str if str.valid_encoding?
  str.encode("UTF-8", 'binary', invalid: :replace, undef: :replace, replace: '')
end