Class: LogStash::Outputs::Application_insights::Blob

Inherits: Object
Defined in: lib/logstash/outputs/application_insights/blob.rb
Direct Known Subclasses
Constant Summary
- CREATE_EXIST_ERRORS =
{ :container => [ :create_container, :container_exist ], :table => [ :create_table, :table_exist ] }
Instance Attribute Summary
-
#blob_name ⇒ Object
readonly
Returns the value of attribute blob_name.
-
#container_name ⇒ Object
readonly
Returns the value of attribute container_name.
-
#intrumentation_key ⇒ Object
readonly
Returns the value of attribute intrumentation_key.
-
#io_queue ⇒ Object
readonly
Returns the value of attribute io_queue.
-
#last_io_exception ⇒ Object
readonly
Returns the value of attribute last_io_exception.
-
#oldest_event_time ⇒ Object
readonly
Returns the value of attribute oldest_event_time.
-
#storage_account_name ⇒ Object
readonly
Returns the value of attribute storage_account_name.
-
#table_id ⇒ Object
readonly
Returns the value of attribute table_id.
-
#uploaded_bytesize ⇒ Object
readonly
Returns the value of attribute uploaded_bytesize.
-
#uploaded_events_count ⇒ Object
readonly
Returns the value of attribute uploaded_events_count.
Class Method Summary
- .close ⇒ Object
- .config(configuration) ⇒ Object
- .launch_endpoint_recovery_thread ⇒ Object
- .launch_storage_recovery_table_threads(state) ⇒ Object
- .launch_storage_recovery_threads(queues, method, failure_reason) ⇒ Object
-
.recovery_table_thread(storage_account_name, state) ⇒ Object
Returns the thread.
- .state_on?(storage_account_name, blob, failure_reason) ⇒ Boolean
- .stopped? ⇒ Boolean
- .storage_recovery_thread(storage_account_name, queue, method, failure_reason) ⇒ Object
- .validate_endpoint ⇒ Object
- .validate_storage ⇒ Object
Instance Method Summary
- #<<(block) ⇒ Object
- #blob_empty? ⇒ Boolean
- #blob_full?(next_block = nil) ⇒ Boolean
- #clear_state ⇒ Object
-
#close ⇒ Object
Closes the blob.
- #commit(tuple = nil) ⇒ Object
- #commit_recover ⇒ Object
- #create_container_exist_recovery ⇒ Object
- #create_exist_recovery(type, name = nil) ⇒ Object
- #create_table_exist_recovery ⇒ Object
-
#initialize(channel = nil, id = nil, no_queue = false) ⇒ Blob
constructor
A new instance of Blob.
-
#log_to_table_delete(tuple = nil, state = nil) ⇒ Object
Returns true on success.
-
#log_to_table_insert ⇒ Object
Returns true on success.
-
#log_to_table_query(storage_account_name, filter, token) ⇒ Object
Returns the entities.
- #log_to_table_update(tuple = nil) ⇒ Object
- #log_to_table_update_recover ⇒ Object
- #notify(tuple = nil) ⇒ Object
- #notify_recover ⇒ Object
- #queue_empty? ⇒ Boolean
- #queue_size ⇒ Object
- #state_to_table_entity ⇒ Object
- #state_to_tuple ⇒ Object
- #table_entity_to_tuple(options = {}) ⇒ Object
- #test_notification_endpoint(storage_account_name) ⇒ Object
- #test_notification_endpoint_recover ⇒ Object
- #test_storage(storage_account_name) ⇒ Object
- #test_storage_recover ⇒ Object
- #tuple_to_state(tuple) ⇒ Object
- #update_commited_or_uncommited_list(table_entity) ⇒ Object
- #upload(block, to_commit = nil) ⇒ Object
- #upload_recover ⇒ Object
Constructor Details
#initialize(channel = nil, id = nil, no_queue = false) ⇒ Blob
Returns a new instance of Blob.
# File 'lib/logstash/outputs/application_insights/blob.rb', line 222

def initialize ( channel = nil, id = nil, no_queue = false )
  @uploaded_block_ids = [ ]
  @uploaded_block_numbers = [ ]
  @uploaded_bytesize = 0
  @uploaded_events_count = 0
  @max_tries = @@io_max_retries + 1
  @sub_state = :none

  if channel
    @id = id
    @intrumentation_key = channel.intrumentation_key
    @table_id = channel.table_id
    @blob_max_delay = channel.blob_max_delay
    @event_format_ext = channel.event_format_ext

    unless no_queue
      @io_queue = Queue.new
      @timer = Timer.new

      # create a thread that handles the IO of the blob
      Thread.new do
        next_block = nil
        loop do
          block_to_upload = nil # release reference to resource for GC
          block_to_upload = next_block || @io_queue.pop
          next_block = nil

          if :trigger == @timer.state
            next_block = block_to_upload unless :wakeup == block_to_upload
            block_to_upload = :timeout
            to_commit = :commit

          elsif :close == block_to_upload
            to_commit = :commit

          # ignore :trigger as they are only to cause check timeout
          elsif :wakeup == block_to_upload # ignore :wakeup
            next

          else
            while @io_queue.length > 0
              next_block = @io_queue.pop
              next if :wakeup == next_block # ignore :wakeup
              break if :close == next_block
              break if blob_full?( next_block )
              break unless block_to_upload.concat( next_block )
              next_block = nil
            end
          end

          unless to_commit
            @timer.set( block_to_upload.oldest_event_time + @blob_max_delay, nil ) { |object| @io_queue << :wakeup if 0 == @io_queue.length } if blob_empty?
            to_commit = :commit if blob_full?
            upload( block_to_upload, to_commit )
            block_to_upload = nil # release reference to resource for GC
          else
            commit unless @uploaded_block_ids.empty?
          end

          if to_commit
            to_commit = nil
            @uploaded_block_ids = [ ]
            @timer.cancel
            break if :close == block_to_upload
          end
        end
      end
    end
  end
end
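The IO thread created here batches incoming blocks until the blob is full or blob_max_delay expires, then uploads and, when needed, commits. A minimal usage sketch, assuming Blob.config has already been called; FakeChannel is a hypothetical stand-in (not part of the gem) that only mimics the accessors initialize reads:

# Hypothetical sketch: FakeChannel mimics the channel accessors read by Blob#initialize.
FakeChannel = Struct.new( :intrumentation_key, :table_id, :blob_max_delay, :event_format_ext )

channel = FakeChannel.new( "00000000-0000-0000-0000-000000000000", "event_table", 60, "json" )
blob = LogStash::Outputs::Application_insights::Blob.new( channel, 1 )

blob << block    # a serialized Block (responds to bytes, events_count, oldest_event_time, ...)
blob.close       # drain the queue, commit what was uploaded, and stop the IO thread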
Instance Attribute Details
#blob_name ⇒ Object (readonly)
Returns the value of attribute blob_name.

# File 'lib/logstash/outputs/application_insights/blob.rb', line 28

def blob_name
  @blob_name
end

#container_name ⇒ Object (readonly)
Returns the value of attribute container_name.

# File 'lib/logstash/outputs/application_insights/blob.rb', line 27

def container_name
  @container_name
end

#intrumentation_key ⇒ Object (readonly)
Returns the value of attribute intrumentation_key.

# File 'lib/logstash/outputs/application_insights/blob.rb', line 24

def intrumentation_key
  @intrumentation_key
end

#io_queue ⇒ Object (readonly)
Returns the value of attribute io_queue.

# File 'lib/logstash/outputs/application_insights/blob.rb', line 33

def io_queue
  @io_queue
end

#last_io_exception ⇒ Object (readonly)
Returns the value of attribute last_io_exception.

# File 'lib/logstash/outputs/application_insights/blob.rb', line 35

def last_io_exception
  @last_io_exception
end

#oldest_event_time ⇒ Object (readonly)
Returns the value of attribute oldest_event_time.

# File 'lib/logstash/outputs/application_insights/blob.rb', line 31

def oldest_event_time
  @oldest_event_time
end

#storage_account_name ⇒ Object (readonly)
Returns the value of attribute storage_account_name.

# File 'lib/logstash/outputs/application_insights/blob.rb', line 26

def storage_account_name
  @storage_account_name
end

#table_id ⇒ Object (readonly)
Returns the value of attribute table_id.

# File 'lib/logstash/outputs/application_insights/blob.rb', line 25

def table_id
  @table_id
end

#uploaded_bytesize ⇒ Object (readonly)
Returns the value of attribute uploaded_bytesize.

# File 'lib/logstash/outputs/application_insights/blob.rb', line 30

def uploaded_bytesize
  @uploaded_bytesize
end

#uploaded_events_count ⇒ Object (readonly)
Returns the value of attribute uploaded_events_count.

# File 'lib/logstash/outputs/application_insights/blob.rb', line 29

def uploaded_events_count
  @uploaded_events_count
end
Class Method Details
.close ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 71

def self.close
  @@closing = true
end
.config(configuration) ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 39

def self.config ( configuration )
  @@configuration = configuration
  @@logger = configuration[:logger]
  @@io_retry_delay = configuration[:io_retry_delay]
  @@io_max_retries = configuration[:io_max_retries]
  @@blob_max_bytesize = configuration[:blob_max_bytesize]
  @@blob_max_events = configuration[:blob_max_events]
  @@state_table_name = "#{configuration[:azure_storage_table_prefix]}#{STATE_TABLE_NAME}"
  @@save_notified_blobs_records = configuration[:save_notified_blobs_records]

  @@closing = false

  # queues, per storage_account_name, for failed blob commit, will continue to try resending
  @@failed_on_commit_retry_Qs = {}
  launch_storage_recovery_threads( @@failed_on_commit_retry_Qs, :commit, :io_failure )
  launch_storage_recovery_table_threads( :uploading )

  # queues, per storage_account_name, for failed notify, will continue to try resending
  @@failed_on_notify_retry_Qs = {}
  launch_storage_recovery_threads( @@failed_on_notify_retry_Qs, :notify, :notify_failed_blob_not_accessible )
  launch_storage_recovery_table_threads( :committed )

  # for failed to notify due to endpoint, will continue to try resending
  launch_endpoint_recovery_thread

  # queues, per storage_account_name, for failed to log to table, will continue to try resending
  @@failed_on_log_to_table_retry_Qs = {}
  launch_storage_recovery_threads( @@failed_on_log_to_table_retry_Qs, :log_to_table_update, :io_failure )
end
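config is expected to run once, before any Blob is created, since it seeds the class-level settings and starts the recovery threads. A hedged sketch of a configuration hash covering only the keys read above (the values are illustrative, and the real plugin passes additional keys used elsewhere in the class):

require 'logger'

# Illustrative values only; the keys mirror those read in Blob.config.
configuration = {
  :logger                      => Logger.new( STDOUT ),
  :io_retry_delay              => 10,
  :io_max_retries              => 2,
  :blob_max_bytesize           => 4 * 1024 * 1024,
  :blob_max_events             => 256_000,
  :azure_storage_table_prefix  => "myprefix",
  :save_notified_blobs_records => false,
  # array of [ account_name, keys ] pairs, as iterated by the launch_* methods
  :storage_account_name_key    => [ [ "my_account", [ "base64-account-key==" ] ] ]
}

LogStash::Outputs::Application_insights::Blob.config( configuration )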
.launch_endpoint_recovery_thread ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 79

def self.launch_endpoint_recovery_thread
  @@failed_on_notification_endpoint_retry_Q = Queue.new
  storage_recovery_thread( nil, @@failed_on_notification_endpoint_retry_Q, :notify, :io_failure )
end
.launch_storage_recovery_table_threads(state) ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 92

def self.launch_storage_recovery_table_threads ( state )
  @@configuration[:storage_account_name_key].each do |storage_account_name, storage_account_keys|
    recovery_table_thread( storage_account_name, state )
  end
end
.launch_storage_recovery_threads(queues, method, failure_reason) ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 84

def self.launch_storage_recovery_threads ( queues, method, failure_reason )
  @@configuration[:storage_account_name_key].each do |storage_account_name, storage_account_keys|
    queues[storage_account_name] = Queue.new
    # a thread per storage account name
    storage_recovery_thread( storage_account_name, queues[storage_account_name], method, failure_reason )
  end
end
.recovery_table_thread(storage_account_name, state) ⇒ Object
Returns the thread.
# File 'lib/logstash/outputs/application_insights/blob.rb', line 99

def self.recovery_table_thread( storage_account_name, state )
  Thread.new( storage_account_name, state ) do |storage_account_name, state|

    blob = Blob.new

    committed_tuples = [ ]
    uncommitted_tuples = [ ]
    upload_empty_tuples = [ ]
    token = nil
    finished = false
    filter = "#{:PartitionKey} eq '#{@@configuration[:azure_storage_blob_prefix]}-#{state}'"

    # should exit thread after fetching data from table, and submit recovery, the loop is only for case of failure
    until finished || stopped? do

      entities = blob.log_to_table_query( storage_account_name, filter, token )
      if entities
        token = entities.continuation_token

        if :committed == state
          entities.each do |entity|
            State.instance.inc_pending_notifications
            tuple = blob.table_entity_to_tuple( entity.properties )
            @@failed_on_notification_endpoint_retry_Q << tuple
          end

        elsif :uploading == state
          # first tuples are collected, before send to queues, to make sure blob states don't change in between
          entities.each do |entity|
            typed_tuple = nil
            until typed_tuple || stopped?
              typed_tuple = blob.update_commited_or_uncommited_list( entity.properties )
              Stud.stoppable_sleep( 60, 1 ) { stopped? } unless typed_tuple
            end
            next if stopped?

            if typed_tuple[:committed]
              committed_tuples << typed_tuple[:committed]
            elsif typed_tuple[:uncommitted]
              uncommitted_tuples << typed_tuple[:uncommitted]
            else
              upload_empty_tuples << typed_tuple[:upload_empty]
            end
          end
        end

        next if token

        committed_tuples.each do |tuple|
          State.instance.inc_pending_commits
          @@failed_on_log_to_table_retry_Qs[storage_account_name] << tuple
        end

        uncommitted_tuples.each do |tuple|
          State.instance.inc_pending_commits
          @@failed_on_commit_retry_Qs[storage_account_name] << tuple
        end

        upload_empty_tuples.each do |tuple|
          @@failed_on_log_to_table_retry_Qs[storage_account_name] << tuple
        end

        finished = true
      else
        Stud.stoppable_sleep( 60, 1 ) { stopped? }
      end
    end

    @@logger.info { "exit table recovery thread, storage: #{storage_account_name}, state: #{state}, entities: #{entities ? entities.length : nil}" }
  end
end
.state_on?(storage_account_name, blob, failure_reason) ⇒ Boolean
# File 'lib/logstash/outputs/application_insights/blob.rb', line 165

def self.state_on? ( storage_account_name, blob, failure_reason )
  if blob
    if :io_failure == failure_reason
      @@endpoint_state_on ||= blob.test_notification_endpoint( @@configuration[:storage_account_name_key][0][0] )
    else
      Clients.instance.storage_account_state_on?( storage_account_name )
    end
  elsif storage_account_name
    Clients.instance.storage_account_state_on?( storage_account_name )
  else
    Clients.instance.storage_account_state_on?
  end
end
.stopped? ⇒ Boolean
# File 'lib/logstash/outputs/application_insights/blob.rb', line 75

def self.stopped?
  @@closing
end
.storage_recovery_thread(storage_account_name, queue, method, failure_reason) ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 179

def self.storage_recovery_thread( storage_account_name, queue, method, failure_reason )
  # a thread, per storage account name, that retries failed blob commits / notification / table updates
  Thread.new( storage_account_name, queue, method, failure_reason ) do |storage_account_name, queue, method, failure_reason|
    blob = Blob.new if :notify == method
    semaphore = Mutex.new
    action = { :method => method, :semaphore => semaphore, :counter => 0 }

    loop do
      tuple ||= queue.pop
      until state_on?( storage_account_name, blob, failure_reason ) do sleep( 1 ) end

      not_busy = nil
      semaphore.synchronize {
        not_busy = action[:counter] += 1 if 10 > action[:counter]
      }
      if not_busy
        Thread.new( action, tuple ) do |action, tuple|
          Blob.new.send( action[:method], tuple )
          action[:semaphore].synchronize { action[:counter] -= 1 }
        end
        tuple = nil # release for GC
      else
        Stud.stoppable_sleep( 60, 1 ) { 10 > action[:counter] }
        next
      end
    end
  end
end
.validate_endpoint ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 209

def self.validate_endpoint
  io = Blob.new
  raise ConfigurationError, "Failed to access application insights #{@@configuration[:notification_endpoint]}, due to error #{io.last_io_exception.inspect}" unless io.test_notification_endpoint( @@configuration[:storage_account_name_key][0][0] )
end
.validate_storage ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 214

def self.validate_storage
  io = Blob.new
  @@configuration[:storage_account_name_key].each do |storage_account_name, storage_account_keys|
    raise ConfigurationError, "Failed access azure storage account #{storage_account_name}, due to error #{io.last_io_exception.inspect}" unless io.test_storage( storage_account_name )
  end
end
Instance Method Details
#<<(block) ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 700

def << ( block )
  @io_queue << block
end
#blob_empty? ⇒ Boolean
# File 'lib/logstash/outputs/application_insights/blob.rb', line 311

def blob_empty?
  @uploaded_block_ids.empty?
end
#blob_full?(next_block = nil) ⇒ Boolean
# File 'lib/logstash/outputs/application_insights/blob.rb', line 303

def blob_full? ( next_block = nil )
  if next_block
    BLOB_MAX_BLOCKS < @uploaded_block_ids.length + 1 || @@blob_max_events < @uploaded_events_count + next_block.events_count || @@blob_max_bytesize < @uploaded_bytesize + next_block.bytesize
  else
    BLOB_MAX_BLOCKS <= @uploaded_block_ids.length || @@blob_max_events <= @uploaded_events_count || @@blob_max_bytesize <= @uploaded_bytesize
  end
end
#clear_state ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 324

def clear_state
  @action = nil
  @storage_account_name = nil
  @container_name = nil
  @blob_name = nil
  @uploaded_block_ids = [ ]
  @uploaded_block_numbers = [ ]
  @uploaded_events_count = 0
  @uploaded_bytesize = 0
  @oldest_event_time = nil
end
#close ⇒ Object
Closes the blob. It finishes whatever is already on the queue and commits if necessary. Called on shutdown.
# File 'lib/logstash/outputs/application_insights/blob.rb', line 299

def close
  @io_queue << :close
end
#commit(tuple = nil) ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 567

def commit ( tuple = nil )
  tuple_to_state( tuple ) if tuple

  unless @uploaded_block_ids.empty?
    @action = :commit
    @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable ]
    success = storage_io_block( commit_recover ) {
      @info = "#{@action.to_s} #{@storage_account_name}/#{@container_name}/#{@blob_name}, events: #{@uploaded_events_count}, size: #{@uploaded_bytesize}, blocks: #{@uploaded_block_numbers}, delay: #{Time.now.utc - @oldest_event_time}"
      # assume that exceptions can be raised due to this method:
      @client.blobClient.commit_blob_blocks( @container_name, @blob_name, @uploaded_block_ids ) unless @@configuration[:disable_blob_upload]
      @log_state = :committed
    }

    # next stage
    log_to_table_update if success
  end
end
#commit_recover ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 563

def commit_recover
  proc do |reason, e|
    @@failed_on_commit_retry_Qs[@storage_account_name] << state_to_tuple
  end
end
#create_container_exist_recovery ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 483

def create_container_exist_recovery
  create_exist_recovery( :container ) { |name| @client.blobClient.create_container( name ) }
end
#create_exist_recovery(type, name = nil) ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 463

def create_exist_recovery( type, name = nil )
  prev_info = @info
  if CREATE_EXIST_ERRORS[type][0] == @recovery
    name ||= ( :table == type ? @@state_table_name : @container_name )
    @info = "create #{type} #{@storage_account_name}/#{name}"

    # assume that exceptions can be raised due to this method:
    yield name

    @@logger.info { "Successed to #{@info}" }
    @info = prev_info

  elsif CREATE_EXIST_ERRORS[type][1] == @recovery
    @@logger.info { "Successed (already exist) to #{@info}" }
    @info = prev_info
  end
end
#create_table_exist_recovery ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 479

def create_table_exist_recovery
  create_exist_recovery( :table ) { |name| @client.tableClient.create_table( name ) }
end
#log_to_table_delete(tuple = nil, state = nil) ⇒ Object
Returns true on success.
# File 'lib/logstash/outputs/application_insights/blob.rb', line 528

def log_to_table_delete ( tuple = nil, state = nil )
  tuple_to_state( tuple ) if tuple
  state ||= @log_state

  @action = :log_to_table_delete
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy, :create_resource ]
  @info = "#{@action} #{state} #{@storage_account_name}/#{@container_name}/#{@blob_name}"

  success = storage_io_block( log_to_table_update_recover ) {
    create_table_exist_recovery
    if :create_resource == @recovery
      @@logger.info { "Note: delete entity failed, already deleted, #{@info}, state: #{state}, log_state: #{@log_state}" }
    else
      @client.tableClient.delete_entity( @@state_table_name, "#{@@configuration[:azure_storage_blob_prefix]}-#{state}", @blob_name )
    end
  }
end
#log_to_table_insert ⇒ Object
Returns true on success.
# File 'lib/logstash/outputs/application_insights/blob.rb', line 488

def log_to_table_insert
  @action = :log_to_table_insert
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy, :entity_exist ]
  @info = "#{@action} #{@log_state} #{@storage_account_name}/#{@container_name}/#{@blob_name}"

  success = storage_io_block( :uploading == @log_state ? proc do |reason, e| end : log_to_table_update_recover ) {
    create_table_exist_recovery
    if :entity_exist == @recovery
      raise NotRecoverableError if :uploading == @log_state
    else
      entity_values = state_to_table_entity
      entity_values[:PartitionKey] = "#{@@configuration[:azure_storage_blob_prefix]}-#{@log_state}"
      entity_values[:RowKey] = @blob_name
      @client.tableClient.insert_entity( @@state_table_name, entity_values )
    end
  }
end
#log_to_table_query(storage_account_name, filter, token) ⇒ Object
Returns the entities.
# File 'lib/logstash/outputs/application_insights/blob.rb', line 546

def log_to_table_query ( storage_account_name, filter, token )
  @storage_account_name = storage_account_name

  @action = :log_to_table_query
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy ]
  @info = "#{@action} #{@storage_account_name}/#{@@state_table_name}"

  entities = nil
  success = storage_io_block( proc do |reason, e| end ) {
    create_table_exist_recovery
    options = { :filter => filter }
    options[:continuation_token] = token if token
    entities = @client.tableClient.query_entities( @@state_table_name, options )
  }
  entities
end
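The recovery table threads page through the state table with this method, passing the continuation token back in until the result set is exhausted. A hedged sketch that follows the filter format built in recovery_table_thread (the account name and prefix are illustrative):

# Sketch only: assumes Blob.config has already been called.
blob   = LogStash::Outputs::Application_insights::Blob.new
filter = "PartitionKey eq 'myprefix-committed'"   # same shape as recovery_table_thread builds
token  = nil

loop do
  entities = blob.log_to_table_query( "my_account", filter, token )
  break unless entities                            # nil means the query itself failed
  entities.each { |entity| puts entity.properties["RowKey"] }
  token = entities.continuation_token
  break unless token                               # no token: last page reached
end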
#log_to_table_update(tuple = nil) ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 509

def log_to_table_update ( tuple = nil )
  tuple_to_state( tuple ) if tuple

  if :uploading == @log_state
    log_to_table_delete
  elsif :committed == @log_state
    if log_to_table_insert && log_to_table_delete( nil, :uploading )
      State.instance.dec_pending_commits
      State.instance.inc_pending_notifications
      @@failed_on_notification_endpoint_retry_Q << state_to_tuple
    end
  elsif :notified == @log_state
    if ( !@@save_notified_blobs_records || log_to_table_insert ) && log_to_table_delete( nil, :committed )
      State.instance.dec_pending_notifications
    end
  end
end
#log_to_table_update_recover ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 505

def log_to_table_update_recover
  proc do |reason, e|
    @@failed_on_log_to_table_retry_Qs[@storage_account_name] << state_to_tuple
  end
end
#notify(tuple = nil) ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 444

def notify ( tuple = nil )
  tuple_to_state( tuple ) if tuple
  @action = :notify
  @force_client = true # to enable get a client even if all storage_accounts marked dead
  @recoverable = [ :notify_failed_blob_not_accessible, :io_failure, :service_unavailable ]
  success = storage_io_block( notify_recover ) {
    set_blob_sas_url
    payload = create_payload
    @@logger.debug { "notification payload: #{payload}" }
    @info = "#{@action.to_s} #{@storage_account_name}/#{@container_name}/#{@blob_name}, events: #{@uploaded_events_count}, size: #{@uploaded_bytesize}, blocks: #{@uploaded_block_numbers}, delay: #{Time.now.utc - @oldest_event_time}, blob_sas_url: #{@blob_sas_url}"

    # assume that exceptions can be raised due to this method:
    post_notification( @client.notifyClient, payload ) unless @@configuration[:disable_notification]
    @log_state = :notified
  }
  log_to_table_update if success
end
#notify_recover ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 422

def notify_recover
  proc do |reason, e|
    if :notify_failed_blob_not_accessible == reason
      @sub_state = reason
      @@failed_on_notify_retry_Qs[@storage_account_name] << state_to_tuple

    elsif :invalid_intrumentation_key == reason || :invalid_table_id == reason
      @sub_state = reason
      Channels.instance.channel( @intrumentation_key, @table_id ).failed_on_notify_retry_Q << state_to_tuple

    else
      @@endpoint_state_on = false
      if :notify_failed_blob_not_accessible == @sub_state
        @@failed_on_notify_retry_Qs[@storage_account_name] << state_to_tuple
      elsif :invalid_intrumentation_key == @sub_state || :invalid_table_id == @sub_state
        Channels.instance.channel( @intrumentation_key, @table_id ).failed_on_notify_retry_Q << state_to_tuple
      else
        @@failed_on_notification_endpoint_retry_Q << state_to_tuple
      end
    end
  end
end
#queue_empty? ⇒ Boolean
# File 'lib/logstash/outputs/application_insights/blob.rb', line 315

def queue_empty?
  @io_queue.length == 0 if @io_queue
end
#queue_size ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 320

def queue_size
  @io_queue.length
end
#state_to_table_entity ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 365

def state_to_table_entity
  { :start_time => @start_time, :intrumentation_key => @intrumentation_key, :table_id => @table_id,
    :storage_account_name => @storage_account_name, :container_name => @container_name, :blob_name => @blob_name,
    :uploaded_block_ids => @uploaded_block_ids.to_s, :uploaded_block_numbers => @uploaded_block_numbers.to_s,
    :uploaded_events_count => @uploaded_events_count, :uploaded_bytesize => @uploaded_bytesize,
    :oldest_event_time => @oldest_event_time, :log_state => @log_state, :sub_state => @sub_state
  }
end
#state_to_tuple ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 346

def state_to_tuple
  [ @start_time || Time.now.utc, @action, @intrumentation_key, @table_id,
    @storage_account_name, @container_name, @blob_name,
    @uploaded_block_ids, @uploaded_block_numbers, @uploaded_events_count, @uploaded_bytesize,
    @oldest_event_time, @event_format_ext, @blob_max_delay, @log_state, @sub_state
  ]
end
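state_to_tuple and tuple_to_state are symmetric, which is what lets a blob's state be parked on a retry queue as a plain array and later restored onto a fresh instance. A minimal sketch:

# Minimal sketch of the tuple round trip used by the retry queues.
blob  = LogStash::Outputs::Application_insights::Blob.new
tuple = blob.state_to_tuple            # 16-element array in the order shown above

recovered = LogStash::Outputs::Application_insights::Blob.new
recovered.tuple_to_state( tuple )      # rebuilds the same state on another instance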
#table_entity_to_tuple(options = {}) ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 336

def table_entity_to_tuple( options = {} )
  [ options[:start_time.to_s] || Time.now.utc, options[:action.to_s], options[:intrumentation_key.to_s], options[:table_id.to_s],
    options[:storage_account_name.to_s], options[:container_name.to_s], options[:blob_name.to_s],
    eval( options[:uploaded_block_ids.to_s] ), eval( options[:uploaded_block_numbers.to_s] ),
    options[:uploaded_events_count.to_s] || 0, options[:uploaded_bytesize.to_s] || 0,
    options[:oldest_event_time.to_s] || Time.now.utc,
    options[:event_format_ext.to_s], options[:blob_max_delay.to_s] || 0,
    options[:log_state.to_s].to_sym, ( options[:sub_state.to_s] || :none ).to_sym
  ]
end
#test_notification_endpoint(storage_account_name) ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 399

def test_notification_endpoint( storage_account_name )
  @storage_account_name = storage_account_name

  @action = :test_notification_endpoint
  @max_tries = 1
  @force_client = true # to enable get a client even if all storage_accounts marked dead
  @recoverable = [ ]
  success = storage_io_block( test_notification_endpoint_recover ) {
    if @recovery.nil?
      @container_name = "logstash-test-container"
      @blob_name = "logstash-test-blob"
      @table_id = GUID_NULL
      @intrumentation_key = GUID_NULL
      @info = "#{@action}"
      set_blob_sas_url
      payload = create_payload
      post_notification( @client.notifyClient, payload )
    end
  }
  sleep( 30 ) unless success
  success
end
#test_notification_endpoint_recover ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 395

def test_notification_endpoint_recover
  proc do |reason, e|
    @recovery = :ok if :invalid_intrumentation_key == reason || :invalid_table_id == reason
  end
end
#test_storage(storage_account_name) ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 380

def test_storage ( storage_account_name )
  @storage_account_name = storage_account_name

  @action = :test_storage
  @max_tries = 1
  @force_client = true # to enable get a client even if all storage_accounts marked dead
  @recoverable = [ :invalid_storage_key ]
  storage_io_block( test_storage_recover ) {
    if @recovery.nil? || :invalid_storage_key == @recovery
      container_name = "logstash-test-container"
      @info = "#{@action} #{@storage_account_name}"
      @client.blobClient.create_container( container_name ) unless @@configuration[:disable_blob_upload]
    end
  }
end
#test_storage_recover ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 375

def test_storage_recover
  proc do |reason, e|
    @recovery = :ok if :container_exist == reason || :create_container == reason
  end
end
#tuple_to_state(tuple) ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 356

def tuple_to_state ( tuple )
  ( @start_time, @action, @intrumentation_key, @table_id,
    @storage_account_name, @container_name, @blob_name,
    @uploaded_block_ids, @uploaded_block_numbers, @uploaded_events_count, @uploaded_bytesize,
    @oldest_event_time, @event_format_ext, @blob_max_delay, @log_state, @sub_state ) = tuple
end
#update_commited_or_uncommited_list(table_entity) ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 666

def update_commited_or_uncommited_list( table_entity )
  tuple = table_entity_to_tuple( table_entity )
  tuple_to_state( tuple )

  @action = :list_blob_blocks
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :container_exist, :create_container, :create_blob ]

  list_blob_blocks = nil
  success = storage_io_block( proc do |reason, e| end ) {
    @info = "#{@action} #{@storage_account_name}/#{@container_name}/#{@blob_name}"

    create_container_exist_recovery
    if :create_blob == @recovery
      list_blob_blocks = { :uncommitted => [ ], :committed => [ ] }
    else
      list_blob_blocks = @client.blobClient.list_blob_blocks( @container_name, @blob_name, { :blocklist_type => :all } ) unless :create_blob == @recovery
    end
  }

  if list_blob_blocks
    blocks = ( list_blob_blocks[:uncommitted].empty? ? list_blob_blocks[:committed] : list_blob_blocks[:uncommitted] )
    blocks.each do |block|
      @uploaded_block_ids << [ block.name ]
      @uploaded_bytesize += block.size
    end
    type = ( blocks.empty? || 0 == @uploaded_bytesize ? :upload_empty : blocks[0].type )

    @log_state = :committed if :committed == type
    { type => state_to_tuple }
  else
    nil
  end
end
#upload(block, to_commit = nil) ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 602

def upload ( block, to_commit = nil )
  @storage_account_name = nil if @uploaded_block_ids.empty?
  @block_to_upload = block
  block = nil # remove reference for GC
  exclude_storage_account_names = [ ]
  begin
    if @uploaded_block_ids.empty?
      @log_state = :uploading
      @uploaded_block_numbers = [ ]
      @uploaded_bytesize = 0
      @uploaded_events_count = 0
      @oldest_event_time = nil

      # remove record of previous upload that failed
      if @storage_account_name
        exclude_storage_account_names << @storage_account_name
        @@failed_on_log_to_table_retry_Qs[@storage_account_name] << state_to_tuple
      end
      set_conatainer_and_blob_names
      @storage_account_name = Clients.instance.get_random_active_storage( exclude_storage_account_names )
      unless @storage_account_name
        upload_recover.call( :io_all_dead, nil )
        return false
      end
      raise UploadRetryError unless log_to_table_insert
    end

    @action = :upload
    @block_info = "blocks: #{@block_to_upload.block_numbers}, events: #{@block_to_upload.events_count}, size: #{@block_to_upload.bytes.length}"
    @info = "#{@action} #{@storage_account_name}/#{@container_name}/#{@blob_name}, #{@block_info}, commitId: [\"#{100001 + @uploaded_block_ids.length}\"]"
    @recoverable = [ :invalid_storage_key, :invalid_storage_account, :io_failure, :service_unavailable, :container_exist, :create_container ]

    success = storage_io_block( upload_recover ) {
      create_container_exist_recovery
      block_id = "#{100001 + @uploaded_block_ids.length}"

      # assume that exceptions can be raised due to this method:
      @client.blobClient.put_blob_block( @container_name, @blob_name, block_id, @block_to_upload.bytes ) unless @@configuration[:disable_blob_upload]

      # upload success
      first_block_in_blob = @uploaded_block_ids.empty?
      @uploaded_block_ids << [ block_id ]
      @uploaded_block_numbers.concat( @block_to_upload.block_numbers )
      @uploaded_bytesize += @block_to_upload.bytes.length
      @uploaded_events_count += @block_to_upload.events_count
      @oldest_event_time ||= @block_to_upload.oldest_event_time

      # release memory
      bytesize = @block_to_upload.bytesize
      @block_to_upload.dispose
      @block_to_upload = nil
      State.instance.inc_pending_commits if first_block_in_blob
      State.instance.dec_upload_bytesize( bytesize )
      Telemetry.instance.track_event( "uploading", { :properties => state_to_table_entity } )
    }

    raise UploadRetryError if :invalid_storage_account == @recovery
    commit if success && to_commit
  rescue UploadRetryError
    @recovery = nil
    retry
  end
end
#upload_recover ⇒ Object
# File 'lib/logstash/outputs/application_insights/blob.rb', line 585

def upload_recover
  proc do |reason, e|
    unless @uploaded_block_ids.empty?
      info1 = "#{:commit} #{@storage_account_name}/#{@container_name}/#{@blob_name}, events: #{@uploaded_events_count}, size: #{@uploaded_bytesize}, blocks: #{@uploaded_block_numbers}, delay: #{Time.now.utc - @oldest_event_time}"
      @@logger.error { "Pospone to #{info1} (; retry later, error: #{e.inspect}" }
      @@failed_on_commit_retry_Qs[@storage_account_name] << state_to_tuple
      @uploaded_block_ids = [ ]
    end
    unless :io_all_dead == reason
      @recovery = :invalid_storage_account
    else
      Channels.instance.channel( @intrumentation_key, @table_id ).failed_on_upload_retry_Q << @block_to_upload
      @block_to_upload = nil
    end
  end
end