Class: LogStash::Outputs::CloudWatchLogs
- Inherits:
-
Base
- Object
- Base
- LogStash::Outputs::CloudWatchLogs
- Includes:
- PluginMixins::AwsConfig::V2
- Defined in:
- lib/logstash/outputs/cloudwatchlogs.rb
Overview
This output lets you send log data to AWS CloudWatch Logs service
Defined Under Namespace
Classes: Buffer
Constant Summary collapse
- LOG_GROUP_NAME =
Constants
"log_group_name"- LOG_STREAM_NAME =
"log_stream_name"- SEQUENCE_TOKEN =
"sequence_token"- TIMESTAMP =
"@timestamp"- MESSAGE =
"message"- PER_EVENT_OVERHEAD =
26- MAX_BATCH_SIZE =
1024 * 1024
- MAX_BATCH_COUNT =
10000- MAX_DISTANCE_BETWEEN_EVENTS =
86400 * 1000
- MIN_DELAY =
0.2
- MIN_BUFFER_DURATION =
5000- MAX_BACKOFF_IN_SECOND =
Backoff up to 64 seconds upon failure
64
Instance Attribute Summary collapse
-
#buffer ⇒ Object
readonly
Only accessed by tests.
-
#cwl ⇒ Object
Returns the value of attribute cwl.
-
#last_flush ⇒ Object
Returns the value of attribute last_flush.
-
#sequence_token ⇒ Object
Returns the value of attribute sequence_token.
Instance Method Summary collapse
Instance Attribute Details
#buffer ⇒ Object (readonly)
Only accessed by tests
75 76 77 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 75 def buffer @buffer end |
#cwl ⇒ Object
Returns the value of attribute cwl.
72 73 74 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 72 def cwl @cwl end |
#last_flush ⇒ Object
Returns the value of attribute last_flush.
72 73 74 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 72 def last_flush @last_flush end |
#sequence_token ⇒ Object
Returns the value of attribute sequence_token.
72 73 74 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 72 def sequence_token @sequence_token end |
Instance Method Details
#flush(events) ⇒ Object
135 136 137 138 139 140 141 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 135 def flush(events) return if events.nil? or events.empty? log_event_batches = prepare_log_events(events) log_event_batches.each do |log_events| put_log_events(log_events) end end |
#receive(event) ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 108 def receive(event) return unless output?(event) if event == LogStash::SHUTDOWN @buffer.close @publisher.join @logger.info("CloudWatch Logs output plugin shutdown.") finished return end return if invalid?(event) @buffer.enq( {:timestamp => event..time.to_f*1000, :message => event[MESSAGE] }) end |
#register ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 78 def register require "aws-sdk" @cwl = Aws::CloudWatchLogs::Client.new() if @batch_count > MAX_BATCH_COUNT @logger.warn(":batch_count exceeds the max number of log events. Use #{MAX_BATCH_COUNT} instead.") @batch_count = MAX_BATCH_COUNT end if @batch_size > MAX_BATCH_SIZE @logger.warn(":batch_size exceeds the max size of log events. Use #{MAX_BATCH_SIZE} instead.") @batch_size = MAX_BATCH_SIZE end if @buffer_duration < MIN_BUFFER_DURATION @logger.warn(":buffer_duration is smaller than the min value. Use #{MIN_BUFFER_DURATION} instead.") @buffer_duration = MIN_BUFFER_DURATION end @sequence_token = nil @last_flush = Time.now.to_f @buffer = Buffer.new( max_batch_count: batch_count, max_batch_size: batch_size, buffer_duration: @buffer_duration, out_queue_size: @queue_size, logger: @logger, size_of_item_proc: Proc.new {|event| event[:message].bytesize + PER_EVENT_OVERHEAD}) @publisher = Thread.new do @buffer.deq do |batch| flush(batch) end end end |
#teardown ⇒ Object
126 127 128 129 130 131 132 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 126 def teardown @logger.info("Going to clean up resources") @buffer.close @publisher.join @cwl = nil finished end |