Class: LogStash::Outputs::CloudWatchLogs

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::AwsConfig::V2
Defined in:
lib/logstash/outputs/cloudwatchlogs.rb

Overview

This output lets you send log data to the AWS CloudWatch Logs service.

Defined Under Namespace

Classes: Buffer

Constant Summary collapse

LOG_GROUP_NAME =

Constants

"log_group_name"
LOG_STREAM_NAME =
"log_stream_name"
SEQUENCE_TOKEN =
"sequence_token"
TIMESTAMP =
"@timestamp"
MESSAGE =
"message"
PER_EVENT_OVERHEAD =
26
MAX_BATCH_SIZE =
1024 * 1024
MAX_BATCH_COUNT =
10000
MAX_DISTANCE_BETWEEN_EVENTS =
86400 * 1000
MIN_DELAY =
0.2
MIN_BUFFER_DURATION =
5000
MAX_BACKOFF_IN_SECOND =

Backoff up to 64 seconds upon failure

64

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#buffer ⇒ Object (readonly)

Only accessed by tests



75
76
77
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 75

# Reader for the @buffer instance variable (assigned a Buffer instance by
# #register). Intended for test access only.
#
# @return [Object] the current value of @buffer
def buffer
  @buffer
end

#cwl ⇒ Object

Returns the value of attribute cwl.



72
73
74
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 72

# Reader for the @cwl instance variable. #register assigns it an
# Aws::CloudWatchLogs::Client and #teardown resets it to nil.
#
# @return [Object, nil] the current value of @cwl
def cwl
  @cwl
end

#last_flush ⇒ Object

Returns the value of attribute last_flush.



72
73
74
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 72

# Reader for the @last_flush instance variable. #register initialises it to
# Time.now.to_f; any later writers are outside the code shown here.
#
# @return [Object] the current value of @last_flush
def last_flush
  @last_flush
end

#sequence_token ⇒ Object

Returns the value of attribute sequence_token.



72
73
74
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 72

# Reader for the @sequence_token instance variable. #register initialises it
# to nil; any later writers are outside the code shown here.
#
# @return [Object, nil] the current value of @sequence_token
def sequence_token
  @sequence_token
end

Instance Method Details

#flush(events) ⇒ Object



135
136
137
138
139
140
141
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 135

# Publishes one batch of buffered entries.
#
# A nil or empty batch is ignored. Otherwise the entries are handed to
# prepare_log_events and every group it returns is passed, in order, to
# put_log_events.
#
# @param events [Array, nil] entries to publish
# @return [Object, nil] nil when there was nothing to publish; otherwise the
#   value of calling +each+ on the prepared groups
def flush(events)
  return if events.nil? || events.empty?

  prepare_log_events(events).each { |group| put_log_events(group) }
end

#receive(event) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 108

# Handles a single pipeline event.
#
# * Events rejected by output? are ignored.
# * The LogStash::SHUTDOWN sentinel closes the buffer, waits for the
#   publisher thread, logs the shutdown and calls +finished+.
# * Events rejected by invalid? are dropped.
# * Anything else is queued on the buffer as a hash holding :timestamp and
#   :message.
#
# @param event [Object] the event (or shutdown sentinel) to process
# @return [Object, nil] the result of @buffer.enq for queued events, nil otherwise
def receive(event)
  return unless output?(event)

  if event == LogStash::SHUTDOWN
    # Close first, then wait for the publisher thread to finish.
    @buffer.close
    @publisher.join
    @logger.info("CloudWatch Logs output plugin shutdown.")
    finished
    return
  end

  return if invalid?(event)

  # Float value: seconds from Time#to_f scaled by 1000
  # (presumably epoch milliseconds — confirm against the consumer).
  scaled_time = event.timestamp.time.to_f * 1000
  entry = { :timestamp => scaled_time, :message => event[MESSAGE] }
  @buffer.enq(entry)
end

#register ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 78

# Prepares the plugin for use.
#
# Steps, in order:
# 1. Loads "aws-sdk" and builds the Aws::CloudWatchLogs::Client from
#    aws_options_hash.
# 2. Clamps @batch_count and @batch_size down to MAX_BATCH_COUNT and
#    MAX_BATCH_SIZE, and raises @buffer_duration up to MIN_BUFFER_DURATION,
#    logging a warning for every value that gets overridden.
# 3. Resets @sequence_token and records the current time in @last_flush.
# 4. Creates the Buffer and starts the publisher thread, which passes every
#    batch yielded by @buffer.deq to #flush.
#
# @return [Thread] the publisher thread (value of the final assignment)
def register
  require "aws-sdk"
  @cwl = Aws::CloudWatchLogs::Client.new(aws_options_hash)

  too_many_events = @batch_count > MAX_BATCH_COUNT
  @logger.warn(":batch_count exceeds the max number of log events. Use #{MAX_BATCH_COUNT} instead.") if too_many_events
  @batch_count = MAX_BATCH_COUNT if too_many_events

  too_many_bytes = @batch_size > MAX_BATCH_SIZE
  @logger.warn(":batch_size exceeds the max size of log events. Use #{MAX_BATCH_SIZE} instead.") if too_many_bytes
  @batch_size = MAX_BATCH_SIZE if too_many_bytes

  too_short = @buffer_duration < MIN_BUFFER_DURATION
  @logger.warn(":buffer_duration is smaller than the min value. Use #{MIN_BUFFER_DURATION} instead.") if too_short
  @buffer_duration = MIN_BUFFER_DURATION if too_short

  @sequence_token = nil
  @last_flush = Time.now.to_f

  # Size charged per queued entry: message bytes plus a fixed overhead.
  entry_size = proc { |entry| entry[:message].bytesize + PER_EVENT_OVERHEAD }

  # NOTE(review): batch_count / batch_size are read through reader methods
  # that are not defined in the code shown here, while buffer_duration and
  # queue_size are read from instance variables — presumably equivalent; confirm.
  buffer_options = {
    max_batch_count: batch_count,
    max_batch_size: batch_size,
    buffer_duration: @buffer_duration,
    out_queue_size: @queue_size,
    logger: @logger,
    size_of_item_proc: entry_size
  }
  @buffer = Buffer.new(buffer_options)

  @publisher = Thread.new { @buffer.deq { |batch| flush(batch) } }
end

#teardown ⇒ Object



126
127
128
129
130
131
132
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 126

# Releases the plugin's resources: logs the cleanup, closes the buffer, waits
# for the publisher thread, drops the client reference and calls +finished+.
#
# @return [Object] whatever +finished+ returns
def teardown
  @logger.info("Going to clean up resources")
  # Close before join: the publisher thread started in #register runs
  # @buffer.deq; presumably closing the buffer lets that call return so the
  # join below can complete — confirm against Buffer.
  @buffer.close
  @publisher.join
  @cwl = nil
  finished
end