Class: Fluent::SpectrumOut
- Inherits:
-
Output
- Object
- Output
- Fluent::SpectrumOut
- Defined in:
- lib/fluent/plugin/out_spectrum.rb
Instance Method Summary collapse
- #alarms_resource ⇒ Object
-
#configure(conf) ⇒ Object
This method is called before starting.
-
#emit(tag, es, chain) ⇒ Object
This method is called when an event reaches Fluentd.
- #events_resource ⇒ Object
-
#initialize ⇒ SpectrumOut
constructor
A new instance of SpectrumOut.
- #parse_rename_rule(rule) ⇒ Object
-
#shutdown ⇒ Object
This method is called when shutting down.
-
#start ⇒ Object
This method is called when starting.
-
#to_utf8(str) ⇒ Object
function to UTF8 encode.
Constructor Details
#initialize ⇒ SpectrumOut
Returns a new instance of SpectrumOut.
18 19 20 21 22 23 |
# File 'lib/fluent/plugin/out_spectrum.rb', line 18 def initialize require 'rest-client' require 'json' require 'cgi' # verify we need --yes, we need it, to_utf8 could not used to create valid url and xml super end |
Instance Method Details
#alarms_resource ⇒ Object
70 71 72 |
# File 'lib/fluent/plugin/out_spectrum.rb', line 70 def alarms_resource RestClient::Resource.new(@alarms_url, :user => @user, :password => @pass, :open_timeout => 5, :timeout => (@interval * 3)) end |
#configure(conf) ⇒ Object
This method is called before starting.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/fluent/plugin/out_spectrum.rb', line 39 def configure(conf) super # Read configuration for event_rename_rules and create a hash @event_rename_rules = [] conf.elements.select { |element| element.name == 'event_rename_rules' }.each { |element| element.each_pair { |key_varbind, origin_event_keyname| element.has_key?(key_varbind) # to suppress unread configuration warning @event_rename_rules << { key_varbind: key_varbind, origin_event_keyname: origin_event_keyname } $log.info "Added event_rename_rules: #{@event_rename_rules.last}" } } # Read configuration for alarm_rename_rules and create a hash @alarm_rename_rules = [] conf.elements.select { |element| element.name == 'alarm_rename_rules' }.each { |element| element.each_pair { |key_spectrum_alarm, origin_event_keyname| element.has_key?(key_spectrum_alarm) # to suppress unread configuration warning @alarm_rename_rules << { key_spectrum_alarm: key_spectrum_alarm, origin_event_keyname: origin_event_keyname } $log.info "Added alarm_rename_rules: #{@alarm_rename_rules.last}" } } # Setup URL Resource @alarms_url = 'http://' + @endpoint.to_s + '/spectrum/restful/alarms/' @events_url = 'http://' + @endpoint.to_s + '/spectrum/restful/events' def events_resource RestClient::Resource.new(@events_url, :user => @user, :password => @pass, :open_timeout => 5, :timeout => (@interval * 3)) end def alarms_resource RestClient::Resource.new(@alarms_url, :user => @user, :password => @pass, :open_timeout => 5, :timeout => (@interval * 3)) end end |
#emit(tag, es, chain) ⇒ Object
This method is called when an event reaches Fluentd. ‘es’ is a Fluent::EventStream object that includes multiple events. You can use ‘es.each {|time,record| … }’ to retrieve events. ‘chain’ is an object that manages transactions. Call ‘chain.next’ at appropriate points and rollback if it raises an exception.
NOTE! This method is called by Fluentd’s main thread so you should not write slow routine here. It causes Fluentd’s performance degression.
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/fluent/plugin/out_spectrum.rb', line 91 def emit(tag, es, chain) chain.next es.each {|time,record| if (record["event"].has_key?(@alarm_ID_key) && record["event"].has_key?(@spectrum_key) ) ######native spectrum alert ######################## ######PUT alarm to update enriched fields if (record["event"][@spectrum_key] == @spectrum_value) $log.info "The alert is from spectrum" # Create an empty hash alertUpdateHash=Hash.new # Parse thro the array hash that contains name value pairs for hash mapping and add new records to a new hash @alarm_rename_rules.each { |rule| puts rule[:origin_event_keyname] alertUpdateHash[rule[:key_spectrum_alarm]]=record["event"][rule[:origin_event_keyname]] } # construct the alarms PUT uri for update triggerd alarm withe enriched fields @alarms_urlrest = @alarms_url + record["event"][@alarm_ID_key] @attr_count = 0 alertUpdateHash.each do |attr, val| if (val.nil? || val.empty?) next else if (@attr_count == 0) @alarms_urlrest = @alarms_urlrest + "?attr=" + attr + "&val=" + CGI.escape(val.to_s) # @alarms_urlrest = @alarms_urlrest + "?attr=" + attr + "&val=" + to_utf8(val.to_s) @attr_count +=1 else @alarms_urlrest = @alarms_urlrest + "&attr=" + attr + "&val=" + CGI.escape(val.to_s) # @alarms_urlrest = @alarms_urlrest + "&attr=" + attr + "&val=" + to_utf8(val.to_s) @attr_count +=1 end end end $log.info "Rest url for PUT alarms: " + @alarms_urlrest begin # alarmPutRes = alarms_resource.put @alarms_urlrest,:content_type => 'application/json' alarmPutRes = RestClient::Resource.new(@alarms_urlrest,@user,@pass).put(@alarms_urlrest,:content_type => 'application/json') $log.info alarmPutRes end ######3rd party alert ####################### ######Post an event and then trigger an alarm ###### else $log.info "The alert is from 3rd party" # Create an empty hash alertNewHash=Hash.new # Parse thro the array hash that contains name value pairs for hash mapping and add new records to a new hash @event_rename_rules.each { |rule| if(debug) $log.info rule[:key_varbind]+": "+ rule[:origin_event_keyname] end alertNewHash[rule[:key_varbind]]=record["event"][rule[:origin_event_keyname]] } # construct the xml @post_event_xml ="<?xml version=\"1.0\" encoding=\"UTF-8\"?> <rs:event-request throttlesize=\"10\" xmlns:rs=\"http://www.ca.com/spectrum/restful/schema/request\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:schemaLocation=\"http://www.ca.com/spectrum/restful/schema/request ../../../xsd/Request.xsd\"> <rs:event> <rs:target-models> <rs:model mh= \"#{model_mh}\" /> </rs:target-models> <!-- event ID --> <rs:event-type id=\"#{event_type_id}\"/> <!-- attributes/varbinds -->" alertNewHash.each do |attr, val| if (val.nil? || val.empty?) @post_event_xml += "\n <rs:varbind id=\""+ attr + "\"></rs:varbind>" else @post_event_xml += "\n <rs:varbind id=\""+ attr + "\">"+ CGI.escapeHTML(val) +"</rs:varbind>" end end @post_event_xml += " </rs:event> </rs:event-request>" @triggered_event_id = '' if(debug) $log.info "Rest url for post events: " + @events_url $log.info "xml: " +@post_event_xml end begin # eventPostRes = RestClient::Resource.new(@events_url,@user,@pass).post(@post_event_xml,:content_type => 'application/xml') eventPostRes = events_resource.post @post_event_xml,:content_type => 'application/xml',:accept => 'application/json' $log.info eventPostRes eventPostResBody = JSON.parse(eventPostRes.body) @triggered_event_id = eventPostResBody['ns1.event-response-list']['ns1.event-response']['@id'] # $log.info "event id is: " + @triggered_event_id end end #end of checking alerts is from 3rd party or spectrum else # if don't have @alarm_ID_key and @spectrum_key $log.info "The alert don't have '#{@alarm_ID_key}' and '#{@spectrum_key}' " $log.info record["event"] end } # end of loop for each record end |
#events_resource ⇒ Object
66 67 68 |
# File 'lib/fluent/plugin/out_spectrum.rb', line 66 def events_resource RestClient::Resource.new(@events_url, :user => @user, :password => @pass, :open_timeout => 5, :timeout => (@interval * 3)) end |
#parse_rename_rule(rule) ⇒ Object
32 33 34 35 36 |
# File 'lib/fluent/plugin/out_spectrum.rb', line 32 def parse_rename_rule rule if rule.match /^([^\s]+)\s+(.+)$/ return $~.captures end end |
#shutdown ⇒ Object
This method is called when shutting down.
81 82 83 |
# File 'lib/fluent/plugin/out_spectrum.rb', line 81 def shutdown super end |
#start ⇒ Object
This method is called when starting.
77 78 79 |
# File 'lib/fluent/plugin/out_spectrum.rb', line 77 def start super end |
#to_utf8(str) ⇒ Object
function to UTF8 encode
26 27 28 29 30 |
# File 'lib/fluent/plugin/out_spectrum.rb', line 26 def to_utf8(str) str = str.force_encoding('UTF-8') return str if str.valid_encoding? str.encode("UTF-8", 'binary', invalid: :replace, undef: :replace, replace: '') end |