Class: Fluent::CloudwatchLogsOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_cloudwatch_logs.rb

Constant Summary collapse

MAX_EVENTS_SIZE =
30720

Instance Method Summary collapse

Constructor Details

#initialize ⇒ CloudwatchLogsOutput

Returns a new instance of CloudwatchLogsOutput.



21
22
23
24
25
# File 'lib/fluent/plugin/out_cloudwatch_logs.rb', line 21

# Sets up the plugin instance and loads the AWS SDK.
def initialize
  super

  # Required here rather than at file load time, so the SDK is only loaded
  # once an instance of this output is actually created.
  require 'aws-sdk-core'
end

Instance Method Details

#format(tag, time, record) ⇒ Object



37
38
39
# File 'lib/fluent/plugin/out_cloudwatch_logs.rb', line 37

# Serializes a single event for buffering.
#
# @param tag [Object] the event tag
# @param time [Object] the event time
# @param record [Object] the event record
# @return [Object] the result of calling to_msgpack on [tag, time, record]
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#start ⇒ Object



27
28
29
30
31
32
33
34
35
# File 'lib/fluent/plugin/out_cloudwatch_logs.rb', line 27

# Creates the CloudWatch Logs client and resets the sequence-token table.
#
# Explicit credentials are passed to the client only when both @aws_key_id
# and @aws_sec_key are set; a region is passed only when @region is set.
# Whatever is left unset is simply omitted from the client options.
def start
  super

  client_options = {}
  if @aws_key_id && @aws_sec_key
    client_options[:credentials] = Aws::Credentials.new(@aws_key_id, @aws_sec_key)
  end
  if @region
    client_options[:region] = @region
  end

  @logs = Aws::CloudWatchLogs::Client.new(client_options)
  @sequence_tokens = {}
end

#write(chunk) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin/out_cloudwatch_logs.rb', line 41

# Flushes one buffer chunk to CloudWatch Logs.
#
# Records are split into runs of consecutive entries that share a tag. For
# each run the target log group is the tag itself (when @use_tag_as_group is
# set) or @log_group_name; the log stream is always @log_stream_name. A
# missing group or stream is created when @auto_create_stream is set,
# otherwise a warning is logged and the run is skipped.
#
# @param chunk [#msgpack_each] buffer chunk yielding tag, time and record
def write(chunk)
  # The key list is the same for every record, so split it once up front.
  message_keys = @message_keys && @message_keys.split(',')

  chunk.enum_for(:msgpack_each).chunk {|tag, _time, _record|
    tag
  }.each {|tag, rs|
    group_name = @use_tag_as_group ? tag : @log_group_name

    unless log_group_exists?(group_name)
      if @auto_create_stream
        create_log_group(group_name)
      else
        log.warn "Log group '#{group_name}' dose not exists"
        next
      end
    end

    unless log_stream_exists?(group_name, @log_stream_name)
      if @auto_create_stream
        create_log_stream(group_name, @log_stream_name)
      else
        log.warn "Log stream '#{@log_stream_name}' dose not exists"
        next
      end
    end

    # Build a fresh event list for every run. Sharing one list across runs
    # would re-send the events of earlier tags with each later tag, and into
    # the wrong log group when the tag selects the group.
    events = rs.map do |_tag, time, record|
      message =
        if message_keys
          message_keys.map {|k| record[k].to_s }.join(' ')
        else
          record.to_json
        end

      # Treat the message as raw bytes, so the slice below cuts by byte count.
      message.force_encoding('ASCII-8BIT')

      if @max_message_length
        message = message.slice(0, @max_message_length)
      end

      {timestamp: time * 1000, message: message}
    end

    # The log events in a batch must be in chronological order by timestamp.
    # http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
    put_events_by_chunk(group_name, events.sort_by {|e| e[:timestamp] })
  }
end