Class: LogStash::Outputs::Application_insights::Blob
- Inherits:
-
Object
- Object
- LogStash::Outputs::Application_insights::Blob
- Defined in:
- lib/logstash/outputs/application_insights/blob.rb
Direct Known Subclasses
Constant Summary collapse
- CREATE_EXIST_ERRORS =
{ :container => [ :create_container, :container_exist ], :table => [ :create_table, :table_exist ] }
- @@closing =
false
Instance Attribute Summary collapse
-
#blob_name ⇒ Object
readonly
Returns the value of attribute blob_name.
-
#container_name ⇒ Object
readonly
Returns the value of attribute container_name.
-
#instrumentation_key ⇒ Object
readonly
Returns the value of attribute instrumentation_key.
-
#io_queue ⇒ Object
readonly
Returns the value of attribute io_queue.
-
#last_io_exception ⇒ Object
readonly
Returns the value of attribute last_io_exception.
-
#oldest_event_time ⇒ Object
readonly
Returns the value of attribute oldest_event_time.
-
#storage_account_name ⇒ Object
readonly
Returns the value of attribute storage_account_name.
-
#table_id ⇒ Object
readonly
Returns the value of attribute table_id.
-
#uploaded_bytesize ⇒ Object
readonly
Returns the value of attribute uploaded_bytesize.
-
#uploaded_events_count ⇒ Object
readonly
Returns the value of attribute uploaded_events_count.
Class Method Summary collapse
Instance Method Summary collapse
- #<<(block) ⇒ Object
- #blob_empty? ⇒ Boolean
- #blob_full?(next_block = nil) ⇒ Boolean
- #clear_state ⇒ Object
-
#close ⇒ Object
close blob.
- #commit(tuple = nil) ⇒ Object
- #create_container_exist_recovery ⇒ Object
- #create_exist_recovery(type, name = nil) ⇒ Object
- #create_table_exist_recovery ⇒ Object
-
#initialize(channel = nil, id = nil, no_queue = false) ⇒ Blob
constructor
A new instance of Blob.
-
#notify(tuple = nil) ⇒ Object
must return whether notification was successful or failed.
- #notify_retry_later ⇒ Object
- #queue_empty? ⇒ Boolean
- #queue_size ⇒ Object
-
#state_table_delete(tuple = nil, state = nil) ⇒ Object
Returns true on success.
-
#state_table_insert ⇒ Object
return true on success.
-
#state_table_query(storage_account_name, filter, token) ⇒ Object
return entities.
- #state_table_update(tuple = nil) ⇒ Object
- #state_to_table_entity ⇒ Object
- #state_to_tuple ⇒ Object
- #table_entity_to_tuple(options = {}) ⇒ Object
- #test_notification(storage_account_name) ⇒ Object
- #test_storage(storage_account_name) ⇒ Object
- #tuple_to_state(tuple) ⇒ Object
- #update_commited_or_uncommited_list(table_entity) ⇒ Object
- #upload(block) ⇒ Object
- #upload_retry_later ⇒ Object
Constructor Details
#initialize(channel = nil, id = nil, no_queue = false) ⇒ Blob
Returns a new instance of Blob.
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 49 def initialize ( channel = nil, id = nil , no_queue = false ) @configuration = Config.current @logger = @configuration[:logger] @storage_recovery = Storage_recovery.instance @notification_recovery = Notification_recovery.instance @max_tries = @configuration[:io_max_retries] + 1 @uploaded_block_ids = [ ] @uploaded_block_numbers = [ ] @uploaded_bytesize = 0 @uploaded_events_count = 0 @sub_state = :none if channel @id = id @instrumentation_key = channel.instrumentation_key @table_id = channel.table_id @blob_max_delay = channel.blob_max_delay @event_format_ext = channel.event_format_ext unless no_queue @io_queue = Queue.new @timer = Timer.new # create a thread that handles the IO of the blob Thread.new do next_block = nil loop do block_to_upload = nil # release reference to resource for GC block_to_upload = next_block || @io_queue.pop next_block = nil if :trigger == @timer.state next_block = block_to_upload unless :wakeup == block_to_upload block_to_upload = :timeout to_commit = :commit elsif :close == block_to_upload to_commit = :commit # ignore :trigger as they are only to casue check timeout elsif :wakeup == block_to_upload # ignore :wakeup next else while @io_queue.length > 0 next_block = @io_queue.pop next if :wakeup == next_block # ignore :wakeup break if :close == next_block break if blob_full?( next_block ) break unless block_to_upload.concat( next_block ) next_block = nil end end unless to_commit @timer.set( block_to_upload.oldest_event_time + @blob_max_delay, nil ) {|object| @io_queue << :wakeup if 0 == @io_queue.length } if blob_empty? upload( block_to_upload ) block_to_upload = nil # release reference to resource for GC to_commit = :commit if blob_full? end if to_commit commit unless @uploaded_block_ids.empty? to_commit = nil @uploaded_block_ids = [ ] @timer.cancel break if :close == block_to_upload end end end end end end |
Instance Attribute Details
#blob_name ⇒ Object (readonly)
Returns the value of attribute blob_name.
28 29 30 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 28 def blob_name @blob_name end |
#container_name ⇒ Object (readonly)
Returns the value of attribute container_name.
27 28 29 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 27 def container_name @container_name end |
#instrumentation_key ⇒ Object (readonly)
Returns the value of attribute instrumentation_key.
24 25 26 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 24 def instrumentation_key @instrumentation_key end |
#io_queue ⇒ Object (readonly)
Returns the value of attribute io_queue.
33 34 35 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 33 def io_queue @io_queue end |
#last_io_exception ⇒ Object (readonly)
Returns the value of attribute last_io_exception.
35 36 37 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 35 def last_io_exception @last_io_exception end |
#oldest_event_time ⇒ Object (readonly)
Returns the value of attribute oldest_event_time.
31 32 33 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 31 def oldest_event_time @oldest_event_time end |
#storage_account_name ⇒ Object (readonly)
Returns the value of attribute storage_account_name.
26 27 28 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 26 def storage_account_name @storage_account_name end |
#table_id ⇒ Object (readonly)
Returns the value of attribute table_id.
25 26 27 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 25 def table_id @table_id end |
#uploaded_bytesize ⇒ Object (readonly)
Returns the value of attribute uploaded_bytesize.
30 31 32 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 30 def uploaded_bytesize @uploaded_bytesize end |
#uploaded_events_count ⇒ Object (readonly)
Returns the value of attribute uploaded_events_count.
29 30 31 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 29 def uploaded_events_count @uploaded_events_count end |
Class Method Details
.close ⇒ Object
41 42 43 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 41 def self.close @@closing = true end |
.stopped? ⇒ Boolean
45 46 47 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 45 def self.stopped? @@closing end |
Instance Method Details
#<<(block) ⇒ Object
522 523 524 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 522
# Appends +block+ to the blob's IO queue and returns the queue.
def <<(block)
  @io_queue.push(block)
end
#blob_empty? ⇒ Boolean
142 143 144 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 142
# True when no block id has been recorded in @uploaded_block_ids.
def blob_empty?
  @uploaded_block_ids.size.zero?
end
#blob_full?(next_block = nil) ⇒ Boolean
134 135 136 137 138 139 140 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 134
# Tells whether the blob has reached one of its limits (block count,
# :blob_max_events, :blob_max_bytesize).
#
# Without next_block: true when any current total is at or above its limit.
# With next_block: true when adding one more block, or next_block's
# events_count / bytesize, would push a total strictly above its limit.
def blob_full?(next_block = nil)
  unless next_block
    return true if BLOB_MAX_BLOCKS <= @uploaded_block_ids.length
    return true if @configuration[:blob_max_events] <= @uploaded_events_count

    return @configuration[:blob_max_bytesize] <= @uploaded_bytesize
  end

  return true if BLOB_MAX_BLOCKS < @uploaded_block_ids.length + 1
  return true if @configuration[:blob_max_events] < @uploaded_events_count + next_block.events_count

  @configuration[:blob_max_bytesize] < @uploaded_bytesize + next_block.bytesize
end
#clear_state ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 155
# Resets the per-blob state: block lists become fresh empty arrays,
# counters become 0, and action/location/time fields become nil.
def clear_state
  @uploaded_block_ids, @uploaded_block_numbers = [ ], [ ]
  @uploaded_events_count = @uploaded_bytesize = 0
  @action = @storage_account_name = @container_name = @blob_name = @oldest_event_time = nil
end
#close ⇒ Object
Closes the blob. It finishes whatever is already on the queue and, if necessary, commits. Called on shutdown.
130 131 132 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 130
# Requests shutdown of this blob by queueing the :close marker on @io_queue.
def close
  @io_queue.push(:close)
end
#commit(tuple = nil) ⇒ Object
389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 389
# Commits the uploaded blocks of the current blob.
#
# tuple - optional state tuple; when given it is loaded via tuple_to_state
#         before committing.
#
# Does nothing (returns nil) when no block ids are recorded. On success the
# state table is updated; on failure the commit is handed to
# @storage_recovery for a later attempt.
def commit(tuple = nil)
  tuple_to_state( tuple ) if tuple
  return if @uploaded_block_ids.empty?

  @action = :commit
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable ]
  committed = storage_io_block do
    @info = "#{@action.to_s} #{@storage_account_name}/#{@container_name}/#{@blob_name}, events: #{@uploaded_events_count}, size: #{@uploaded_bytesize}, blocks: #{@uploaded_block_numbers}, delay: #{Time.now.utc - @oldest_event_time}"
    # the storage call below may raise
    @client.blobClient.commit_blob_blocks( @container_name, @blob_name, @uploaded_block_ids ) unless @configuration[:disable_blob_upload]
    @log_state = :committed
  end

  # next stage on success, otherwise schedule a recovery
  return state_table_update if committed

  @storage_recovery.recover_later( state_to_tuple, :commit, @storage_account_name )
end
#create_container_exist_recovery ⇒ Object
308 309 310 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 308
# Container flavour of create_exist_recovery: the block creates the
# container through the blob client.
def create_container_exist_recovery
  create_exist_recovery( :container ) do |container|
    @client.blobClient.create_container( container )
  end
end
#create_exist_recovery(type, name = nil) ⇒ Object
288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 288
# Reacts to the current @recovery value for a container or table.
#
# type - :container or :table (key into CREATE_EXIST_ERRORS).
# name - resource name; defaults to the configured state table name for
#        :table and to @container_name otherwise.
#
# When @recovery is the "create" error for the type, yields the name so the
# caller's block can create the resource, then logs success. When it is the
# "exist" error, only logs. @info is restored afterwards in both cases; any
# other @recovery value is a no-op returning nil.
def create_exist_recovery(type, name = nil)
  saved_info = @info
  case @recovery
  when CREATE_EXIST_ERRORS[type][0]
    unless name
      name = if :table == type
               @configuration[:state_table_name]
             else
               @container_name
             end
    end
    @info = "create #{type} #{@storage_account_name}/#{name}"

    # the caller's block may raise
    yield name
    @logger.info { "Successed to #{@info}" }
    @info = saved_info
  when CREATE_EXIST_ERRORS[type][1]
    @logger.info { "Successed (already exist) to #{@info}" }
    @info = saved_info
  end
end
#create_table_exist_recovery ⇒ Object
304 305 306 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 304
# Table flavour of create_exist_recovery: the block creates the table
# through the table client.
def create_table_exist_recovery
  create_exist_recovery( :table ) do |table|
    @client.tableClient.create_table( table )
  end
end
#notify(tuple = nil) ⇒ Object
must return whether notification was successful or failed
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 263
# Sends the notification for the current blob.
#
# tuple - optional state tuple loaded via tuple_to_state first.
#
# Returns the result of storage_io_block, i.e. whether the notification
# succeeded. On success a telemetry event is tracked and the state table is
# updated; on failure notify_retry_later schedules another attempt.
def notify(tuple = nil)
  tuple_to_state( tuple ) if tuple
  @action = :notify
  @force_client = true # to enable get a client even if all storage_accounts marked dead
  @recoverable = [ :notify_failed_blob_not_accessible, :io_failure, :service_unavailable ]
  notified = storage_io_block do
    set_blob_sas_url
    notification = create_payload
    @logger.debug { "notification payload: #{notification}" }
    @info = "#{@action.to_s} #{@storage_account_name}/#{@container_name}/#{@blob_name}, events: #{@uploaded_events_count}, size: #{@uploaded_bytesize}, blocks: #{@uploaded_block_numbers}, delay: #{Time.now.utc - @oldest_event_time}, blob_sas_url: #{@blob_sas_url}"

    # the notification call below may raise
    post_notification( @client.notifyClient, notification ) unless @configuration[:disable_notification]
    @log_state = :notified
  end

  unless notified
    notify_retry_later
    return notified
  end

  Telemetry.instance.track_event { { :name => "notified", :properties => state_to_table_entity } }
  state_table_update
  notified
end
#notify_retry_later ⇒ Object
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 243
# Routes a failed notification to the matching retry mechanism.
#
# If @recovery is one of the three reasons below it is remembered in
# @sub_state; the routing then depends on @sub_state only:
#   :notify_failed_blob_not_accessible            -> storage recovery
#   :invalid_instrumentation_key / :invalid_table_id -> the channel's
#                                                    failed_on_notify_retry_Q
#   anything else                                 -> notification recovery
def notify_retry_later
  remembered_reasons = [ :notify_failed_blob_not_accessible, :invalid_instrumentation_key, :invalid_table_id ]
  @sub_state = @recovery if remembered_reasons.include?( @recovery )

  case @sub_state
  when :notify_failed_blob_not_accessible
    @storage_recovery.recover_later( state_to_tuple, :notify, @storage_account_name )
  when :invalid_instrumentation_key, :invalid_table_id
    Channels.instance.channel( @instrumentation_key, @table_id ).failed_on_notify_retry_Q << state_to_tuple
  else
    @notification_recovery.recover_later( state_to_tuple )
  end
end
#queue_empty? ⇒ Boolean
146 147 148 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 146
# True/false depending on whether the IO queue holds no items;
# nil when this blob has no IO queue.
def queue_empty?
  return nil unless @io_queue

  @io_queue.length.zero?
end
#queue_size ⇒ Object
151 152 153 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 151
# Number of items currently waiting on the IO queue.
#
# Returns 0 when the blob has no IO queue (it is only created by the
# constructor when a channel is given and no_queue is false), instead of
# raising NoMethodError on nil. This mirrors the nil guard in queue_empty?.
def queue_size
  @io_queue ? @io_queue.length : 0
end
#state_table_delete(tuple = nil, state = nil) ⇒ Object
Returns true on success.
352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 352
# Deletes the state-table entity of the current blob for a given state.
#
# tuple - optional state tuple loaded via tuple_to_state first.
# state - state whose record is removed; defaults to @log_state.
#
# Returns the storage_io_block result (truthy on success). On failure a
# :state_table_update recovery is scheduled.
def state_table_delete(tuple = nil, state = nil)
  tuple_to_state( tuple ) if tuple
  state = @log_state unless state
  @action = :state_table_delete
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy, :create_resource ]
  @info = "#{@action} #{state} #{@storage_account_name}/#{@container_name}/#{@blob_name}"

  deleted = storage_io_block do
    create_table_exist_recovery
    if :create_resource == @recovery
      # treated as "entity already gone": log only
      @logger.info { "Note: delete entity failed, already deleted, #{@info}, state: #{state}, log_state: #{@log_state}" }
    else
      table_name = @configuration[:state_table_name]
      partition_key = "#{@configuration[:partition_key_prefix]}-#{state}"
      row_key = @blob_name.gsub( "/", "_" )
      @client.tableClient.delete_entity( table_name, partition_key, row_key )
    end
  end

  @storage_recovery.recover_later( state_to_tuple, :state_table_update, @storage_account_name ) unless deleted
  deleted
end
#state_table_insert ⇒ Object
return true on success
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 313
# Inserts the current blob state as an entity into the state table.
#
# Returns the storage_io_block result (truthy on success). On failure a
# :state_table_update recovery is scheduled, except while @log_state is
# :uploading.
#
# Raises NotRecoverableError inside the IO block when the entity already
# exists while @log_state is :uploading.
def state_table_insert
  @action = :state_table_insert
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy, :entity_exist ]
  @info = "#{@action} #{@log_state} #{@storage_account_name}/#{@container_name}/#{@blob_name}"

  inserted = storage_io_block do
    create_table_exist_recovery
    if :entity_exist == @recovery
      raise NotRecoverableError if :uploading == @log_state
    else
      entity = state_to_table_entity.merge(
        :PartitionKey => "#{@configuration[:partition_key_prefix]}-#{@log_state}",
        :RowKey => @blob_name.gsub("/","_")
      )
      @client.tableClient.insert_entity( @configuration[:state_table_name], entity )
    end
  end

  if !inserted && :uploading != @log_state
    @storage_recovery.recover_later( state_to_tuple, :state_table_update, @storage_account_name )
  end
  inserted
end
#state_table_query(storage_account_name, filter, token) ⇒ Object
return entities
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 372
# Queries the state table of a storage account.
#
# storage_account_name - account to query; stored in @storage_account_name.
# filter               - value passed to the table client as :filter.
# token                - continuation token; sent as :continuation_token
#                        only when truthy.
#
# Returns the entities from the table client, or nil when the IO block
# never completed the query.
def state_table_query ( storage_account_name, filter , token )
  @storage_account_name = storage_account_name

  @action = :state_table_query
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy ]
  @info = "#{@action} #{@storage_account_name}/#{@configuration[:state_table_name]}"

  entities = nil
  storage_io_block {
    create_table_exist_recovery
    # query options: restored local that carries the filter and the
    # optional continuation token to the table client
    options = { :filter => filter }
    options[:continuation_token] = token if token
    entities = @client.tableClient.query_entities( @configuration[:state_table_name], options )
  }
  entities
end
#state_table_update(tuple = nil) ⇒ Object
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 332
# Moves the blob's state-table record on according to @log_state.
#
# tuple - optional state tuple loaded via tuple_to_state first.
#
#   :uploading -> delete the record for the current state
#   :committed -> insert the committed record, delete the :uploading one,
#                 adjust the pending counters and enqueue the notification
#   :notified  -> optionally insert the notified record (config
#                 :save_notified_blobs_records), delete the :committed one
#                 and decrement pending notifications
def state_table_update(tuple = nil)
  tuple_to_state( tuple ) if tuple

  case @log_state
  when :uploading
    state_table_delete
  when :committed
    if state_table_insert && state_table_delete( nil, :uploading )
      State.instance.dec_pending_commits
      State.instance.inc_pending_notifications
      # this is not a recovery, it is actually enqueue to notify
      @notification_recovery.enqueue( state_to_tuple )
    end
  when :notified
    record_kept = !@configuration[:save_notified_blobs_records] || state_table_insert
    State.instance.dec_pending_notifications if record_kept && state_table_delete( nil, :committed )
  end
end
#state_to_table_entity ⇒ Object
196 197 198 199 200 201 202 203 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 196
# Snapshot of the blob state as a symbol-keyed hash for the state table.
# The two block lists are stored as their to_s string form.
def state_to_table_entity
  block_ids_text = @uploaded_block_ids.to_s
  block_numbers_text = @uploaded_block_numbers.to_s
  {
    start_time: @start_time,
    instrumentation_key: @instrumentation_key,
    table_id: @table_id,
    storage_account_name: @storage_account_name,
    container_name: @container_name,
    blob_name: @blob_name,
    uploaded_block_ids: block_ids_text,
    uploaded_block_numbers: block_numbers_text,
    uploaded_events_count: @uploaded_events_count,
    uploaded_bytesize: @uploaded_bytesize,
    oldest_event_time: @oldest_event_time,
    log_state: @log_state,
    sub_state: @sub_state
  }
end
#state_to_tuple ⇒ Object
177 178 179 180 181 182 183 184 185 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 177
# Packs the blob state into a 16-element array, in the same order that
# tuple_to_state unpacks it. The first slot falls back to the current UTC
# time when @start_time is not set.
def state_to_tuple
  started_at = @start_time || Time.now.utc
  identity = [ started_at, @action, @instrumentation_key, @table_id ]
  location = [ @storage_account_name, @container_name, @blob_name ]
  progress = [ @uploaded_block_ids, @uploaded_block_numbers, @uploaded_events_count, @uploaded_bytesize, @oldest_event_time ]
  settings = [ @event_format_ext, @blob_max_delay, @log_state, @sub_state ]
  identity + location + progress + settings
end
#table_entity_to_tuple(options = {}) ⇒ Object
167 168 169 170 171 172 173 174 175 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 167
# Converts a state-table entity (string-keyed hash) into a state tuple in
# the order expected by tuple_to_state.
#
# options - hash keyed by property name strings ("start_time", "action", ...).
#
# Defaults: start_time and oldest_event_time fall back to the current UTC
# time, the counters and blob_max_delay to 0, sub_state to :none.
# "log_state" must be present (to_sym is called on it), and the two block
# lists must be strings holding a Ruby array literal.
def table_entity_to_tuple( options = {} )
  [
    options["start_time"] || Time.now.utc,
    options["action"],
    options["instrumentation_key"],
    options["table_id"],
    options["storage_account_name"],
    options["container_name"],
    options["blob_name"],
    # SECURITY NOTE(review): eval executes whatever text is stored in the
    # entity; kept as-is to stay compatible with the Array#to_s form written
    # by state_to_table_entity. Consider a non-executing parser if the table
    # contents cannot be fully trusted.
    eval( options["uploaded_block_ids"] ),
    eval( options["uploaded_block_numbers"] ),
    options["uploaded_events_count"] || 0,
    options["uploaded_bytesize"] || 0,
    options["oldest_event_time"] || Time.now.utc,
    options["event_format_ext"],
    options["blob_max_delay"] || 0,
    options["log_state"].to_sym,
    ( options["sub_state"] || :none ).to_sym
  ]
end
#test_notification(storage_account_name) ⇒ Object
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 220
# Sends a probe notification using placeholder container/blob names and
# GUID_NULL ids, with a single try.
#
# storage_account_name - account used for the probe.
#
# Returns the storage_io_block result; sleeps 30 seconds before returning
# when the probe did not succeed.
def test_notification(storage_account_name)
  @storage_account_name = storage_account_name
  @action = :test_notification
  @max_tries = 1
  @force_client = true # to enable get a client even if all storage_accounts marked dead
  @recoverable = [ :invalid_instrumentation_key, :invalid_table_id ]
  probe_ok = storage_io_block do
    # only post while no recovery reason has been recorded
    if @recovery.nil?
      @container_name = "logstash-test-container"
      @blob_name = "logstash-test-blob"
      @table_id = GUID_NULL
      @instrumentation_key = GUID_NULL
      @info = "#{@action}"
      set_blob_sas_url
      probe_payload = create_payload
      post_notification( @client.notifyClient, probe_payload )
    end
  end
  sleep( 30 ) if !probe_ok
  probe_ok
end
#test_storage(storage_account_name) ⇒ Object
206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 206
# Probes a storage account by creating the configured test container,
# with a single try. The create call is skipped when
# :disable_blob_upload is configured.
#
# storage_account_name - account to probe.
#
# Returns the storage_io_block result.
def test_storage(storage_account_name)
  @storage_account_name = storage_account_name
  @action = :test_storage
  @max_tries = 1
  @force_client = true # to enable get a client even if all storage_accounts marked dead
  @recoverable = [ :invalid_storage_key, :container_exist, :create_container ]
  storage_io_block do
    # act on the first pass and after an :invalid_storage_key recovery only
    if [ nil, :invalid_storage_key ].include?( @recovery )
      @info = "#{@action} #{@storage_account_name}"
      @client.blobClient.create_container( @configuration[:test_storage_container] ) unless @configuration[:disable_blob_upload]
    end
  end
end
#tuple_to_state(tuple) ⇒ Object
187 188 189 190 191 192 193 194 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 187
# Loads the blob state from a 16-element tuple laid out as produced by
# state_to_tuple. Returns the tuple.
def tuple_to_state(tuple)
  @start_time, @action, @instrumentation_key, @table_id,
    @storage_account_name, @container_name, @blob_name,
    @uploaded_block_ids, @uploaded_block_numbers,
    @uploaded_events_count, @uploaded_bytesize, @oldest_event_time,
    @event_format_ext, @blob_max_delay,
    @log_state, @sub_state = tuple
end
#update_commited_or_uncommited_list(table_entity) ⇒ Object
488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 488
# Loads state from a state-table entity, lists the blob's blocks and
# classifies the blob.
#
# table_entity - entity hash accepted by table_entity_to_tuple.
#
# Returns a one-entry hash { type => state tuple }, where type is
# :upload_empty when there are no blocks or no bytes and otherwise the type
# of the first listed block; returns nil when no block list was obtained.
# Listed block names and sizes are appended to @uploaded_block_ids and
# @uploaded_bytesize; @log_state becomes :committed for a :committed type.
def update_commited_or_uncommited_list(table_entity)
  tuple_to_state( table_entity_to_tuple( table_entity ) )

  @action = :list_blob_blocks
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :container_exist, :create_container, :create_blob ]
  block_lists = nil
  storage_io_block do
    @info = "#{@action} #{@storage_account_name}/#{@container_name}/#{@blob_name}"

    create_container_exist_recovery
    block_lists =
      if :create_blob == @recovery
        # on a :create_blob recovery both lists are taken as empty
        { :uncommitted => [ ], :committed => [ ] }
      else
        @client.blobClient.list_blob_blocks( @container_name, @blob_name, { :blocklist_type => :all } )
      end
  end

  return nil unless block_lists

  # uncommitted blocks take precedence over committed ones
  uncommitted = block_lists[:uncommitted]
  blocks = uncommitted.empty? ? block_lists[:committed] : uncommitted
  blocks.each do |listed|
    @uploaded_block_ids << [ listed.name ]
    @uploaded_bytesize += listed.size
  end

  type = if blocks.empty? || 0 == @uploaded_bytesize
           :upload_empty
         else
           blocks[0].type
         end
  @log_state = :committed if :committed == type
  { type => state_to_tuple }
end
#upload(block) ⇒ Object
426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 426
# Uploads one block to the current blob, starting a new blob when no block
# has been uploaded yet.
#
# block - object responding to block_numbers, events_count, bytes, bytesize,
#         oldest_event_time and dispose.
#
# Returns false when no active storage account could be selected; otherwise
# the value of the last statement executed.
def upload ( block )
  @storage_account_name = nil if @uploaded_block_ids.empty?
  @block_to_upload = block
  block = nil # remove reference for GC
  # storage accounts already tried for this block; filled on retries
  exclude_storage_account_names = [ ]
  begin
    if @uploaded_block_ids.empty?
      # first block of a blob: reset the per-blob counters
      @log_state = :uploading
      @uploaded_block_numbers = [ ]
      @uploaded_bytesize = 0
      @uploaded_events_count = 0
      @oldest_event_time = nil

      # remove record of previous upload that failed
      # (@storage_account_name is only still set here after a retry)
      if @storage_account_name
        exclude_storage_account_names << @storage_account_name
        @storage_recovery.recover_later( state_to_tuple, :state_table_update, @storage_account_name )
      end
      set_conatainer_and_blob_names
      @storage_account_name = Clients.instance.get_random_active_storage( exclude_storage_account_names )
      unless @storage_account_name
        # NOTE(review): upload_recover is not defined in this method and no
        # definition is visible in this source - confirm it exists elsewhere,
        # otherwise this line raises NameError.
        upload_recover.call( :io_all_dead, nil )
        return false
      end
      raise UploadRetryError unless state_table_insert
    end

    @action = :upload
    @block_info = "blocks: #{@block_to_upload.block_numbers}, events: #{@block_to_upload.events_count}, size: #{@block_to_upload.bytes.length}"
    @info = "#{@action} #{@storage_account_name}/#{@container_name}/#{@blob_name}, #{@block_info}, commitId: [\"#{100001 + @uploaded_block_ids.length}\"]"
    @recoverable = [ :invalid_storage_key, :invalid_storage_account, :io_failure, :service_unavailable, :container_exist, :create_container ]

    success = storage_io_block {
      create_container_exist_recovery
      # block ids are sequential strings starting at "100001"
      block_id = "#{100001 + @uploaded_block_ids.length}"

      # assume that exceptions can be raised due to this method:
      @client.blobClient.put_blob_block( @container_name, @blob_name, block_id, @block_to_upload.bytes ) unless @configuration[:disable_blob_upload]

      # upload success
      first_block_in_blob = @uploaded_block_ids.empty?
      @uploaded_block_ids << [ block_id ]
      @uploaded_block_numbers.concat( @block_to_upload.block_numbers )
      @uploaded_bytesize += @block_to_upload.bytes.length
      @uploaded_events_count += @block_to_upload.events_count
      @oldest_event_time ||= @block_to_upload.oldest_event_time

      # release memory
      bytesize = @block_to_upload.bytesize
      @block_to_upload.dispose
      @block_to_upload = nil
      State.instance.inc_pending_commits if first_block_in_blob
      State.instance.dec_upload_bytesize( bytesize )
    }

    # upload_retry_later raises UploadRetryError unless @recovery is :io_all_dead
    upload_retry_later unless success
  rescue UploadRetryError
    # NOTE(review): this retry has no visible bound in this method - confirm
    # that the storage-account selection above eventually stops the loop.
    @recovery = nil
    retry
  end
end
#upload_retry_later ⇒ Object
411 412 413 414 415 416 417 418 419 420 421 422 423 424 |
# File 'lib/logstash/outputs/application_insights/blob.rb', line 411
# Handles a failed block upload.
#
# If blocks were already uploaded to the blob, their commit is handed to
# @storage_recovery and the block id list is cleared. Then, unless
# @recovery is :io_all_dead, UploadRetryError is raised so the caller can
# retry; with :io_all_dead the pending block is moved to the channel's
# failed_on_upload_retry_Q.
#
# Raises UploadRetryError (see above).
def upload_retry_later
  if !@uploaded_block_ids.empty?
    postponed = "#{:commit} #{@storage_account_name}/#{@container_name}/#{@blob_name}, events: #{@uploaded_events_count}, size: #{@uploaded_bytesize}, blocks: #{@uploaded_block_numbers}, delay: #{Time.now.utc - @oldest_event_time}"
    @logger.error { "Pospone to #{postponed} (; retry later, error: #{@last_io_exception.inspect}" }
    @storage_recovery.recover_later( state_to_tuple, :commit, @storage_account_name )
    @uploaded_block_ids = [ ]
  end

  raise UploadRetryError unless :io_all_dead == @recovery

  Channels.instance.channel( @instrumentation_key, @table_id ).failed_on_upload_retry_Q << @block_to_upload
  @block_to_upload = nil
end