Class: LogStash::Outputs::Application_insights::Blob

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/application_insights/blob.rb

Constant Summary collapse

CREATE_EXIST_ERRORS =
{ :container => [ :create_container, :container_exist ], :table => [ :create_table, :table_exist ] }
@@closing =
false

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel = nil, id = nil, no_queue = false) ⇒ Blob

Returns a new instance of Blob.



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/logstash/outputs/application_insights/blob.rb', line 49

def initialize ( channel = nil, id = nil , no_queue = false )
  @configuration = Config.current
  @logger = @configuration[:logger]
  @storage_recovery = Storage_recovery.instance
  @notification_recovery = Notification_recovery.instance
  @max_tries = @configuration[:io_max_retries] + 1

  @uploaded_block_ids = [  ]
  @uploaded_block_numbers = [  ]
  @uploaded_bytesize = 0
  @uploaded_events_count = 0
  @sub_state = :none

  if channel
    @id = id
    @instrumentation_key = channel.instrumentation_key
    @table_id = channel.table_id
    @blob_max_delay = channel.blob_max_delay

    @event_format_ext = channel.event_format_ext

    unless no_queue

      @io_queue = Queue.new
      @timer = Timer.new
  
      # create a thread that handles the IO of the blob
      Thread.new do
        next_block = nil
        loop do
          block_to_upload = nil # release reference to resource for GC
          block_to_upload = next_block || @io_queue.pop
          next_block = nil

          if :trigger == @timer.state
            next_block = block_to_upload unless :wakeup == block_to_upload
            block_to_upload = :timeout
            to_commit = :commit

          elsif :close == block_to_upload
            to_commit = :commit

          # ignore :trigger as they are only to casue check timeout
          elsif :wakeup == block_to_upload # ignore :wakeup
            next

          else
            while @io_queue.length > 0
              next_block = @io_queue.pop
              next if :wakeup == next_block # ignore :wakeup
              break if :close == next_block
              break if blob_full?( next_block )
              break unless block_to_upload.concat( next_block )
              next_block = nil 
            end
          end

          unless to_commit
            @timer.set( block_to_upload.oldest_event_time + @blob_max_delay, nil ) {|object| @io_queue << :wakeup if 0 == @io_queue.length } if blob_empty?
            upload( block_to_upload )
            block_to_upload = nil # release reference to resource for GC
            to_commit = :commit if blob_full?
          end

          if to_commit
            commit unless @uploaded_block_ids.empty?
            to_commit = nil
            @uploaded_block_ids = [  ]
            @timer.cancel
            break if :close == block_to_upload
          end
        end
      end
    end

  end

end

Instance Attribute Details

#blob_nameObject (readonly)

Returns the value of attribute blob_name.



28
29
30
# File 'lib/logstash/outputs/application_insights/blob.rb', line 28

def blob_name
  @blob_name
end

#container_nameObject (readonly)

Returns the value of attribute container_name.



27
28
29
# File 'lib/logstash/outputs/application_insights/blob.rb', line 27

def container_name
  @container_name
end

#instrumentation_keyObject (readonly)

Returns the value of attribute instrumentation_key.



24
25
26
# File 'lib/logstash/outputs/application_insights/blob.rb', line 24

def instrumentation_key
  @instrumentation_key
end

#io_queueObject (readonly)

Returns the value of attribute io_queue.



33
34
35
# File 'lib/logstash/outputs/application_insights/blob.rb', line 33

def io_queue
  @io_queue
end

#last_io_exceptionObject (readonly)

Returns the value of attribute last_io_exception.



35
36
37
# File 'lib/logstash/outputs/application_insights/blob.rb', line 35

def last_io_exception
  @last_io_exception
end

#oldest_event_timeObject (readonly)

Returns the value of attribute oldest_event_time.



31
32
33
# File 'lib/logstash/outputs/application_insights/blob.rb', line 31

def oldest_event_time
  @oldest_event_time
end

#storage_account_nameObject (readonly)

Returns the value of attribute storage_account_name.



26
27
28
# File 'lib/logstash/outputs/application_insights/blob.rb', line 26

def 
  @storage_account_name
end

#table_idObject (readonly)

Returns the value of attribute table_id.



25
26
27
# File 'lib/logstash/outputs/application_insights/blob.rb', line 25

def table_id
  @table_id
end

#uploaded_bytesizeObject (readonly)

Returns the value of attribute uploaded_bytesize.



30
31
32
# File 'lib/logstash/outputs/application_insights/blob.rb', line 30

def uploaded_bytesize
  @uploaded_bytesize
end

#uploaded_events_countObject (readonly)

Returns the value of attribute uploaded_events_count.



29
30
31
# File 'lib/logstash/outputs/application_insights/blob.rb', line 29

def uploaded_events_count
  @uploaded_events_count
end

Class Method Details

.closeObject



41
42
43
# File 'lib/logstash/outputs/application_insights/blob.rb', line 41

def self.close
  @@closing = true
end

.stopped?Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/logstash/outputs/application_insights/blob.rb', line 45

def self.stopped?
  @@closing
end

Instance Method Details

#<<(block) ⇒ Object



522
523
524
# File 'lib/logstash/outputs/application_insights/blob.rb', line 522

def << ( block )
  @io_queue << block
end

#blob_empty?Boolean

Returns:

  • (Boolean)


142
143
144
# File 'lib/logstash/outputs/application_insights/blob.rb', line 142

def blob_empty?
  @uploaded_block_ids.empty?
end

#blob_full?(next_block = nil) ⇒ Boolean

Returns:

  • (Boolean)


134
135
136
137
138
139
140
# File 'lib/logstash/outputs/application_insights/blob.rb', line 134

def blob_full? ( next_block = nil )
  if next_block
    BLOB_MAX_BLOCKS < @uploaded_block_ids.length + 1 || @configuration[:blob_max_events] < @uploaded_events_count + next_block.events_count || @configuration[:blob_max_bytesize] < @uploaded_bytesize  + next_block.bytesize
  else
    BLOB_MAX_BLOCKS <= @uploaded_block_ids.length || @configuration[:blob_max_events] <= @uploaded_events_count || @configuration[:blob_max_bytesize] <= @uploaded_bytesize
  end
end

#clear_stateObject



155
156
157
158
159
160
161
162
163
164
165
# File 'lib/logstash/outputs/application_insights/blob.rb', line 155

def clear_state
  @action = nil
  @storage_account_name = nil
  @container_name = nil
  @blob_name = nil
  @uploaded_block_ids = [  ]
  @uploaded_block_numbers = [  ]
  @uploaded_events_count = 0
  @uploaded_bytesize = 0
  @oldest_event_time = nil
end

#closeObject

close blob. It will finish whatever was already on the queue, and if necessary commit called on shutdown



130
131
132
# File 'lib/logstash/outputs/application_insights/blob.rb', line 130

def close
  @io_queue << :close
end

#commit(tuple = nil) ⇒ Object



389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
# File 'lib/logstash/outputs/application_insights/blob.rb', line 389

def commit ( tuple = nil )
  tuple_to_state( tuple ) if tuple

  unless @uploaded_block_ids.empty?
    @action = :commit
    @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable ]
    success =  storage_io_block {
      @info = "#{@action.to_s} #{@storage_account_name}/#{@container_name}/#{@blob_name}, events: #{@uploaded_events_count}, size: #{@uploaded_bytesize}, blocks: #{@uploaded_block_numbers}, delay: #{Time.now.utc - @oldest_event_time}"
      # assume that exceptions can be raised due to this method:
      @client.blobClient.commit_blob_blocks( @container_name, @blob_name, @uploaded_block_ids ) unless @configuration[:disable_blob_upload]
      @log_state = :committed
    }
    if success
      # next stage
      state_table_update
    else
      @storage_recovery.recover_later( state_to_tuple, :commit, @storage_account_name )
    end
  end
end

#create_container_exist_recoveryObject



308
309
310
# File 'lib/logstash/outputs/application_insights/blob.rb', line 308

def create_container_exist_recovery
  create_exist_recovery( :container ) { |name| @client.blobClient.create_container( name ) }
end

#create_exist_recovery(type, name = nil) ⇒ Object



288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/logstash/outputs/application_insights/blob.rb', line 288

def create_exist_recovery( type, name = nil )
  prev_info = @info
  if CREATE_EXIST_ERRORS[type][0] == @recovery
    name ||= ( :table == type ? @configuration[:state_table_name] : @container_name )
    @info = "create #{type} #{@storage_account_name}/#{name}"

    # assume that exceptions can be raised due to this method:
    yield name
    @logger.info { "Successed to #{@info}" }
    @info = prev_info
  elsif CREATE_EXIST_ERRORS[type][1] == @recovery
    @logger.info { "Successed (already exist) to #{@info}" }
    @info = prev_info
  end
end

#create_table_exist_recoveryObject



304
305
306
# File 'lib/logstash/outputs/application_insights/blob.rb', line 304

def create_table_exist_recovery
  create_exist_recovery( :table ) { |name| @client.tableClient.create_table( name ) }
end

#notify(tuple = nil) ⇒ Object

must return whether notification was successful or failed



263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/logstash/outputs/application_insights/blob.rb', line 263

def notify ( tuple = nil )
  tuple_to_state( tuple ) if tuple
  @action = :notify
  @force_client = true # to enable get a client even if all storage_accounts marked dead
  @recoverable = [ :notify_failed_blob_not_accessible, :io_failure, :service_unavailable ]
  success = storage_io_block {
    set_blob_sas_url
    payload = create_payload
    @logger.debug { "notification payload: #{payload}" }
    @info = "#{@action.to_s} #{@storage_account_name}/#{@container_name}/#{@blob_name}, events: #{@uploaded_events_count}, size: #{@uploaded_bytesize}, blocks: #{@uploaded_block_numbers}, delay: #{Time.now.utc - @oldest_event_time}, blob_sas_url: #{@blob_sas_url}"

    # assume that exceptions can be raised due to this method:
    post_notification( @client.notifyClient, payload ) unless @configuration[:disable_notification]
    @log_state = :notified
  }
  if success
    Telemetry.instance.track_event { { :name => "notified", :properties => state_to_table_entity } }
    state_table_update
  else
    notify_retry_later
  end
  success
end

#notify_retry_laterObject



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'lib/logstash/outputs/application_insights/blob.rb', line 243

def notify_retry_later
  if :notify_failed_blob_not_accessible == @recovery
    @sub_state = @recovery
    @storage_recovery.recover_later( state_to_tuple, :notify, @storage_account_name )
  elsif :invalid_instrumentation_key == @recovery || :invalid_table_id == @recovery
    @sub_state = @recovery
    Channels.instance.channel( @instrumentation_key, @table_id ).failed_on_notify_retry_Q << state_to_tuple

  else
    if :notify_failed_blob_not_accessible == @sub_state
      @storage_recovery.recover_later( state_to_tuple, :notify, @storage_account_name )
    elsif :invalid_instrumentation_key == @sub_state || :invalid_table_id == @sub_state
      Channels.instance.channel( @instrumentation_key, @table_id ).failed_on_notify_retry_Q << state_to_tuple
    else
      @notification_recovery.recover_later( state_to_tuple )
    end
  end
end

#queue_empty?Boolean

Returns:

  • (Boolean)


146
147
148
# File 'lib/logstash/outputs/application_insights/blob.rb', line 146

def queue_empty?
  @io_queue.length == 0 if @io_queue
end

#queue_sizeObject



151
152
153
# File 'lib/logstash/outputs/application_insights/blob.rb', line 151

def queue_size
  @io_queue.length
end

#state_table_delete(tuple = nil, state = nil) ⇒ Object

retturn tru on success



352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
# File 'lib/logstash/outputs/application_insights/blob.rb', line 352

def state_table_delete ( tuple = nil, state = nil )
  tuple_to_state( tuple ) if tuple
  state ||= @log_state
  @action = :state_table_delete
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy, :create_resource ]
  @info  = "#{@action} #{state} #{@storage_account_name}/#{@container_name}/#{@blob_name}"

  success =  storage_io_block {
    create_table_exist_recovery
    if :create_resource == @recovery
      @logger.info { "Note: delete entity failed, already deleted, #{@info}, state: #{state}, log_state: #{@log_state}" }
    else
      @client.tableClient.delete_entity( @configuration[:state_table_name], "#{@configuration[:partition_key_prefix]}-#{state}", @blob_name.gsub( "/", "_" ) )
    end
  }
  @storage_recovery.recover_later( state_to_tuple, :state_table_update, @storage_account_name ) unless success
  success
end

#state_table_insertObject

return true on success



313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/logstash/outputs/application_insights/blob.rb', line 313

def state_table_insert
  @action = :state_table_insert
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy, :entity_exist ]
  @info  = "#{@action} #{@log_state} #{@storage_account_name}/#{@container_name}/#{@blob_name}"
  success =  storage_io_block {
    create_table_exist_recovery
    if :entity_exist == @recovery
      raise NotRecoverableError if :uploading == @log_state
    else
      entity_values = state_to_table_entity
      entity_values[:PartitionKey] = "#{@configuration[:partition_key_prefix]}-#{@log_state}"
      entity_values[:RowKey] = @blob_name.gsub("/","_")
      @client.tableClient.insert_entity( @configuration[:state_table_name], entity_values )
    end
  }
  @storage_recovery.recover_later( state_to_tuple, :state_table_update, @storage_account_name )  unless success || :uploading == @log_state
  success
end

#state_table_query(storage_account_name, filter, token) ⇒ Object

return entities



372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
# File 'lib/logstash/outputs/application_insights/blob.rb', line 372

def state_table_query ( , filter , token )
  @storage_account_name = 

  @action = :state_table_query
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy ]
  @info  = "#{@action} #{@storage_account_name}/#{@configuration[:state_table_name]}"

  entities = nil
  success =  storage_io_block {
    create_table_exist_recovery
    options = { :filter => filter }
    options[:continuation_token] = token if token
    entities = @client.tableClient.query_entities( @configuration[:state_table_name], options )
  }
  entities
end

#state_table_update(tuple = nil) ⇒ Object



332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
# File 'lib/logstash/outputs/application_insights/blob.rb', line 332

def state_table_update ( tuple = nil )
  tuple_to_state( tuple ) if tuple
  if :uploading == @log_state
    state_table_delete
  elsif :committed == @log_state
    if state_table_insert && state_table_delete( nil, :uploading )
      State.instance.dec_pending_commits
      State.instance.inc_pending_notifications
      # this is not a recovery, it is actually enqueue to notify
      @notification_recovery.enqueue( state_to_tuple )
    end
  elsif :notified == @log_state
    if (!@configuration[:save_notified_blobs_records] || state_table_insert) && state_table_delete( nil, :committed ) 
      State.instance.dec_pending_notifications
    end
  end
end

#state_to_table_entityObject



196
197
198
199
200
201
202
203
# File 'lib/logstash/outputs/application_insights/blob.rb', line 196

def state_to_table_entity
  { :start_time => @start_time, :instrumentation_key => @instrumentation_key, :table_id => @table_id, 
    :storage_account_name => @storage_account_name, :container_name => @container_name, :blob_name => @blob_name, 
    :uploaded_block_ids => @uploaded_block_ids.to_s, :uploaded_block_numbers => @uploaded_block_numbers.to_s, 
    :uploaded_events_count => @uploaded_events_count, :uploaded_bytesize => @uploaded_bytesize, :oldest_event_time => @oldest_event_time,
    :log_state => @log_state, :sub_state => @sub_state
  }
end

#state_to_tupleObject



177
178
179
180
181
182
183
184
185
# File 'lib/logstash/outputs/application_insights/blob.rb', line 177

def state_to_tuple
  [ @start_time || Time.now.utc, @action, @instrumentation_key, @table_id, 
    @storage_account_name, @container_name, @blob_name, 
    @uploaded_block_ids, @uploaded_block_numbers, 
    @uploaded_events_count, @uploaded_bytesize, @oldest_event_time,
    @event_format_ext, @blob_max_delay,
    @log_state, @sub_state
  ]
end

#table_entity_to_tuple(options = {}) ⇒ Object



167
168
169
170
171
172
173
174
175
# File 'lib/logstash/outputs/application_insights/blob.rb', line 167

def table_entity_to_tuple( options = {} )
  [ options[:start_time.to_s] || Time.now.utc, options[:action.to_s], options[:instrumentation_key.to_s], options[:table_id.to_s], 
    options[:storage_account_name.to_s], options[:container_name.to_s], options[:blob_name.to_s], 
    eval( options[:uploaded_block_ids.to_s] ), eval( options[:uploaded_block_numbers.to_s] ), 
    options[:uploaded_events_count.to_s] || 0, options[:uploaded_bytesize.to_s] || 0, options[:oldest_event_time.to_s] || Time.now.utc,
    options[:event_format_ext.to_s], options[:blob_max_delay.to_s] || 0,
    options[:log_state.to_s].to_sym, (options[:sub_state.to_s] || :none).to_sym
  ]
end

#test_notification(storage_account_name) ⇒ Object



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/logstash/outputs/application_insights/blob.rb', line 220

def test_notification(  )
  @storage_account_name = 
  @action = :test_notification
  @max_tries = 1
  @force_client = true # to enable get a client even if all storage_accounts marked dead
  @recoverable = [ :invalid_instrumentation_key, :invalid_table_id ]
  success = storage_io_block {
    if @recovery.nil?
      @container_name = "logstash-test-container"
      @blob_name = "logstash-test-blob"
      @table_id = GUID_NULL
      @instrumentation_key = GUID_NULL
      @info = "#{@action}"
      set_blob_sas_url
      payload = create_payload
      post_notification( @client.notifyClient, payload )
    end
  }
  sleep( 30 ) unless success
  success
end

#test_storage(storage_account_name) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/logstash/outputs/application_insights/blob.rb', line 206

def test_storage (  )
  @storage_account_name = 
  @action = :test_storage
  @max_tries = 1
  @force_client = true # to enable get a client even if all storage_accounts marked dead
  @recoverable = [ :invalid_storage_key, :container_exist, :create_container ]
  storage_io_block {
    if @recovery.nil? || :invalid_storage_key == @recovery
      @info = "#{@action} #{@storage_account_name}"
      @client.blobClient.create_container( @configuration[:test_storage_container] ) unless @configuration[:disable_blob_upload]
    end
  }
end

#tuple_to_state(tuple) ⇒ Object



187
188
189
190
191
192
193
194
# File 'lib/logstash/outputs/application_insights/blob.rb', line 187

def tuple_to_state ( tuple )
  ( @start_time, @action, @instrumentation_key, @table_id, 
    @storage_account_name, @container_name, @blob_name, 
    @uploaded_block_ids, @uploaded_block_numbers, 
    @uploaded_events_count, @uploaded_bytesize, @oldest_event_time,
    @event_format_ext, @blob_max_delay,
    @log_state, @sub_state) = tuple
end

#update_commited_or_uncommited_list(table_entity) ⇒ Object



488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
# File 'lib/logstash/outputs/application_insights/blob.rb', line 488

def update_commited_or_uncommited_list( table_entity )
  tuple = table_entity_to_tuple( table_entity )

  tuple_to_state( tuple )
  @action = :list_blob_blocks
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :container_exist, :create_container, :create_blob ]
  list_blob_blocks = nil
  success =  storage_io_block {
    @info = "#{@action} #{@storage_account_name}/#{@container_name}/#{@blob_name}"

    create_container_exist_recovery
    if :create_blob == @recovery
      list_blob_blocks = { :uncommitted => [  ], :committed => [  ] }
    else
      list_blob_blocks = @client.blobClient.list_blob_blocks( @container_name, @blob_name, { :blocklist_type => :all } ) unless :create_blob == @recovery
    end
  }

  if list_blob_blocks
    blocks = ( list_blob_blocks[:uncommitted].empty? ? list_blob_blocks[:committed] : list_blob_blocks[:uncommitted] )
    blocks.each do |block|
      @uploaded_block_ids << [ block.name ]
      @uploaded_bytesize += block.size
    end
    type = ( blocks.empty? || 0 == @uploaded_bytesize ? :upload_empty : blocks[0].type )

    @log_state = :committed if :committed == type
    { type => state_to_tuple }
  else
    nil
  end
end

#upload(block) ⇒ Object



426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
# File 'lib/logstash/outputs/application_insights/blob.rb', line 426

def upload ( block )
  @storage_account_name = nil if @uploaded_block_ids.empty?
  @block_to_upload = block
  block = nil # remove reference for GC
   = [  ]
  begin
    if @uploaded_block_ids.empty?
      @log_state = :uploading
      @uploaded_block_numbers = [  ]
      @uploaded_bytesize = 0
      @uploaded_events_count = 0
      @oldest_event_time = nil

      # remove record of previous upload that failed
      if @storage_account_name
         << @storage_account_name
        @storage_recovery.recover_later( state_to_tuple, :state_table_update, @storage_account_name )
      end
      set_conatainer_and_blob_names
      @storage_account_name = Clients.instance.get_random_active_storage(  )
      unless @storage_account_name
        upload_recover.call( :io_all_dead, nil )
        return false
      end
      raise UploadRetryError unless state_table_insert
    end

    @action = :upload
    @block_info = "blocks: #{@block_to_upload.block_numbers}, events: #{@block_to_upload.events_count}, size: #{@block_to_upload.bytes.length}"
    @info = "#{@action} #{@storage_account_name}/#{@container_name}/#{@blob_name}, #{@block_info}, commitId: [\"#{100001 + @uploaded_block_ids.length}\"]"
    @recoverable = [ :invalid_storage_key, :invalid_storage_account, :io_failure, :service_unavailable, :container_exist, :create_container ]

    success = storage_io_block {
      create_container_exist_recovery
      block_id = "#{100001 + @uploaded_block_ids.length}"

      # assume that exceptions can be raised due to this method:
      @client.blobClient.put_blob_block( @container_name, @blob_name, block_id, @block_to_upload.bytes ) unless @configuration[:disable_blob_upload]

      # upload success
      first_block_in_blob = @uploaded_block_ids.empty?
      @uploaded_block_ids << [ block_id ]
      @uploaded_block_numbers.concat( @block_to_upload.block_numbers )
      @uploaded_bytesize += @block_to_upload.bytes.length
      @uploaded_events_count += @block_to_upload.events_count
      @oldest_event_time ||= @block_to_upload.oldest_event_time

      # release memory
      bytesize = @block_to_upload.bytesize
      @block_to_upload.dispose
      @block_to_upload = nil
      State.instance.inc_pending_commits if first_block_in_blob
      State.instance.dec_upload_bytesize( bytesize )
    }

    upload_retry_later unless success
  rescue UploadRetryError
    @recovery = nil
    retry
  end
end

#upload_retry_laterObject



411
412
413
414
415
416
417
418
419
420
421
422
423
424
# File 'lib/logstash/outputs/application_insights/blob.rb', line 411

def upload_retry_later
  unless @uploaded_block_ids.empty?
    info1 = "#{:commit} #{@storage_account_name}/#{@container_name}/#{@blob_name}, events: #{@uploaded_events_count}, size: #{@uploaded_bytesize}, blocks: #{@uploaded_block_numbers}, delay: #{Time.now.utc - @oldest_event_time}"
    @logger.error { "Pospone to #{info1} (; retry later, error: #{@last_io_exception.inspect}" }
    @storage_recovery.recover_later( state_to_tuple, :commit, @storage_account_name )
    @uploaded_block_ids = [  ]
  end
  unless :io_all_dead == @recovery
    raise UploadRetryError
  else 
    Channels.instance.channel( @instrumentation_key, @table_id ).failed_on_upload_retry_Q << @block_to_upload
    @block_to_upload = nil
  end
end