Class: Fluent::CloudWatchTransOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_cloudwatch_transform.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ CloudWatchTransOutput

Returns a new instance of CloudWatchTransOutput.



43
44
45
46
# File 'lib/fluent/plugin/out_cloudwatch_transform.rb', line 43

# Builds a new plugin instance.
#
# Loads the 'highwatermark' library and then delegates to the superclass
# constructor.
def initialize
  # NOTE(review): the require sits inside the method, presumably so the gem is
  # only loaded when the plugin is actually instantiated -- confirm.
  require 'highwatermark'
  super
end

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/fluent/plugin/out_cloudwatch_transform.rb', line 19

# Reads the plugin configuration; called before the plugin starts.
#
# Every key/value pair found in a 'tag_infos' section is stored in @tag_infos
# as name => integer position, and the highwatermark-related settings are
# gathered into @highwatermark_parameters.
#
# @param conf the configuration object; must respond to #elements
def configure(conf)
  super

  # Map each info name to its position inside the tag.
  @tag_infos = {}
  conf.elements.each do |section|
    next unless section.name == 'tag_infos'

    section.each_pair do |name, position|
      section.has_key?(name) # touch the key to suppress the unread configuration warning
      @tag_infos[name] = position.to_i
      $log.info "Added tag_infos: #{name}=>#{@tag_infos[name]}"
    end
  end

  # Settings for highwatermark.
  @highwatermark_parameters = {
    "state_tag" => @state_tag,
    "state_type" => @state_type,
    "state_file" => @state_file,
    "redis_host" => @redis_host,
    "redis_port" => @redis_port
  }
  $log.info "highwatermark_parameters: #{@highwatermark_parameters}"
end

#emit(tag, es, chain) ⇒ Object

This method is called when an event reaches Fluentd. ‘es’ is a Fluent::EventStream object that includes multiple events. You can use ‘es.each {|time,record| … }’ to retrieve events. ‘chain’ is an object that manages transactions. Call ‘chain.next’ at appropriate points and rollback if it raises an exception.

NOTE! This method is called by Fluentd’s main thread, so you should not put slow routines here; doing so degrades Fluentd’s performance.



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/fluent/plugin/out_cloudwatch_transform.rb', line 67

# Transforms every event of the stream and re-emits it under @out_tag.
#
# For each (time, record) pair a new hash is built that contains:
# * the record's key under @name_for_origin_key, its value (as a String)
#   under @name_for_origin_value, and the original pair under "raw";
# * "receive_time_input": the current engine time as a String;
# * "event_type": @out_tag;
# * one entry per @tag_infos item, taken from the matching tag part.
# The current engine time is also stored as the new high-water mark for
# @state_tag.
#
# @param tag [String] tag of the incoming events
# @param es event stream; #each must yield (time, record) pairs
# @param chain transaction chain; #next is called once, before any event is processed
# @return the value of es.each
def emit(tag, es, chain)
  # Split the tag on "." while keeping a double-quoted segment (which may
  # itself contain ".") together as one part.
  tag_parts = tag.scan(/([^".]+)|"([^"]+)"/).flatten.compact

  chain.next
  es.each do |time, record|
    newhash = {}
    # A CloudWatch alert record is expected to carry a single key-value pair;
    # the loop is only a convenient way to reach it. With several pairs the
    # last one wins.
    record.each_pair do |singlekey, singlevalue|
      newhash[@name_for_origin_key] = singlekey
      newhash[@name_for_origin_value] = singlevalue.to_s
      newhash["raw"] = { singlekey => singlevalue }
    end

    # Fixed context added to every alert. Fluent::Engine is fully qualified
    # here so it matches the emit call below and does not depend on the
    # lexical scope the class is defined in.
    timestamp = Fluent::Engine.now
    newhash["receive_time_input"] = timestamp.to_s
    newhash["event_type"] = @out_tag

    # Context derived from the tag, as configured in tag_infos.
    @tag_infos.each do |info_name, position_in_tag|
      newhash[info_name] = tag_parts[position_in_tag]
    end

    # Read the previous high-water mark once per event; this method runs for
    # every event, so the state store should not be queried twice.
    last_hwm = @highwatermark.last_records(@state_tag)
    if last_hwm
      $log.info "got hwm form state file: #{last_hwm.to_i}"
    else
      $log.info "no hwm yet"
    end

    @highwatermark.update_records(timestamp.to_s, @state_tag)

    # Log the transformed message and emit it.
    $log.info "Tranformed message  #{newhash}"
    Fluent::Engine.emit @out_tag, time.to_i, newhash
  end
end

#shutdown ⇒ Object

This method is called when shutting down.



56
57
58
# File 'lib/fluent/plugin/out_cloudwatch_transform.rb', line 56

# Called when the plugin is shutting down.
#
# Performs no plugin-specific cleanup; it only delegates to the superclass.
def shutdown
  super
end

#start ⇒ Object

This method is called when starting.



49
50
51
52
53
# File 'lib/fluent/plugin/out_cloudwatch_transform.rb', line 49

# Called when the plugin is starting.
#
# After delegating to the superclass, creates the Highwatermark::HighWaterMark
# instance from @highwatermark_parameters and keeps it in @highwatermark.
def start
  super
  # NOTE(review): relies on @highwatermark_parameters having been assigned
  # before this runs (presumably by configure) -- confirm the call order.
  @highwatermark = Highwatermark::HighWaterMark.new(@highwatermark_parameters)

end