Class: LogStash::Outputs::Application_insights::State_table

Inherits:
Blob
  • Object
show all
Defined in:
lib/logstash/outputs/application_insights/state_table.rb

Constant Summary

Constants inherited from Blob

Blob::CREATE_EXIST_ERRORS

Instance Attribute Summary

Attributes inherited from Blob

#last_io_exception

Instance Method Summary collapse

Methods inherited from Blob

close, #create_container_exist_recovery, #create_exist_recovery, #create_table_exist_recovery, stopped?, #update_commited_or_uncommited_list

Methods inherited from Context

#clear_context, #context_to_table_entity, #context_to_tuple, #table_entity_to_context, #table_entity_to_tuple, #tuple_to_context

Constructor Details

#initialize(tuple) ⇒ State_table

Returns a new instance of State_table.



24
25
26
27
28
# File 'lib/logstash/outputs/application_insights/state_table.rb', line 24

def initialize( tuple )
  # super first parameter must be nil. blob first parameter is channel, otherwise it will pass storage_account_name as channel
  super( nil )
  tuple_to_state( tuple ) if tuple
end

Instance Method Details

#state_table_delete(state = nil) ⇒ Object

return tru on success



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/logstash/outputs/application_insights/state_table.rb', line 70

def state_table_delete ( state = nil )
  state ||= @log_state
  @action = :state_table_delete
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy, :create_resource ]
  @info  = "#{@action} #{state} #{@storage_account_name}/#{@container_name}/#{@blob_name}"

  success =  storage_io_block {
    create_table_exist_recovery
    if :create_resource == @recovery
      @logger.info { "Note: delete entity failed, already deleted, #{@info}, state: #{state}, log_state: #{@log_state}" }
    else
      @client.tableClient.delete_entity( @configuration[:state_table_name], "#{@configuration[:partition_key_prefix]}-#{state}", @blob_name.gsub( "/", "_" ) )
    end
  }
  @storage_recovery.recover_later( state_to_tuple, :state_table_update, @storage_account_name ) unless success
  success
end

#state_table_insertObject

return true on success



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/logstash/outputs/application_insights/state_table.rb', line 31

def state_table_insert
  @action = :state_table_insert
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy, :entity_exist ]
  @info  = "#{@action} #{@log_state} #{@storage_account_name}/#{@container_name}/#{@blob_name}"
  success =  storage_io_block {
    create_table_exist_recovery
    if :entity_exist == @recovery
      raise NotRecoverableError if :uploading == @log_state
    else
      entity_values = state_to_table_entity
      entity_values[:PartitionKey] = "#{@configuration[:partition_key_prefix]}-#{@log_state}"
      entity_values[:RowKey] = @blob_name.gsub("/","_")
      @client.tableClient.insert_entity( @configuration[:state_table_name], entity_values )
    end
  }
  @storage_recovery.recover_later( state_to_tuple, :state_table_update, @storage_account_name )  unless success || :uploading == @log_state
  success
end

#state_table_query(storage_account_name, filter, token) ⇒ Object

return entities



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/logstash/outputs/application_insights/state_table.rb', line 90

def state_table_query ( , filter , token )
  @storage_account_name = 

  @action = :state_table_query
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy ]
  @info  = "#{@action} #{@storage_account_name}/#{@configuration[:state_table_name]}"

  entities = nil
  success =  storage_io_block {
    create_table_exist_recovery
    options = { :filter => filter }
    options[:continuation_token] = token if token
    entities = @client.tableClient.query_entities( @configuration[:state_table_name], options )
  }
  entities
end

#state_table_updateObject



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/logstash/outputs/application_insights/state_table.rb', line 51

def state_table_update
  if :uploading == @log_state
    state_table_delete
  elsif :committed == @log_state
    if state_table_insert && state_table_delete( :uploading )
      State.instance.dec_pending_commits
      State.instance.inc_pending_notifications
      # this is not a recovery, it is actually enqueue to notify
      @notification_recovery.enqueue( state_to_tuple )
    end
  elsif :notified == @log_state
    if (!@configuration[:save_notified_blobs_records] || state_table_insert) && state_table_delete( :committed ) 
      State.instance.dec_pending_notifications
    end
  end
end