Class: LogStash::Inputs::CloudWatch_Logs
- Inherits:
- Base (Object > Base > LogStash::Inputs::CloudWatch_Logs)
- Includes:
- PluginMixins::AwsConfig::V2
- Defined in:
- lib/logstash/inputs/cloudwatch_logs.rb
Overview
Stream events from CloudWatch Logs streams.
Specify an individual log group, and this plugin will scan all log streams in that group, and pull in any new log events.
Optionally, you may set the `log_group_prefix` parameter to true. The plugin then treats the configured log group name as a prefix, scans for all log groups matching it, and ingests the logs available in every matching group.
Defined Under Namespace
Modules: SinceDB
Instance Method Summary
- #list_new_streams ⇒ Object
- #list_new_streams_for_log_group(log_group, token = nil, objects = [], stepback = 0) ⇒ Object
- #process_group(queue) ⇒ Object
- #register ⇒ Object
- #run(queue) ⇒ Object
Instance Method Details
#list_new_streams ⇒ Object
# File 'lib/logstash/inputs/cloudwatch_logs.rb', line 72
def list_new_streams()
  if @log_group_prefix
    # Treat @log_group as a prefix and page through every matching group.
    log_groups = @cloudwatch.describe_log_groups(log_group_name_prefix: @log_group)
    groups = log_groups.log_groups.map {|n| n.log_group_name}
    while log_groups.next_token
      log_groups = @cloudwatch.describe_log_groups(log_group_name_prefix: @log_group, next_token: log_groups.next_token)
      groups += log_groups.log_groups.map {|n| n.log_group_name}
    end
  else
    groups = [@log_group]
  end

  # Gather the streams of every selected group into one list.
  objects = []
  groups.each do |log_group|
    objects.concat(list_new_streams_for_log_group(log_group))
  end
  objects
end
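The token-following loop in the method above can be tried without AWS access. The following is a minimal sketch, not the plugin's code: `FakeLogsClient`, `Group`, `Page`, and `collect_group_names` are hypothetical names introduced here, and the fake client only imitates the `log_groups` / `next_token` shape that the method relies on.

```ruby
# Hypothetical stand-ins for the response objects used by list_new_streams.
Group = Struct.new(:log_group_name)
Page  = Struct.new(:log_groups, :next_token)

# Hypothetical fake client: serves matching group names in fixed-size pages
# and returns a next_token until the last page has been served.
class FakeLogsClient
  def initialize(names, page_size)
    @names = names
    @page_size = page_size
  end

  def describe_log_groups(log_group_name_prefix:, next_token: nil)
    matching = @names.select { |n| n.start_with?(log_group_name_prefix) }
    offset = next_token.to_i # nil.to_i is 0, so the first call starts at 0
    slice = matching[offset, @page_size] || []
    following = offset + @page_size
    token = following < matching.length ? following.to_s : nil
    Page.new(slice.map { |n| Group.new(n) }, token)
  end
end

# Collect every matching group name by following next_token to the end,
# mirroring the while loop in list_new_streams.
def collect_group_names(client, prefix)
  page = client.describe_log_groups(log_group_name_prefix: prefix)
  names = page.log_groups.map(&:log_group_name)
  while page.next_token
    page = client.describe_log_groups(log_group_name_prefix: prefix,
                                      next_token: page.next_token)
    names += page.log_groups.map(&:log_group_name)
  end
  names
end

client = FakeLogsClient.new(%w[/app/api /app/web /app/worker /sys/audit], 2)
puts collect_group_names(client, "/app/").inspect
```

With a page size of 2, the three `/app/` groups arrive over two calls, which exercises the `while` branch once.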
#list_new_streams_for_log_group(log_group, token = nil, objects = [], stepback = 0) ⇒ Object
# File 'lib/logstash/inputs/cloudwatch_logs.rb', line 92
def list_new_streams_for_log_group(log_group, token = nil, objects = [], stepback = 0)
  params = {
    :log_group_name => log_group,
    :order_by => "LastEventTime",
    :descending => false
  }

  @logger.debug("CloudWatch Logs for log_group #{log_group}")

  params[:next_token] = token unless token.nil?

  begin
    streams = @cloudwatch.describe_log_streams(params)
  rescue Aws::CloudWatchLogs::Errors::ThrottlingException
    # Back off exponentially (60s, 120s, 240s, ...) and retry the same page.
    @logger.debug("CloudWatch Logs stepping back ", :stepback => 2 ** stepback * 60)
    sleep(2 ** stepback * 60)
    stepback += 1
    @logger.debug("CloudWatch Logs repeating list_new_streams again with token", :token => token)
    return list_new_streams_for_log_group(log_group, token, objects, stepback)
  end

  objects.push(*streams.log_streams)
  if streams.next_token == nil
    @logger.debug("CloudWatch Logs hit end of tokens for streams")
    objects
  else
    # Fetch the next page; stepback falls back to its default of 0 here.
    @logger.debug("CloudWatch Logs calling list_new_streams again on token", :token => streams.next_token)
    list_new_streams_for_log_group(log_group, streams.next_token, objects)
  end
end
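The retry schedule used in the rescue branch above is `2 ** stepback * 60` seconds, that is 60, 120, 240, and so on. The sketch below isolates that schedule in an iterative form. `backoff_seconds`, `with_backoff`, `ThrottledError`, the `max_attempts` cap, and the injectable `sleeper` are all assumptions made for illustration; the method above recurses and has no attempt limit.

```ruby
# Seconds to wait before retry number `stepback` (0-based), using the same
# formula as the rescue branch above.
def backoff_seconds(stepback)
  2**stepback * 60
end

# Hypothetical error class standing in for a throttling exception.
class ThrottledError < StandardError; end

# Hypothetical helper: run the block, and on `error_class` wait according to
# backoff_seconds and try again. `sleeper` is injectable so the sketch can
# run without actually sleeping; `max_attempts` is an added safety cap.
def with_backoff(error_class, max_attempts: 5, sleeper: ->(s) { sleep(s) })
  stepback = 0
  begin
    yield
  rescue error_class
    raise if stepback + 1 >= max_attempts
    sleeper.call(backoff_seconds(stepback))
    stepback += 1
    retry
  end
end

calls = 0
slept = []
result = with_backoff(ThrottledError, sleeper: ->(s) { slept << s }) do
  calls += 1
  raise ThrottledError if calls < 4 # fail three times, then succeed
  :ok
end
puts result.inspect # :ok
puts slept.inspect  # [60, 120, 240]
```

Recording the delays instead of sleeping makes the schedule easy to inspect; in real use the default `sleeper` would block for the computed number of seconds.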
#process_group(queue) ⇒ Object
# File 'lib/logstash/inputs/cloudwatch_logs.rb', line 149
def process_group(queue)
  objects = list_new_streams

  last_read = sincedb.read
  # '%Q' formats the current time as milliseconds since the Unix epoch.
  current_window = DateTime.now.strftime('%Q')

  if last_read < 0
    last_read = 1
  end

  # Only streams that ingested events after the last recorded read are processed.
  objects.each do |stream|
    if stream.last_ingestion_time && stream.last_ingestion_time > last_read
      process_log_stream(queue, stream, last_read, current_window)
    end
  end

  sincedb.write(current_window)
end
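The stream selection in the method above can be illustrated in isolation. This is a minimal sketch under stated assumptions: `Stream`, `streams_to_process`, and `current_window_ms` are hypothetical names, and the timestamps are made-up millisecond values rather than real CloudWatch data.

```ruby
require "date"

# Hypothetical stand-in for a log stream description.
Stream = Struct.new(:log_stream_name, :last_ingestion_time)

# Keep only streams that ingested something after `last_read`, applying the
# same "negative means start from 1" rule as process_group.
def streams_to_process(streams, last_read)
  last_read = 1 if last_read < 0
  streams.select do |s|
    s.last_ingestion_time && s.last_ingestion_time > last_read
  end
end

# Current time as milliseconds since the Unix epoch. Note that strftime
# returns a String, as in process_group.
def current_window_ms
  DateTime.now.strftime("%Q")
end

streams = [
  Stream.new("old",   500),
  Stream.new("empty", nil),  # never ingested anything: skipped
  Stream.new("fresh", 1500)
]

puts streams_to_process(streams, 1000).map(&:log_stream_name).inspect
puts current_window_ms
```

With `last_read` set to 1000, only the stream named `fresh` passes the filter; a stream whose `last_ingestion_time` is nil is always skipped.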
#register ⇒ Object
# File 'lib/logstash/inputs/cloudwatch_logs.rb', line 51
def register
  require "digest/md5"

  @logger.info("Registering cloudwatch_logs input", :log_group => @log_group)

  Aws::ConfigService::Client.new()

  @cloudwatch = Aws::CloudWatchLogs::Client.new()
end
#run(queue) ⇒ Object
# File 'lib/logstash/inputs/cloudwatch_logs.rb', line 63
def run(queue)
  # Poll until Logstash signals shutdown, pausing @interval seconds between passes.
  until stop?
    process_group(queue)
    Stud.stoppable_sleep(@interval)
  end
end