Class: LogStash::Outputs::Qingstor

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/outputs/qingstor/uploader.rb,
lib/logstash/outputs/qingstor/temporary_file.rb,
lib/logstash/outputs/qingstor/file_repository.rb,
lib/logstash/outputs/qingstor/rotation_policy.rb,
lib/logstash/outputs/qingstor/multipart_uploader.rb,
lib/logstash/outputs/qingstor/qingstor_validator.rb,
lib/logstash/outputs/qingstor/temporary_file_factory.rb,
lib/logstash/outputs/qingstor.rb

Defined Under Namespace

Modules: QingstorValidator Classes: FileRepository, RotationPolicy, TemporaryFile, TemporaryFileFactory, Uploader

Constant Summary collapse

# Value passed as :execution_interval to the timer task created in
# #start_periodic_check (the periodic rotation check).
PERIODIC_CHECK_INTERVAL_IN_SECONDS =
15
# Executor handed to the Uploader that #restore_from_crash creates for
# re-uploading leftover temporary files. Configured with 1 to 2 threads and
# the :caller_runs fallback policy.
CRASH_RECOVERY_THREADPOOL =
Concurrent::ThreadPoolExecutor.new(
  :min_threads => 1,
  :max_threads => 2,
  :fallback_policy => :caller_runs
)

Instance Method Summary collapse

Instance Method Details

#clean_temporary_file(file) ⇒ Object



224
225
226
227
# File 'lib/logstash/outputs/qingstor.rb', line 224

# Callback handed to the uploaders as :on_complete. Logs the path of the
# temporary file at info level and then deletes the file.
#
# @param file [#path, #delete!] temporary file that is no longer needed
# @return [Object] whatever +file.delete!+ returns
def clean_temporary_file(file)
  doomed_path = file.path
  @logger.info('Callback: removing temporary file', :file => doomed_path)
  file.delete!
end

#close ⇒ Object



196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/logstash/outputs/qingstor.rb', line 196

# Shuts the output down: stops the periodic rotation check (when the
# rotation policy uses one), queues every non-empty file still held by the
# file repository for upload, shuts the repository down and stops the
# uploader(s).
#
# @return [Object, nil] result of stopping the crash uploader when @restore
#   is set, otherwise nil
def close
  stop_periodic_check if @rotation.needs_periodic?

  @logger.info('uploading current workspace before closing')
  @file_repository.each_files do |pending|
    # Empty files are not uploaded.
    next unless pending.size > 0

    upload_file(pending)
  end

  @file_repository.shutdown
  @uploader.stop

  # The crash uploader only exists when restore was enabled.
  return unless @restore

  @crash_uploader.stop
end

#directory_valid?(path) ⇒ Boolean



247
248
249
250
251
252
# File 'lib/logstash/outputs/qingstor.rb', line 247

# Makes sure +path+ exists as a directory (creating it, including parents,
# when it is missing) and reports whether it is writable.
#
# @param path [String] directory to check
# @return [Boolean] true when the directory is writable; false when it is
#   not, or when any StandardError is raised while checking/creating it
def directory_valid?(path)
  begin
    FileUtils.mkdir_p(path) unless Dir.exist?(path)
    ::File.writable?(path)
  rescue StandardError
    # Any failure (bad path, permission error, ...) means "not usable".
    false
  end
end

#getbucket ⇒ Object



189
190
191
192
193
194
# File 'lib/logstash/outputs/qingstor.rb', line 189

# Builds the QingStor SDK config and service from the plugin settings and
# returns the bucket handle for @bucket in @region.
#
# Side effects: assigns @qs_config and @qs_service. The config's host/port
# are only overridden when @host is set.
#
# @return [Object] result of +@qs_service.bucket(@bucket, @region)+
def getbucket
  @qs_config = QingStor::SDK::Config.init(@access_key_id, @secret_access_key)
  unless @host.nil?
    @qs_config.update(:host => @host, :port => @port)
  end

  @qs_service = QingStor::SDK::Service.new(@qs_config)
  @qs_service.bucket(@bucket, @region)
end

#log_print_config ⇒ Object

Logs the effective plugin settings (prefix, tmpdir, rotation, tags, encoding, restore) at info level.



131
132
133
134
135
136
137
138
# File 'lib/logstash/outputs/qingstor.rb', line 131

# Writes the settings the plugin is running with to the log at info level:
# prefix, tmpdir, rotation policy (as a string), tags, encoding and the
# restore flag.
#
# @return [Object] whatever +@logger.info+ returns
def log_print_config
  rotation_description = @rotation.to_s

  @logger.info(
    'Run at setting: ',
    :prefix => @prefix,
    :tmpdir => @tmpdir,
    :rotation => rotation_description,
    :tags => @tags,
    :encoding => @encoding,
    :restore => @restore
  )
end

#multi_receive_encoded(events_and_encoded) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/logstash/outputs/qingstor.rb', line 140

# Appends each encoded event to the temporary file of its prefix, then runs
# a single rotation check over all prefixes that were written to.
#
# The prefix of an event is +event.sprintf(@prefix)+. Each payload is
# stripped and terminated with exactly one newline before it is written.
#
# @param events_and_encoded [Enumerable] pairs of (event, encoded string)
# @return [Object] whatever #rotate_if_needed returns
# @raise [Errno::ENOSPC] re-raised after logging when the temporary
#   directory has no space left
def multi_receive_encoded(events_and_encoded)
  touched_prefixes = Set.new

  events_and_encoded.each do |event, encoded|
    key = event.sprintf(@prefix)
    touched_prefixes.add(key)

    begin
      @file_repository.get_file(key) do |tmp|
        tmp.write(encoded.strip + "\n")
      end
    rescue Errno::ENOSPC
      @logger.error('QingStor: Nospace left in temporary directory',
                    :tmpdir => @tmpdir)
      raise
    end
  end

  # Rotation is checked once per batch, after all writes, so the size/time
  # checks are grouped instead of running per event.
  rotate_if_needed(touched_prefixes)
end

#register ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/logstash/outputs/qingstor.rb', line 106

# Plugin start-up: validates the configuration, creates the file
# repository, rotation policy and uploader, logs the settings, and
# optionally starts the periodic rotation check and crash recovery.
#
# Side effects: assigns @file_repository, @rotation, @qs_bucket and
# @uploader.
#
# @return [Object, nil] result of #restore_from_crash when @restore is set,
#   otherwise nil
# @raise [LogStash::ConfigurationError] when @tmpdir cannot be used as a
#   writable directory
def register
  QingstorValidator.prefix_valid?(@prefix) unless @prefix.nil?

  tmpdir_usable = directory_valid?(@tmpdir)
  message = "Logstash must have the permissions to write to: #{@tmpdir}"
  raise LogStash::ConfigurationError, message unless tmpdir_usable

  @file_repository = FileRepository.new(@tags, @encoding, @tmpdir)
  @rotation = RotationPolicy.new(@rotation_strategy, @file_size, @file_time)

  # Pool used by the regular uploader; sized from the plugin settings.
  upload_pool = Concurrent::ThreadPoolExecutor.new(
    :min_threads => 1,
    :max_threads => @upload_workers_count,
    :max_queue => @upload_queue_size,
    :fallback_policy => :caller_runs
  )

  @qs_bucket = getbucket
  QingstorValidator.bucket_valid?(@qs_bucket)
  @uploader = Uploader.new(@qs_bucket, @logger, upload_pool)

  log_print_config
  start_periodic_check if @rotation.needs_periodic?

  return unless @restore

  restore_from_crash
end

#restore_from_crash ⇒ Object



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/logstash/outputs/qingstor.rb', line 254

# Re-uploads temporary files left behind in @tmpdir (for example by an
# earlier run that did not shut down cleanly).
#
# Every regular file found under @tmpdir (recursively) is wrapped in a
# TemporaryFile. Non-empty files get their key prefixed with
# "Restored/<YYYY-MM-DD>/" and are queued on a dedicated crash uploader;
# they are removed by #clean_temporary_file via the :on_complete callback.
# Empty files are deleted immediately.
#
# Side effects: assigns @crash_uploader (backed by
# CRASH_RECOVERY_THREADPOOL).
#
# @return [Array<String>] the paths of the files that were processed
def restore_from_crash
  @crash_uploader = Uploader.new(@qs_bucket, @logger,
                                 CRASH_RECOVERY_THREADPOOL)

  temp_folder_path = Pathname.new(@tmpdir)
  Dir.glob(::File.join(@tmpdir, '**/*'))
     .select { |file| ::File.file?(file) }
     .each do |file|
    temp_file = TemporaryFile.create_from_existing_file(file,
                                                        temp_folder_path)
    # Now multipart uploader supports file size up to 500GB
    if temp_file.size > 0
      date_folder = Time.new.strftime('%Y-%m-%d/')
      temp_file.key = "Restored/#{date_folder}#{temp_file.key}"
      @logger.info('Recovering from crash and uploading',
                   :file => temp_file.key)
      @crash_uploader.upload_async(
        temp_file,
        :on_complete => method(:clean_temporary_file),
        :upload_options => upload_options
      )
    else
      # Nothing to upload: just remove the empty leftover.
      @logger.info('Recovering from crash, delete empty files',
                   :file => temp_file.path)
      temp_file.delete!
    end
  end
end

#rotate_if_needed(prefixs) ⇒ Object

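Checks the current temporary file of each given prefix and, when the rotation policy asks for it, uploads that file and rotates to a new one.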



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/logstash/outputs/qingstor.rb', line 164

# For every prefix, looks at the factory's current temporary file and, when
# the rotation policy says it is due, queues the file for upload and
# rotates the factory to a fresh file.
#
# @param prefixs [Enumerable<String>] prefix keys to check
# @return [Object] whatever +prefixs.each+ returns
def rotate_if_needed(prefixs)
  prefixs.each do |prefix|
    @file_repository.get_factory(prefix) do |factory|
      tmp_file = factory.current
      next unless @rotation.rotate?(tmp_file)

      # The object key is logged under :key (it was previously mislabelled
      # as :strategy, which made the debug output misleading).
      @logger.debug('Rotate file',
                    :key => tmp_file.key,
                    :path => tmp_file.path)
      upload_file(tmp_file)
      factory.rotate!
    end
  end
end

#start_periodic_check ⇒ Object



229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/logstash/outputs/qingstor.rb', line 229

# Creates and starts the timer task that re-runs the rotation check for
# every key known to the file repository, once per
# PERIODIC_CHECK_INTERVAL_IN_SECONDS.
#
# Side effects: assigns @periodic_check.
#
# @return [Object] whatever +@periodic_check.execute+ returns
def start_periodic_check
  @logger.info('Start periodic rotation check')

  interval = PERIODIC_CHECK_INTERVAL_IN_SECONDS
  @periodic_check = Concurrent::TimerTask.new(:execution_interval => interval) do
    @logger.debug('Periodic check for stale files')

    rotate_if_needed(@file_repository.keys)
  end

  @periodic_check.execute
end

#stop_periodic_check ⇒ Object



243
244
245
# File 'lib/logstash/outputs/qingstor.rb', line 243

# Shuts down the timer task stored in @periodic_check by
# #start_periodic_check.
#
# @return [Object] whatever the timer's +shutdown+ returns
def stop_periodic_check
  timer = @periodic_check
  timer.shutdown
end

#upload_file(file) ⇒ Object

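Closes the given temporary file and queues it for asynchronous upload, with #clean_temporary_file registered as the :on_complete callback.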



180
181
182
183
184
185
186
187
# File 'lib/logstash/outputs/qingstor.rb', line 180

# Closes +file+ and hands it to the uploader for asynchronous upload.
# #clean_temporary_file is registered as the :on_complete callback and the
# current #upload_options are passed along.
#
# @param file [#key, #close] temporary file to upload
# @return [Object] whatever +@uploader.upload_async+ returns
def upload_file(file)
  @logger.debug('Add file to uploading queue', :key => file.key)
  file.close
  @logger.debug('upload options', :upload_options => upload_options)

  cleanup = method(:clean_temporary_file)
  @uploader.upload_async(file,
                         :on_complete => cleanup,
                         :upload_options => upload_options)
end

#upload_options ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/logstash/outputs/qingstor.rb', line 211

# Builds the options hash passed to the uploader for each upload.
#
# :content_encoding is always present: 'gzip' when @encoding is 'gzip',
# otherwise nil. The server-side-encryption keys are only added when the
# algorithm is 'AES256' and a customer key is configured.
#
# @return [Hash] upload options
def upload_options
  content_encoding = @encoding == 'gzip' ? 'gzip' : nil
  options = { :content_encoding => content_encoding }

  encrypt = @server_side_encryption_algorithm == 'AES256' && !@customer_key.nil?
  return options unless encrypt

  options.merge(
    :server_side_encryption_algorithm => @server_side_encryption_algorithm,
    :customer_key => @customer_key
  )
end