Class: LogStash::Outputs::Application_insights::State_table
- Defined in:
- lib/logstash/outputs/application_insights/state_table.rb
Constant Summary
Constants inherited from Blob
Instance Attribute Summary
Attributes inherited from Blob
Instance Method Summary collapse
-
#initialize(tuple) ⇒ State_table
constructor
A new instance of State_table.
-
#state_table_delete(state = nil) ⇒ Object
Returns true on success.
-
#state_table_insert ⇒ Object
return true on success.
-
#state_table_query(storage_account_name, filter, token) ⇒ Object
return entities.
- #state_table_update ⇒ Object
Methods inherited from Blob
close, #create_container_exist_recovery, #create_exist_recovery, #create_table_exist_recovery, stopped?, #update_commited_or_uncommited_list
Methods inherited from Context
#clear_context, #context_to_table_entity, #context_to_tuple, #table_entity_to_context, #table_entity_to_tuple, #tuple_to_context
Constructor Details
#initialize(tuple) ⇒ State_table
Returns a new instance of State_table.
24 25 26 27 28 |
# File 'lib/logstash/outputs/application_insights/state_table.rb', line 24

# Creates a State_table and, when a tuple is supplied, loads state from it.
#
# @param tuple [Object, nil] when truthy it is handed to tuple_to_state;
#   when nil/false no state is loaded
def initialize(tuple)
  # The parent constructor must receive nil: Blob's first parameter is the
  # channel, otherwise storage_account_name would be passed as the channel.
  super(nil)
  return unless tuple

  tuple_to_state(tuple)
end
Instance Method Details
#state_table_delete(state = nil) ⇒ Object
Returns true on success.
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/logstash/outputs/application_insights/state_table.rb', line 70

# Deletes this blob's entity from the state table for the given state.
#
# @param state [Object, nil] state whose partition holds the entity;
#   falls back to @log_state when nil/false
# @return [Object] the result of storage_io_block (documented upstream as
#   true on success); when it is falsy a :state_table_update is scheduled
#   through @storage_recovery
def state_table_delete(state = nil)
  state ||= @log_state

  @action = :state_table_delete
  @recoverable = %i[
    invalid_storage_key io_failure service_unavailable
    table_exist create_table table_busy create_resource
  ]
  @info = "#{@action} #{state} #{@storage_account_name}/#{@container_name}/#{@blob_name}"

  success = storage_io_block do
    create_table_exist_recovery
    if :create_resource == @recovery
      # Recovery path: the entity is treated as already deleted, only log it.
      @logger.info { "Note: delete entity failed, already deleted, #{@info}, state: #{state}, log_state: #{@log_state}" }
    else
      table_name = @configuration[:state_table_name]
      partition_key = "#{@configuration[:partition_key_prefix]}-#{state}"
      row_key = @blob_name.gsub( "/", "_" )
      @client.tableClient.delete_entity( table_name, partition_key, row_key )
    end
  end

  unless success
    @storage_recovery.recover_later( state_to_tuple, :state_table_update, @storage_account_name )
  end
  success
end
#state_table_insert ⇒ Object
return true on success
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/logstash/outputs/application_insights/state_table.rb', line 31

# Inserts an entity describing this blob into the state table, under the
# partition of the current @log_state.
#
# @return [Object] the result of storage_io_block (documented upstream as
#   true on success)
# @raise [NotRecoverableError] raised inside the storage block when recovery
#   reports :entity_exist while @log_state is :uploading
def state_table_insert
  @action = :state_table_insert
  @recoverable = %i[
    invalid_storage_key io_failure service_unavailable
    table_exist create_table table_busy entity_exist
  ]
  @info = "#{@action} #{@log_state} #{@storage_account_name}/#{@container_name}/#{@blob_name}"

  success = storage_io_block do
    create_table_exist_recovery
    if :entity_exist == @recovery
      raise NotRecoverableError if :uploading == @log_state
    else
      entity = state_to_table_entity
      entity[:PartitionKey] = "#{@configuration[:partition_key_prefix]}-#{@log_state}"
      entity[:RowKey] = @blob_name.gsub("/","_")
      @client.tableClient.insert_entity( @configuration[:state_table_name], entity )
    end
  end

  # A failed insert is retried later, except while the blob is still uploading.
  if !success && :uploading != @log_state
    @storage_recovery.recover_later( state_to_tuple, :state_table_update, @storage_account_name )
  end
  success
end
#state_table_query(storage_account_name, filter, token) ⇒ Object
return entities
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/logstash/outputs/application_insights/state_table.rb', line 90

# Queries the state table of the given storage account.
#
# @param storage_account_name [Object] stored in @storage_account_name before
#   the query is issued
# @param filter [Object] forwarded to the table client as the :filter option
# @param token [Object, nil] continuation token; forwarded as
#   :continuation_token only when truthy
# @return [Object, nil] whatever query_entities returned, or nil if the
#   storage block never got as far as assigning it
def state_table_query(storage_account_name, filter, token)
  @storage_account_name = storage_account_name

  @action = :state_table_query
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy ]
  @info = "#{@action} #{@storage_account_name}/#{@configuration[:state_table_name]}"

  entities = nil
  storage_io_block {
    create_table_exist_recovery
    # Build the query options explicitly so the filter and the optional
    # continuation token actually reach the table client.
    options = { :filter => filter }
    options[:continuation_token] = token if token
    entities = @client.tableClient.query_entities( @configuration[:state_table_name], options )
  }
  entities
end
#state_table_update ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/logstash/outputs/application_insights/state_table.rb', line 51

# Moves this blob's record in the state table according to @log_state:
# - :uploading  deletes the record for the current state
# - :committed  inserts the new record, deletes the :uploading one, then
#               adjusts the pending counters and enqueues a notification
# - :notified   optionally inserts the new record (controlled by
#               :save_notified_blobs_records), deletes the :committed one and
#               decrements the pending-notifications counter
# Any other state does nothing.
def state_table_update
  case @log_state
  when :uploading
    state_table_delete
  when :committed
    if state_table_insert && state_table_delete( :uploading )
      State.instance.dec_pending_commits
      State.instance.inc_pending_notifications
      # this is not a recovery, it is actually an enqueue to notify
      @notification_recovery.enqueue( state_to_tuple )
    end
  when :notified
    keep_record = @configuration[:save_notified_blobs_records]
    if (!keep_record || state_table_insert) && state_table_delete( :committed )
      State.instance.dec_pending_notifications
    end
  end
end