Class: LogStash::Outputs::Qingstor
- Inherits:
-
Base
- Object
- Base
- LogStash::Outputs::Qingstor
show all
- Defined in:
- lib/logstash/outputs/qingstor/uploader.rb,
lib/logstash/outputs/qingstor/temporary_file.rb,
lib/logstash/outputs/qingstor/file_repository.rb,
lib/logstash/outputs/qingstor/rotation_policy.rb,
lib/logstash/outputs/qingstor/multipart_uploader.rb,
lib/logstash/outputs/qingstor/qingstor_validator.rb,
lib/logstash/outputs/qingstor/temporary_file_factory.rb,
lib/logstash/outputs/qingstor.rb
Defined Under Namespace
Modules: QingstorValidator
Classes: FileRepository, RotationPolicy, TemporaryFile, TemporaryFileFactory, Uploader
Constant Summary
collapse
- PERIODIC_CHECK_INTERVAL_IN_SECONDS =
15
- CRASH_RECOVERY_THREADPOOL =
Concurrent::ThreadPoolExecutor.new(
:min_threads => 1,
:max_threads => 2,
:fallback_policy => :caller_runs
)
Instance Method Summary
collapse
Instance Method Details
#clean_temporary_file(file) ⇒ Object
224
225
226
227
|
# File 'lib/logstash/outputs/qingstor.rb', line 224
def clean_temporary_file(file)
@logger.info('Callback: removing temporary file', :file => file.path)
file.delete!
end
|
#close ⇒ Object
196
197
198
199
200
201
202
203
204
205
206
207
208
209
|
# File 'lib/logstash/outputs/qingstor.rb', line 196
def close
stop_periodic_check if @rotation.needs_periodic?
@logger.info('uploading current workspace before closing')
@file_repository.each_files do |file|
upload_file(file) if file.size > 0
end
@file_repository.shutdown
@uploader.stop
@crash_uploader.stop if @restore
end
|
#directory_valid?(path) ⇒ Boolean
247
248
249
250
251
252
|
# File 'lib/logstash/outputs/qingstor.rb', line 247
def directory_valid?(path)
FileUtils.mkdir_p(path) unless Dir.exist?(path)
::File.writable?(path)
rescue
false
end
|
#getbucket ⇒ Object
189
190
191
192
193
194
|
# File 'lib/logstash/outputs/qingstor.rb', line 189
def getbucket
@qs_config = QingStor::SDK::Config.init @access_key_id, @secret_access_key
@qs_config.update(:host => @host, :port => @port) unless @host.nil?
@qs_service = QingStor::SDK::Service.new @qs_config
@qs_service.bucket @bucket, @region
end
|
#log_print_config ⇒ Object
131
132
133
134
135
136
137
138
|
# File 'lib/logstash/outputs/qingstor.rb', line 131
def log_print_config
@logger.info('Run at setting: ', :prefix => @prefix,
:tmpdir => @tmpdir,
:rotation => @rotation.to_s,
:tags => @tags,
:encoding => @encoding,
:restore => @restore)
end
|
#multi_receive_encoded(events_and_encoded) ⇒ Object
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
# File 'lib/logstash/outputs/qingstor.rb', line 140
def multi_receive_encoded(events_and_encoded)
prefix_written_to = Set.new
events_and_encoded.each do |event, encoded|
prefix_key = event.sprintf(@prefix)
prefix_written_to << prefix_key
begin
@file_repository.get_file(prefix_key) do |f|
content = encoded.strip + "\n"
f.write(content)
end
rescue Errno::ENOSPC => e
@logger.error('QingStor: Nospace left in temporary directory',
:tmpdir => @tmpdir)
raise e
end
end
rotate_if_needed(prefix_written_to)
end
|
#register ⇒ Object
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# File 'lib/logstash/outputs/qingstor.rb', line 106
def register
QingstorValidator.prefix_valid?(@prefix) unless @prefix.nil?
unless directory_valid?(@tmpdir)
raise LogStash::ConfigurationError,
"Logstash must have the permissions to write to: #{@tmpdir}"
end
@file_repository = FileRepository.new(@tags, @encoding, @tmpdir)
@rotation = RotationPolicy.new(@rotation_strategy, @file_size, @file_time)
executor = Concurrent::ThreadPoolExecutor.new(
:min_threads => 1,
:max_threads => @upload_workers_count,
:max_queue => @upload_queue_size,
:fallback_policy => :caller_runs
)
@qs_bucket = getbucket
QingstorValidator.bucket_valid?(@qs_bucket)
@uploader = Uploader.new(@qs_bucket, @logger, executor)
log_print_config
start_periodic_check if @rotation.needs_periodic?
restore_from_crash if @restore
end
|
#restore_from_crash ⇒ Object
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
|
# File 'lib/logstash/outputs/qingstor.rb', line 254
def restore_from_crash
@crash_uploader = Uploader.new(@qs_bucket, @logger,
CRASH_RECOVERY_THREADPOOL)
temp_folder_path = Pathname.new(@tmpdir)
Dir.glob(::File.join(@tmpdir, '**/*'))
.select { |file| ::File.file?(file) }
.each do |file|
temp_file = TemporaryFile.create_from_existing_file(file,
temp_folder_path)
if temp_file.size > 0
temp_file.key = 'Restored/' + Time.new.strftime('%Y-%m-%d/') + temp_file.key
@logger.info('Recoving from crash and uploading',
:file => temp_file.key)
@crash_uploader.upload_async(
temp_file,
:on_complete => method(:clean_temporary_file),
:upload_options => upload_options
)
elsif temp_file.size == 0
@logger.info('Recoving from crash, delete empty files',
:file => temp_file.path)
temp_file.delete!
end
end
end
|
#rotate_if_needed(prefixs) ⇒ Object
def multi_receive_encoded
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
|
# File 'lib/logstash/outputs/qingstor.rb', line 164
def rotate_if_needed(prefixs)
prefixs.each do |prefix|
@file_repository.get_factory(prefix) do |factory|
tmp_file = factory.current
if @rotation.rotate?(tmp_file)
@logger.debug('Rotate file',
:strategy => tmp_file.key,
:path => tmp_file.path)
upload_file(tmp_file)
factory.rotate!
end
end
end
end
|
#start_periodic_check ⇒ Object
229
230
231
232
233
234
235
236
237
238
239
240
241
|
# File 'lib/logstash/outputs/qingstor.rb', line 229
def start_periodic_check
@logger.info('Start periodic rotation check')
@periodic_check = Concurrent::TimerTask.new(
:execution_interval => PERIODIC_CHECK_INTERVAL_IN_SECONDS
) do
@logger.debug('Periodic check for stale files')
rotate_if_needed(@file_repository.keys)
end
@periodic_check.execute
end
|
#stop_periodic_check ⇒ Object
243
244
245
|
# File 'lib/logstash/outputs/qingstor.rb', line 243
def stop_periodic_check
@periodic_check.shutdown
end
|
#upload_file(file) ⇒ Object
180
181
182
183
184
185
186
187
|
# File 'lib/logstash/outputs/qingstor.rb', line 180
def upload_file(file)
@logger.debug('Add file to uploading queue', :key => file.key)
file.close
@logger.debug('upload options', :upload_options => upload_options)
@uploader.upload_async(file,
:on_complete => method(:clean_temporary_file),
:upload_options => upload_options)
end
|
#upload_options ⇒ Object
211
212
213
214
215
216
217
218
219
220
221
222
|
# File 'lib/logstash/outputs/qingstor.rb', line 211
def upload_options
options = {
:content_encoding => @encoding == 'gzip' ? 'gzip' : nil
}
if @server_side_encryption_algorithm == 'AES256' && !@customer_key.nil?
options[:server_side_encryption_algorithm] = @server_side_encryption_algorithm
options[:customer_key] = @customer_key
end
options
end
|