Class: LogStash::Inputs::CloudWatch_Logs

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::AwsConfig::V2
Defined in:
lib/logstash/inputs/cloudwatch_logs.rb

Overview

Stream events from CloudWatch Logs streams.

Specify an individual log group, and this plugin will scan all log streams in that group, and pull in any new log events.

Optionally, you may set the `log_group_prefix` parameter to true, which treats the specified log group name as a prefix: the plugin scans for all log groups matching that prefix and ingests all logs available in all of the matching groups.

Defined Under Namespace

Modules: SinceDB

Instance Method Summary collapse

Instance Method Details

#list_new_streams ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/logstash/inputs/cloudwatch_logs.rb', line 72

# Collects the log streams of every log group this input is configured for.
#
# When @log_group_prefix is set, @log_group is used as a name prefix and all
# matching groups are looked up through describe_log_groups, following
# next_token until the listing is exhausted. Otherwise @log_group is the only
# group inspected.
#
# @return [Array] the streams returned by list_new_streams_for_log_group for
#   each group, concatenated in group order
def list_new_streams
  group_names =
    if @log_group_prefix
      page = @cloudwatch.describe_log_groups(log_group_name_prefix: @log_group)
      names = page.log_groups.map(&:log_group_name)
      loop do
        cursor = page.next_token
        break unless cursor

        page = @cloudwatch.describe_log_groups(log_group_name_prefix: @log_group, next_token: cursor)
        names += page.log_groups.map(&:log_group_name)
      end
      names
    else
      [@log_group]
    end

  group_names.each_with_object([]) do |group_name, streams|
    streams.concat(list_new_streams_for_log_group(group_name))
  end
end

#list_new_streams_for_log_group(log_group, token = nil, objects = [], stepback = 0) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/logstash/inputs/cloudwatch_logs.rb', line 92

# Lists every log stream of +log_group+, ordered by last event time
# (ascending), following pagination tokens until the listing is exhausted.
#
# Pagination and throttle retries are handled in a loop rather than by
# self-recursion, so the call depth stays constant no matter how many pages
# the group has or how often the API throttles.
#
# @param log_group [String] name of the log group to list
# @param token [String, nil] pagination token to resume from; nil starts at
#   the first page
# @param objects [Array] accumulator the streams are appended to (mutated in
#   place and returned)
# @param stepback [Integer] initial back-off exponent; after a throttling
#   error the method sleeps 2**stepback * 60 seconds and increments it
# @return [Array] +objects+ with all listed streams appended
def list_new_streams_for_log_group(log_group, token = nil, objects = [], stepback = 0)
  loop do
    params = {
      :log_group_name => log_group,
      :order_by => "LastEventTime",
      :descending => false
    }

    @logger.debug("CloudWatch Logs for log_group #{log_group}")

    params[:next_token] = token unless token.nil?

    begin
      streams = @cloudwatch.describe_log_streams(params)
    rescue Aws::CloudWatchLogs::Errors::ThrottlingException
      delay = 2 ** stepback * 60
      @logger.debug("CloudWatch Logs stepping back ", :stepback => delay)
      sleep(delay)
      stepback += 1
      @logger.debug("CloudWatch Logs repeating list_new_streams again with token", :token => token)
      # Retry the same page with the longer back-off.
      next
    end

    # concat instead of push(*page): avoids splatting a whole page onto the
    # argument stack. Array() keeps the splat's nil/to_a coercion.
    objects.concat(Array(streams.log_streams))

    if streams.next_token.nil?
      @logger.debug("CloudWatch Logs hit end of tokens for streams")
      return objects
    end

    @logger.debug("CloudWatch Logs calling list_new_streams again on token", :token => streams.next_token)
    token = streams.next_token
    # A successful page resets the back-off, as the recursive version did.
    stepback = 0
  end
end

#process_group(queue) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/logstash/inputs/cloudwatch_logs.rb', line 149

# Performs one polling pass: lists the candidate streams, hands every stream
# that received data after the last recorded position to process_log_stream,
# then records the start of this pass as the new position.
#
# @param queue [Object] passed through unchanged to process_log_stream
# @return [Object] whatever sincedb.write returns
def process_group(queue)
  streams = list_new_streams

  last_read = sincedb.read
  # A negative stored position is replaced by 1.
  last_read = 1 if last_read < 0

  # Milliseconds since the epoch, as a String ('%Q').
  current_window = DateTime.now.strftime('%Q')

  streams.each do |stream|
    ingested_at = stream.last_ingestion_time
    # Skip streams with no ingestion time or nothing newer than last_read.
    next unless ingested_at && ingested_at > last_read

    process_log_stream(queue, stream, last_read, current_window)
  end

  sincedb.write(current_window)
end

#register ⇒ Object



51
52
53
54
55
56
57
58
59
# File 'lib/logstash/inputs/cloudwatch_logs.rb', line 51

# Prepares the input for use: loads the MD5 digest library, logs the
# configured log group, and creates the CloudWatch Logs client kept in
# @cloudwatch.
def register
  require "digest/md5"

  @logger.info("Registering cloudwatch_logs input", :log_group => @log_group)

  # NOTE(review): this ConfigService client is built and immediately
  # discarded. Presumably it is kept for a side effect of construction
  # (e.g. forcing the AWS options to be resolved/validated) - confirm before
  # removing it.
  Aws::ConfigService::Client.new(aws_options_hash)

  # Client used by list_new_streams and list_new_streams_for_log_group for
  # their describe_log_groups / describe_log_streams calls.
  @cloudwatch = Aws::CloudWatchLogs::Client.new(aws_options_hash)
end

#run(queue) ⇒ Object



63
64
65
66
67
68
# File 'lib/logstash/inputs/cloudwatch_logs.rb', line 63

# Polling loop: runs process_group for the given queue, then pauses for
# @interval via Stud.stoppable_sleep, repeating until stop? becomes true.
#
# @param queue [Object] passed through unchanged to process_group
# @return [nil]
def run(queue)
  until stop?
    process_group(queue)
    Stud.stoppable_sleep(@interval)
  end
end