Class: Fluent::CloudWatchTransOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_cloudwatch_transform.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ CloudWatchTransOutput

Returns a new instance of CloudWatchTransOutput.



17
18
19
20
# File 'lib/fluent/plugin/out_cloudwatch_transform.rb', line 17

# Builds a new plugin instance.
#
# The 'highwatermark' library is required here, at construction time, rather
# than at file load; the parent initializer then runs with no arguments.
def initialize
  require 'highwatermark'
  super()
end

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting.



13
14
15
# File 'lib/fluent/plugin/out_cloudwatch_transform.rb', line 13

# Called before the plugin is started.
#
# No plugin-specific configuration work is done here; the configuration is
# handed straight to the parent implementation.
#
# @param conf the configuration object, passed through unchanged to the parent
# @return whatever the parent implementation returns
def configure(conf)
  super(conf)
end

#emit(tag, es, chain) ⇒ Object

This method is called when an event reaches Fluentd. ‘es’ is a Fluent::EventStream object that includes multiple events. You can use ‘es.each {|time,record| … }’ to retrieve events. ‘chain’ is an object that manages transactions. Call ‘chain.next’ at appropriate points and rollback if it raises an exception.

NOTE! This method is called by Fluentd’s main thread, so you should not put slow routines here; doing so degrades Fluentd’s performance.



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/out_cloudwatch_transform.rb', line 41

# Transforms every event of the stream into an enriched CloudWatch alert
# record and re-emits it under @tag.
#
# The incoming tag is expected to look like
#   alert.cloudwatch.raw.<regionAZ>.<application_name>.<runbook_url>
# A segment may be wrapped in double quotes so that it can itself contain
# dots (e.g. a URL). Segments missing from the tag yield nil context values.
#
# For each event the high-water mark stored for @tag is looked up once,
# logged, and then updated with the receive timestamp.
#
# @param tag [String] tag of the incoming events
# @param es [#each] event stream yielding (time, record) pairs
# @param chain [#next] transaction chain; advanced once, before any event is processed
# @return whatever es.each returns
def emit(tag, es, chain)
  # Split on dots, keeping double-quoted segments intact (quotes are stripped).
  tag_parts = tag.scan(/([^".]+)|"([^"]+)"/).flatten.compact
  # The tag prefix is alert.cloudwatch.raw, so the context starts at index 3.
  region_az, application_name, runbook_url = tag_parts.values_at(3, 4, 5)

  chain.next

  es.each do |time, record|
    transformed = {}
    # A CloudWatch alert record is expected to hold a single key-value pair;
    # if it holds more, the last pair wins.
    record.each_pair do |event_name, event_value|
      transformed["event_name"] = event_name
      transformed["value"] = event_value.to_s
      transformed["raw"] = { event_name => event_value }
    end

    # Add context for the alert. The timestamp is the receive time on the input side.
    timestamp = Fluent::Engine.now
    transformed["receive_time_input"] = timestamp.to_s
    transformed["application_name"] = application_name
    transformed["intermediary_source"] = region_az
    transformed["runbook"] = runbook_url
    transformed["event_type"] = "alert.cloudwatch"

    # Look the stored high-water mark up once per event and reuse the result,
    # instead of querying the state store twice.
    last_hwm = @highwatermark.last_records(@tag)
    if last_hwm
      $log.info "got hwm form state file: #{last_hwm.to_i}"
    else
      $log.info "no hwm yet"
    end

    @highwatermark.update_records(timestamp.to_s, @tag)

    # Log the transformed message and emit it.
    $log.info "Tranformed message  #{transformed}"
    Fluent::Engine.emit @tag, time.to_i, transformed
  end
end

#shutdown ⇒ Object

This method is called when shutting down.



30
31
32
# File 'lib/fluent/plugin/out_cloudwatch_transform.rb', line 30

# Called when the plugin is shutting down.
#
# There is no plugin-specific teardown; the call is forwarded to the parent
# implementation.
#
# @return whatever the parent implementation returns
def shutdown
  super()
end

#start ⇒ Object

This method is called when starting.



23
24
25
26
27
# File 'lib/fluent/plugin/out_cloudwatch_transform.rb', line 23

# Called when the plugin is starting.
#
# Runs the parent start-up first, then creates the high-water-mark tracker
# from @state_file and @state_type and stores it in @highwatermark.
#
# @return the newly created Highwatermark::HighWaterMark instance
def start
  super()
  tracker = Highwatermark::HighWaterMark.new(@state_file, @state_type)
  @highwatermark = tracker
end