Class: LogStash::Outputs::OSS

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/outputs/oss.rb,
lib/logstash/outputs/oss/version.rb,
lib/logstash/outputs/oss/gzip_file.rb,
lib/logstash/outputs/oss/file_manager.rb,
lib/logstash/outputs/oss/file_uploader.rb,
lib/logstash/outputs/oss/file_generator.rb,
lib/logstash/outputs/oss/temporary_file.rb,
lib/logstash/outputs/oss/rotations/hybrid_rotation.rb,
lib/logstash/outputs/oss/rotations/size_based_rotation.rb,
lib/logstash/outputs/oss/rotations/time_based_rotation.rb

Overview

Logstash OSS Output Plugin

Usage: In order to write output data to OSS, you should add configurations like below to logstash output

oss {
  "endpoint" => "OSS endpoint to connect to"              (required)
  "bucket" => "Your bucket name"                          (required)
  "access_key_id" => "Your access key id"                 (required)
  "access_key_secret" => "Your access secret key"         (required)
  "prefix" => "logstash/%{index"                         (optional, default = "")
  "recover" => true                                       (optional, default = true)
  "rotation_strategy" => "size_and_time"                  (optional, default = "size_and_time")
  "time_rotate" => 15                                     (optional, default = 15) - Minutes
  "size_rotate" => 31457280                               (optional, default = 31457280) - Bytes
  "encoding" => "none"                                    (optional, default = "none")
  "additional_oss_settings" => {
    "max_connections_to_oss" => 1024                      (optional, default = 1024)
    "secure_connection_enabled" => false                  (optional, default = false)
  }
}

}

Defined Under Namespace

Classes: FileGenerator, FileManager, FileUploader, GzipFile, HybridRotation, SizeBasedRotation, TemporaryFile, TimeBasedRotation, Version

Constant Summary collapse

ROTATE_CHECK_INTERVAL_IN_SECONDS =
15
MAX_CONNECTIONS_TO_OSS_KEY =
"max_connections_to_oss"
SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY =
"server_side_encryption_algorithm"
SECURE_CONNECTION_ENABLED_KEY =
"secure_connection_enabled"

Instance Method Summary collapse

Instance Method Details

#closeObject



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/logstash/outputs/oss.rb', line 166

def close
  @logger.info("Logstash OSS Output Plugin is shutting down...")

  # stop rotate check
  stop_rotate_check if @rotation.needs_periodic_check?

  prefixes = @file_manager.prefixes
  prefixes.each do |prefix|
    @file_manager.get_file_generator(prefix) do |generator|
      file = generator.current_file
      file.close
      if file.size > 0
        # upload async
        @file_uploader.upload_async(file, :on_complete => method(:clean_temporary_file))
      end
    end
  end

  @file_manager.close

  # stop file uploader
  @file_uploader.close

  # stop oss client
  @oss.shutdown
end

#multi_receive_encoded(events_and_encoded) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/logstash/outputs/oss.rb', line 150

def multi_receive_encoded(events_and_encoded)
  prefixes = Set.new
  events_and_encoded.each do |event, encoded|
    prefix = event.sprintf(@prefix)
    prefixes << prefix

    begin
      @file_manager.get_file_generator(prefix) { |generator| generator.current_file.write(encoded) }
    rescue Errno::ENOSPC => e
      @logger.error("Logstash OSS Output Plugin: No space left in temporary directory", :temporary_directory => @temporary_directory)
      raise e
    end
  end
  rotate(prefixes)
end

#registerObject



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/logstash/outputs/oss.rb', line 104

def register
  # TODO: check if prefix is valid

  # check if temporary_directory is writable
  begin
    FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory)
    ::File.writable?(@temporary_directory)
  rescue
    raise LogStash::ConfigurationError, "Logstash OSS Output Plugin can not write data to " + @temporary_directory
  end

  # check rotation configuration
  if @size_rotate.nil? and @time_rotate.nil? || @size_rotate <= 0 && @time_rotate <= 0
    raise LogStash::ConfigurationError, "Logstash OSS Output Plugin must have at least one of time_file or size_file set to a value greater than 0"
  end

  if @upload_workers_count <= 0 || @upload_queue_size <= 0
    raise LogStash::ConfigurationError,  "Logstash OSS Output Plugin must have both upload_workers_count and upload_queue_size are positive"
  end

  # create upload thread pool
  executor = Concurrent::ThreadPoolExecutor.new({ :min_threads => 1,
                                                  :max_threads => @upload_workers_count,
                                                  :max_queue => @upload_queue_size,
                                                  :fallback_policy => :caller_runs })

  # get file rotation strategy
  @rotation = rotation

  # initialize oss client
  @oss = initialize_oss_client

  # initialize file uploader
  @file_uploader = FileUploader.new(@oss, @bucket, @additional_oss_settings, @logger, executor)

  # initialize file manager
  @file_manager = FileManager.new(@logger, @encoding, @temporary_directory)

  # recover from crash
  recover_from_crash if @recover

  # start rotate check
  start_rotate_check if @rotation.needs_periodic_check?
end